mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-02 03:55:55 +00:00
feat: remove unusable commands
This commit is contained in:
@@ -1,78 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
|
||||
from .. import operator, entities, errors
|
||||
|
||||
|
||||
@operator.operator_class(
|
||||
name='model',
|
||||
help='显示和切换模型列表',
|
||||
usage='!model\n!model show <模型名>\n!model set <模型名>',
|
||||
privilege=2,
|
||||
)
|
||||
class ModelOperator(operator.CommandOperator):
|
||||
"""Model命令"""
|
||||
|
||||
async def execute(self, context: entities.ExecuteContext) -> typing.AsyncGenerator[entities.CommandReturn, None]:
|
||||
content = '模型列表:\n'
|
||||
|
||||
model_list = self.ap.model_mgr.model_list
|
||||
|
||||
for model in model_list:
|
||||
content += f'\n名称: {model.name}\n'
|
||||
content += f'请求器: {model.requester.name}\n'
|
||||
|
||||
content += f'\n当前对话使用模型: {context.query.use_model.name}\n'
|
||||
content += f'新对话默认使用模型: {self.ap.provider_cfg.data.get("model")}\n'
|
||||
|
||||
yield entities.CommandReturn(text=content.strip())
|
||||
|
||||
|
||||
@operator.operator_class(name='show', help='显示模型详情', privilege=2, parent_class=ModelOperator)
|
||||
class ModelShowOperator(operator.CommandOperator):
|
||||
"""Model Show命令"""
|
||||
|
||||
async def execute(self, context: entities.ExecuteContext) -> typing.AsyncGenerator[entities.CommandReturn, None]:
|
||||
model_name = context.crt_params[0]
|
||||
|
||||
model = None
|
||||
for _model in self.ap.model_mgr.model_list:
|
||||
if model_name == _model.name:
|
||||
model = _model
|
||||
break
|
||||
|
||||
if model is None:
|
||||
yield entities.CommandReturn(error=errors.CommandError(f'未找到模型 {model_name}'))
|
||||
else:
|
||||
content = '模型详情\n'
|
||||
content += f'名称: {model.name}\n'
|
||||
if model.model_name is not None:
|
||||
content += f'请求模型名称: {model.model_name}\n'
|
||||
content += f'请求器: {model.requester.name}\n'
|
||||
content += f'密钥组: {model.token_mgr.name}\n'
|
||||
content += f'支持视觉: {model.vision_supported}\n'
|
||||
content += f'支持工具: {model.tool_call_supported}\n'
|
||||
|
||||
yield entities.CommandReturn(text=content.strip())
|
||||
|
||||
|
||||
@operator.operator_class(name='set', help='设置默认使用模型', privilege=2, parent_class=ModelOperator)
|
||||
class ModelSetOperator(operator.CommandOperator):
|
||||
"""Model Set命令"""
|
||||
|
||||
async def execute(self, context: entities.ExecuteContext) -> typing.AsyncGenerator[entities.CommandReturn, None]:
|
||||
model_name = context.crt_params[0]
|
||||
|
||||
model = None
|
||||
for _model in self.ap.model_mgr.model_list:
|
||||
if model_name == _model.name:
|
||||
model = _model
|
||||
break
|
||||
|
||||
if model is None:
|
||||
yield entities.CommandReturn(error=errors.CommandError(f'未找到模型 {model_name}'))
|
||||
else:
|
||||
self.ap.provider_cfg.data['model'] = model_name
|
||||
await self.ap.provider_cfg.dump_config()
|
||||
yield entities.CommandReturn(text=f'已设置当前使用模型为 {model_name},重置会话以生效')
|
||||
@@ -1,102 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import typing
|
||||
|
||||
import ollama
|
||||
from .. import operator, entities, errors
|
||||
|
||||
|
||||
@operator.operator_class(
|
||||
name='ollama',
|
||||
help='ollama平台操作',
|
||||
usage='!ollama\n!ollama show <模型名>\n!ollama pull <模型名>\n!ollama del <模型名>',
|
||||
)
|
||||
class OllamaOperator(operator.CommandOperator):
|
||||
async def execute(self, context: entities.ExecuteContext) -> typing.AsyncGenerator[entities.CommandReturn, None]:
|
||||
try:
|
||||
content: str = '模型列表:\n'
|
||||
model_list: list = ollama.list().get('models', [])
|
||||
for model in model_list:
|
||||
content += f'名称: {model["name"]}\n'
|
||||
content += f'修改时间: {model["modified_at"]}\n'
|
||||
content += f'大小: {bytes_to_mb(model["size"])}MB\n\n'
|
||||
yield entities.CommandReturn(text=f'{content.strip()}')
|
||||
except ollama.ResponseError:
|
||||
yield entities.CommandReturn(error=errors.CommandError('无法获取模型列表,请确认 Ollama 服务正常'))
|
||||
|
||||
|
||||
def bytes_to_mb(num_bytes):
|
||||
mb: float = num_bytes / 1024 / 1024
|
||||
return format(mb, '.2f')
|
||||
|
||||
|
||||
@operator.operator_class(name='show', help='ollama模型详情', privilege=2, parent_class=OllamaOperator)
|
||||
class OllamaShowOperator(operator.CommandOperator):
|
||||
async def execute(self, context: entities.ExecuteContext) -> typing.AsyncGenerator[entities.CommandReturn, None]:
|
||||
content: str = '模型详情:\n'
|
||||
try:
|
||||
show: dict = ollama.show(model=context.crt_params[0])
|
||||
model_info: dict = show.get('model_info', {})
|
||||
ignore_show: str = 'too long to show...'
|
||||
|
||||
for key in ['license', 'modelfile']:
|
||||
show[key] = ignore_show
|
||||
|
||||
for key in [
|
||||
'tokenizer.chat_template.rag',
|
||||
'tokenizer.chat_template.tool_use',
|
||||
]:
|
||||
model_info[key] = ignore_show
|
||||
|
||||
content += json.dumps(show, indent=4)
|
||||
yield entities.CommandReturn(text=content.strip())
|
||||
except ollama.ResponseError:
|
||||
yield entities.CommandReturn(error=errors.CommandError('无法获取模型详情,请确认 Ollama 服务正常'))
|
||||
|
||||
|
||||
@operator.operator_class(name='pull', help='ollama模型拉取', privilege=2, parent_class=OllamaOperator)
|
||||
class OllamaPullOperator(operator.CommandOperator):
|
||||
async def execute(self, context: entities.ExecuteContext) -> typing.AsyncGenerator[entities.CommandReturn, None]:
|
||||
try:
|
||||
model_list: list = ollama.list().get('models', [])
|
||||
if context.crt_params[0] in [model['name'] for model in model_list]:
|
||||
yield entities.CommandReturn(text='模型已存在')
|
||||
return
|
||||
except ollama.ResponseError:
|
||||
yield entities.CommandReturn(error=errors.CommandError('无法获取模型列表,请确认 Ollama 服务正常'))
|
||||
return
|
||||
|
||||
on_progress: bool = False
|
||||
progress_count: int = 0
|
||||
try:
|
||||
for resp in ollama.pull(model=context.crt_params[0], stream=True):
|
||||
total: typing.Any = resp.get('total')
|
||||
if not on_progress:
|
||||
if total is not None:
|
||||
on_progress = True
|
||||
yield entities.CommandReturn(text=resp.get('status'))
|
||||
else:
|
||||
if total is None:
|
||||
on_progress = False
|
||||
|
||||
completed: typing.Any = resp.get('completed')
|
||||
if isinstance(completed, int) and isinstance(total, int):
|
||||
percentage_completed = (completed / total) * 100
|
||||
if percentage_completed > progress_count:
|
||||
progress_count += 10
|
||||
yield entities.CommandReturn(
|
||||
text=f'下载进度: {completed}/{total} ({percentage_completed:.2f}%)'
|
||||
)
|
||||
except ollama.ResponseError as e:
|
||||
yield entities.CommandReturn(text=f'拉取失败: {e.error}')
|
||||
|
||||
|
||||
@operator.operator_class(name='del', help='ollama模型删除', privilege=2, parent_class=OllamaOperator)
|
||||
class OllamaDelOperator(operator.CommandOperator):
|
||||
async def execute(self, context: entities.ExecuteContext) -> typing.AsyncGenerator[entities.CommandReturn, None]:
|
||||
try:
|
||||
ret: str = ollama.delete(model=context.crt_params[0])['status']
|
||||
except ollama.ResponseError as e:
|
||||
ret = f'{e.error}'
|
||||
yield entities.CommandReturn(text=ret)
|
||||
@@ -1,20 +1,11 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import traceback
|
||||
|
||||
from .. import operator, entities, errors
|
||||
from .. import operator, entities
|
||||
|
||||
|
||||
@operator.operator_class(name='update', help='更新程序', usage='!update', privilege=2)
|
||||
class UpdateCommand(operator.CommandOperator):
|
||||
async def execute(self, context: entities.ExecuteContext) -> typing.AsyncGenerator[entities.CommandReturn, None]:
|
||||
try:
|
||||
yield entities.CommandReturn(text='正在进行更新...')
|
||||
if await self.ap.ver_mgr.update_all():
|
||||
yield entities.CommandReturn(text='更新完成,请重启程序以应用更新')
|
||||
else:
|
||||
yield entities.CommandReturn(text='当前已是最新版本')
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
yield entities.CommandReturn(error=errors.CommandError('更新失败: ' + str(e)))
|
||||
yield entities.CommandReturn(text='不再支持通过命令更新,请查看 LangBot 文档。')
|
||||
|
||||
Reference in New Issue
Block a user