From 5f07b7ad1fb060cd6ccd1c0500c113b0ba1fea02 Mon Sep 17 00:00:00 2001 From: Rock Chin <1010553892@qq.com> Date: Mon, 20 Mar 2023 12:06:02 +0000 Subject: [PATCH] =?UTF-8?q?refactor:=20=E5=AE=8C=E6=88=90=E6=89=80?= =?UTF-8?q?=E6=9C=89=E6=8C=87=E4=BB=A4=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/qqbot/cmds/func.py | 24 ++- pkg/qqbot/cmds/model.py | 16 +- pkg/qqbot/cmds/session.py | 178 ++++++++++++++++-- pkg/qqbot/cmds/system.py | 223 ++++++++++++++++++++++- pkg/qqbot/command.py | 370 +++----------------------------------- 5 files changed, 442 insertions(+), 369 deletions(-) diff --git a/pkg/qqbot/cmds/func.py b/pkg/qqbot/cmds/func.py index 4a53c1ba..9ee73cfd 100644 --- a/pkg/qqbot/cmds/func.py +++ b/pkg/qqbot/cmds/func.py @@ -1,5 +1,12 @@ from pkg.qqbot.cmds.model import command +import logging + +from mirai import Image + +import config +import pkg.openai.session + @command( "draw", "使用DALL·E模型作画", @@ -11,4 +18,19 @@ def cmd_draw(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """使用DALL·E模型作画""" - pass \ No newline at end of file + reply = [] + + if len(params) == 0: + reply = ["[bot]err:请输入图片描述文字"] + else: + session = pkg.openai.session.get_session(session_name) + + res = session.draw_image(" ".join(params)) + + logging.debug("draw_image result:{}".format(res)) + reply = [Image(url=res['data'][0]['url'])] + if not (hasattr(config, 'include_image_description') + and not config.include_image_description): + reply.append(" ".join(params)) + + return reply \ No newline at end of file diff --git a/pkg/qqbot/cmds/model.py b/pkg/qqbot/cmds/model.py index cc5d6ef3..3758022b 100644 --- a/pkg/qqbot/cmds/model.py +++ b/pkg/qqbot/cmds/model.py @@ -1,4 +1,5 @@ # 指令模型 +import logging commands = [] """已注册的指令类 @@ -16,7 +17,7 @@ commands = [] def command(name: str, description: str, usage: str, aliases: list = None, admin_only: bool = False): """指令装饰器""" - def wrapper(fun: function): + def wrapper(fun): commands.append({ "name": name, "description": description, @@ -28,3 +29,16 @@ def command(name: str, description: str, usage: str, aliases: list = None, admin return fun return wrapper + + +def search(cmd: str) -> dict: + """查找指令""" + for command in commands: + if (command["name"] == cmd) or (cmd in command["aliases"]): + return command + return None + + +import pkg.qqbot.cmds.func +import pkg.qqbot.cmds.system +import pkg.qqbot.cmds.session diff --git a/pkg/qqbot/cmds/session.py b/pkg/qqbot/cmds/session.py index a2a87fdb..8693dfd7 100644 --- a/pkg/qqbot/cmds/session.py +++ b/pkg/qqbot/cmds/session.py @@ -1,6 +1,11 @@ # 会话管理相关指令 -from pkg.qqbot.cmds.model import command +import datetime +import json +from pkg.qqbot.cmds.model import command +import pkg.openai.session +import pkg.utils.context +import config @command( "reset", @@ -13,7 +18,16 @@ def cmd_reset(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """重置会话""" - pass + reply = [] + + if len(params) == 0: + pkg.openai.session.get_session(session_name).reset(explicit=True) + reply = ["[bot]会话已重置"] + else: + pkg.openai.session.get_session(session_name).reset(explicit=True, use_prompt=params[0]) + reply = ["[bot]会话已重置,使用场景预设:{}".format(params[0])] + + return reply @command( @@ -27,8 +41,16 @@ def cmd_last(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """切换到前一次会话""" - pass + reply = [] + result = pkg.openai.session.get_session(session_name).last_session() + if result is None: + reply = ["[bot]没有前一次的对话"] + else: + datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( + '%Y-%m-%d %H:%M:%S') + reply = ["[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str)] + return reply @command( "next", @@ -41,7 +63,17 @@ def cmd_next(cmd: str, params: list, session_name: str, text_message: str, launcher_type: int, launcher_id: int, sender_id: int, is_admin: bool) -> list: """切换到后一次会话""" - pass + reply = [] + + result = pkg.openai.session.get_session(session_name).next_session() + if result is None: + reply = ["[bot]没有后一次的对话"] + else: + datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( + '%Y-%m-%d %H:%M:%S') + reply = ["[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str)] + + return reply @command( @@ -54,8 +86,21 @@ def cmd_next(cmd: str, params: list, session_name: str, def cmd_prompt(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: - """获取当前会话的前文""" - pass + """获取当前会话的前文""" + reply = [] + + msgs = "" + session:list = pkg.openai.session.get_session(session_name).prompt + for msg in session: + if len(params) != 0 and params[0] in ['-all', '-a']: + msgs = msgs + "{}: {}\n\n".format(msg['role'], msg['content']) + elif len(msg['content']) > 30: + msgs = msgs + "[{}]: {}...\n\n".format(msg['role'], msg['content'][:30]) + else: + msgs = msgs + "[{}]: {}\n\n".format(msg['role'], msg['content']) + reply = ["[bot]当前对话所有内容:\n{}".format(msgs)] + + return reply @command( @@ -69,11 +114,58 @@ def cmd_list(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """列出当前会话的所有历史记录""" - pass + reply = [] + + pkg.openai.session.get_session(session_name).persistence() + page = 0 + + if len(params) > 0: + try: + page = int(params[0]) + except ValueError: + pass + + results = pkg.openai.session.get_session(session_name).list_history(page=page) + if len(results) == 0: + reply = ["[bot]第{}页没有历史会话".format(page)] + else: + reply_str = "[bot]历史会话 第{}页:\n".format(page) + current = -1 + for i in range(len(results)): + # 时间(使用create_timestamp转换) 序号 部分内容 + datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp']) + msg = "" + try: + msg = json.loads(results[i]['prompt']) + except json.decoder.JSONDecodeError: + msg = pkg.openai.session.reset_session_prompt(session_name, results[i]['prompt']) + # 持久化 + pkg.openai.session.get_session(session_name).persistence() + if len(msg) >= 2: + reply_str += "#{} 创建:{} {}\n".format(i + page * 10, + datetime_obj.strftime("%Y-%m-%d %H:%M:%S"), + msg[0]['content']) + else: + reply_str += "#{} 创建:{} {}\n".format(i + page * 10, + datetime_obj.strftime("%Y-%m-%d %H:%M:%S"), + "无内容") + if results[i]['create_timestamp'] == pkg.openai.session.get_session( + session_name).create_timestamp: + current = i + page * 10 + + reply_str += "\n以上信息倒序排列" + if current != -1: + reply_str += ",当前会话是 #{}\n".format(current) + else: + reply_str += ",当前处于全新会话或不在此页" + + reply = [reply_str] + + return reply @command( - "resend" + "resend", "重新获取上一次问题的回复", "!resend", [], @@ -83,7 +175,17 @@ def cmd_resend(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """重新获取上一次问题的回复""" - pass + reply = [] + + session = pkg.openai.session.get_session(session_name) + to_send = session.undo() + + mgr = pkg.utils.context.get_qqbot_manager() + + reply = pkg.qqbot.message.process_normal_message(to_send, mgr, config, + launcher_type, launcher_id, sender_id) + + return reply @command( @@ -97,7 +199,22 @@ def cmd_del(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """删除当前会话的历史记录""" - pass + reply = [] + + if len(params) == 0: + reply = ["[bot]参数不足, 格式: !del <序号>\n可以通过!list查看序号"] + else: + if params[0] == 'all': + pkg.openai.session.get_session(session_name).delete_all_history() + reply = ["[bot]已删除所有历史会话"] + elif params[0].isdigit(): + if pkg.openai.session.get_session(session_name).delete_history(int(params[0])): + reply = ["[bot]已删除历史会话 #{}".format(params[0])] + else: + reply = ["[bot]没有历史会话 #{}".format(params[0])] + else: + reply = ["[bot]参数错误, 格式: !del <序号>\n可以通过!list查看序号"] + return reply @command( @@ -111,7 +228,30 @@ def cmd_default(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """操作情景预设""" - pass + reply = [] + + if len(params) == 0: + # 输出目前所有情景预设 + import pkg.openai.dprompt as dprompt + reply_str = "[bot]当前所有情景预设:\n\n" + for key,value in dprompt.get_prompt_dict().items(): + reply_str += " - {}: {}\n".format(key,value) + + reply_str += "\n当前默认情景预设:{}\n".format(dprompt.get_current()) + reply_str += "请使用!default <情景预设>来设置默认情景预设" + reply = [reply_str] + elif len(params) >0 and is_admin: + # 设置默认情景 + import pkg.openai.dprompt as dprompt + try: + dprompt.set_current(params[0]) + reply = ["[bot]已设置默认情景预设为:{}".format(dprompt.get_current())] + except KeyError: + reply = ["[bot]err: 未找到情景预设:{}".format(params[0])] + else: + reply = ["[bot]err: 仅管理员可设置默认情景预设"] + + return reply @command( @@ -125,4 +265,18 @@ def cmd_delhst(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """删除指定会话的所有历史记录""" - pass + reply = [] + + if len(params) == 0: + reply = ["[bot]err:请输入要删除的会话名: group_<群号> 或者 person_, 或使用 !delhst all 删除所有会话的历史记录"] + else: + if params[0] == "all": + pkg.utils.context.get_database_manager().delete_all_session_history() + reply = ["[bot]已删除所有会话的历史记录"] + else: + if pkg.utils.context.get_database_manager().delete_all_history(params[0]): + reply = ["[bot]已删除会话 {} 的所有历史记录".format(params[0])] + else: + reply = ["[bot]未找到会话 {} 的历史记录".format(params[0])] + + return reply diff --git a/pkg/qqbot/cmds/system.py b/pkg/qqbot/cmds/system.py index d4a42749..9b6e4379 100644 --- a/pkg/qqbot/cmds/system.py +++ b/pkg/qqbot/cmds/system.py @@ -1,4 +1,14 @@ from pkg.qqbot.cmds.model import command +import pkg.utils.context +import pkg.utils.updater +import pkg.utils.credit as credit +import config + +import logging +import os +import threading +import traceback +import json @command( "help", @@ -11,7 +21,7 @@ def cmd_help(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """获取帮助信息""" - pass + return ["[bot]" + config.help_message] @command( @@ -25,7 +35,28 @@ def cmd_usage(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """获取使用情况""" - pass + reply = [] + + reply_str = "[bot]各api-key使用情况:\n\n" + + api_keys = pkg.utils.context.get_openai_manager().key_mgr.api_key + for key_name in api_keys: + text_length = pkg.utils.context.get_openai_manager().audit_mgr \ + .get_text_length_of_key(api_keys[key_name]) + image_count = pkg.utils.context.get_openai_manager().audit_mgr \ + .get_image_count_of_key(api_keys[key_name]) + reply_str += "{}:\n - 文本长度:{}\n - 图片数量:{}\n".format(key_name, int(text_length), + int(image_count)) + # 获取此key的额度 + try: + http_proxy = config.openai_config["http_proxy"] if "http_proxy" in config.openai_config else None + credit_data = credit.fetch_credit_data(api_keys[key_name], http_proxy) + reply_str += " - 使用额度:{:.2f}/{:.2f}\n".format(credit_data['total_used'],credit_data['total_granted']) + except Exception as e: + logging.warning("获取额度失败:{}".format(e)) + + reply = [reply_str] + return reply @command( @@ -39,7 +70,94 @@ def cmd_version(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """查看版本信息""" - pass + reply = [] + + reply_str = "[bot]当前版本:\n{}\n".format(pkg.utils.updater.get_current_version_info()) + try: + if pkg.utils.updater.is_new_version_available(): + reply_str += "\n有新版本可用,请使用命令 !update 进行更新" + except: + pass + + reply = [reply_str] + + return reply + + +def plugin_operation(cmd, params, is_admin): + reply = [] + + import pkg.plugin.host as plugin_host + import pkg.utils.updater as updater + + plugin_list = plugin_host.__plugins__ + + if len(params) == 0: + reply_str = "[bot]所有插件({}):\n".format(len(plugin_host.__plugins__)) + idx = 0 + for key in plugin_host.iter_plugins_name(): + plugin = plugin_list[key] + reply_str += "\n#{} {} {}\n{}\nv{}\n作者: {}\n"\ + .format((idx+1), plugin['name'], + "[已禁用]" if not plugin['enabled'] else "", + plugin['description'], + plugin['version'], plugin['author']) + + if updater.is_repo("/".join(plugin['path'].split('/')[:-1])): + remote_url = updater.get_remote_url("/".join(plugin['path'].split('/')[:-1])) + if remote_url != "https://github.com/RockChinQ/QChatGPT" and remote_url != "https://gitee.com/RockChin/QChatGPT": + reply_str += "源码: "+remote_url+"\n" + + idx += 1 + + reply = [reply_str] + elif params[0] == 'update': + # 更新所有插件 + if is_admin: + def closure(): + import pkg.utils.context + updated = [] + for key in plugin_list: + plugin = plugin_list[key] + if updater.is_repo("/".join(plugin['path'].split('/')[:-1])): + success = updater.pull_latest("/".join(plugin['path'].split('/')[:-1])) + if success: + updated.append(plugin['name']) + + # 检查是否有requirements.txt + pkg.utils.context.get_qqbot_manager().notify_admin("正在安装依赖...") + for key in plugin_list: + plugin = plugin_list[key] + if os.path.exists("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt"): + logging.info("{}检测到requirements.txt,安装依赖".format(plugin['name'])) + import pkg.utils.pkgmgr + pkg.utils.pkgmgr.install_requirements("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt") + + import main + main.reset_logging() + + pkg.utils.context.get_qqbot_manager().notify_admin("已更新插件: {}".format(", ".join(updated))) + + threading.Thread(target=closure).start() + reply = ["[bot]正在更新所有插件,请勿重复发起..."] + else: + reply = ["[bot]err:权限不足"] + elif params[0].startswith("http"): + if is_admin: + + def closure(): + try: + plugin_host.install_plugin(params[0]) + pkg.utils.context.get_qqbot_manager().notify_admin("插件安装成功,请发送 !reload 指令重载插件") + except Exception as e: + logging.error("插件安装失败:{}".format(e)) + pkg.utils.context.get_qqbot_manager().notify_admin("插件安装失败:{}".format(e)) + + threading.Thread(target=closure, args=()).start() + reply = ["[bot]正在安装插件..."] + else: + reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"] + return reply @command( @@ -53,7 +171,8 @@ def cmd_plugin(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """插件相关操作""" - pass + reply = plugin_operation(cmd, params, is_admin) + return reply @command( @@ -67,7 +186,11 @@ def cmd_reload(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """执行热重载""" - pass + import pkg.utils.reloader + def reload_task(): + pkg.utils.reloader.reload_all() + + threading.Thread(target=reload_task, daemon=True).start() @command( @@ -81,7 +204,92 @@ def cmd_update(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """更新程序""" - pass + reply = [] + import pkg.utils.updater + import pkg.utils.reloader + import pkg.utils.context + + def update_task(): + try: + if pkg.utils.updater.update_all(): + pkg.utils.reloader.reload_all(notify=False) + pkg.utils.context.get_qqbot_manager().notify_admin("更新完成") + else: + pkg.utils.context.get_qqbot_manager().notify_admin("无新版本") + except Exception as e0: + traceback.print_exc() + pkg.utils.context.get_qqbot_manager().notify_admin("更新失败:{}".format(e0)) + return + + threading.Thread(target=update_task, daemon=True).start() + + reply = ["[bot]正在更新,请耐心等待,请勿重复发起更新..."] + + +def config_operation(cmd, params): + reply = [] + config = pkg.utils.context.get_config() + reply_str = "" + if len(params) == 0: + reply = ["[bot]err:请输入配置项"] + else: + cfg_name = params[0] + if cfg_name == 'all': + reply_str = "[bot]所有配置项:\n\n" + for cfg in dir(config): + if not cfg.startswith('__') and not cfg == 'logging': + # 根据配置项类型进行格式化,如果是字典则转换为json并格式化 + if isinstance(getattr(config, cfg), str): + reply_str += "{}: \"{}\"\n".format(cfg, getattr(config, cfg)) + elif isinstance(getattr(config, cfg), dict): + # 不进行unicode转义,并格式化 + reply_str += "{}: {}\n".format(cfg, + json.dumps(getattr(config, cfg), + ensure_ascii=False, indent=4)) + else: + reply_str += "{}: {}\n".format(cfg, getattr(config, cfg)) + reply = [reply_str] + elif cfg_name in dir(config): + if len(params) == 1: + # 按照配置项类型进行格式化 + if isinstance(getattr(config, cfg_name), str): + reply_str = "[bot]配置项{}: \"{}\"\n".format(cfg_name, getattr(config, cfg_name)) + elif isinstance(getattr(config, cfg_name), dict): + reply_str = "[bot]配置项{}: {}\n".format(cfg_name, + json.dumps(getattr(config, cfg_name), + ensure_ascii=False, indent=4)) + else: + reply_str = "[bot]配置项{}: {}\n".format(cfg_name, getattr(config, cfg_name)) + reply = [reply_str] + else: + cfg_value = " ".join(params[1:]) + # 类型转换,如果是json则转换为字典 + if cfg_value == 'true': + cfg_value = True + elif cfg_value == 'false': + cfg_value = False + elif cfg_value.isdigit(): + cfg_value = int(cfg_value) + elif cfg_value.startswith('{') and cfg_value.endswith('}'): + cfg_value = json.loads(cfg_value) + else: + try: + cfg_value = float(cfg_value) + except ValueError: + pass + + # 检查类型是否匹配 + if isinstance(getattr(config, cfg_name), type(cfg_value)): + setattr(config, cfg_name, cfg_value) + pkg.utils.context.set_config(config) + reply = ["[bot]配置项{}修改成功".format(cfg_name)] + else: + reply = ["[bot]err:配置项{}类型不匹配".format(cfg_name)] + + else: + reply = ["[bot]err:未找到配置项 {}".format(cfg_name)] + + return reply @command( @@ -95,4 +303,5 @@ def cmd_cfg(cmd: str, params: list, session_name: str, text_message: str, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: """配置文件相关操作""" - pass + reply = config_operation(cmd, params) + return reply diff --git a/pkg/qqbot/command.py b/pkg/qqbot/command.py index b6d7651f..dddc43e1 100644 --- a/pkg/qqbot/command.py +++ b/pkg/qqbot/command.py @@ -13,151 +13,11 @@ import pkg.utils.updater import pkg.utils.context import pkg.qqbot.message import pkg.utils.credit as credit +import pkg.qqbot.cmds.model as cmdmodel from mirai import Image -def config_operation(cmd, params): - reply = [] - config = pkg.utils.context.get_config() - reply_str = "" - if len(params) == 0: - reply = ["[bot]err:请输入配置项"] - else: - cfg_name = params[0] - if cfg_name == 'all': - reply_str = "[bot]所有配置项:\n\n" - for cfg in dir(config): - if not cfg.startswith('__') and not cfg == 'logging': - # 根据配置项类型进行格式化,如果是字典则转换为json并格式化 - if isinstance(getattr(config, cfg), str): - reply_str += "{}: \"{}\"\n".format(cfg, getattr(config, cfg)) - elif isinstance(getattr(config, cfg), dict): - # 不进行unicode转义,并格式化 - reply_str += "{}: {}\n".format(cfg, - json.dumps(getattr(config, cfg), - ensure_ascii=False, indent=4)) - else: - reply_str += "{}: {}\n".format(cfg, getattr(config, cfg)) - reply = [reply_str] - elif cfg_name in dir(config): - if len(params) == 1: - # 按照配置项类型进行格式化 - if isinstance(getattr(config, cfg_name), str): - reply_str = "[bot]配置项{}: \"{}\"\n".format(cfg_name, getattr(config, cfg_name)) - elif isinstance(getattr(config, cfg_name), dict): - reply_str = "[bot]配置项{}: {}\n".format(cfg_name, - json.dumps(getattr(config, cfg_name), - ensure_ascii=False, indent=4)) - else: - reply_str = "[bot]配置项{}: {}\n".format(cfg_name, getattr(config, cfg_name)) - reply = [reply_str] - else: - cfg_value = " ".join(params[1:]) - # 类型转换,如果是json则转换为字典 - if cfg_value == 'true': - cfg_value = True - elif cfg_value == 'false': - cfg_value = False - elif cfg_value.isdigit(): - cfg_value = int(cfg_value) - elif cfg_value.startswith('{') and cfg_value.endswith('}'): - cfg_value = json.loads(cfg_value) - else: - try: - cfg_value = float(cfg_value) - except ValueError: - pass - - # 检查类型是否匹配 - if isinstance(getattr(config, cfg_name), type(cfg_value)): - setattr(config, cfg_name, cfg_value) - pkg.utils.context.set_config(config) - reply = ["[bot]配置项{}修改成功".format(cfg_name)] - else: - reply = ["[bot]err:配置项{}类型不匹配".format(cfg_name)] - - else: - reply = ["[bot]err:未找到配置项 {}".format(cfg_name)] - - return reply - - -def plugin_operation(cmd, params, is_admin): - reply = [] - - import pkg.plugin.host as plugin_host - import pkg.utils.updater as updater - - plugin_list = plugin_host.__plugins__ - - if len(params) == 0: - reply_str = "[bot]所有插件({}):\n".format(len(plugin_host.__plugins__)) - idx = 0 - for key in plugin_host.iter_plugins_name(): - plugin = plugin_list[key] - reply_str += "\n#{} {} {}\n{}\nv{}\n作者: {}\n"\ - .format((idx+1), plugin['name'], - "[已禁用]" if not plugin['enabled'] else "", - plugin['description'], - plugin['version'], plugin['author']) - - if updater.is_repo("/".join(plugin['path'].split('/')[:-1])): - remote_url = updater.get_remote_url("/".join(plugin['path'].split('/')[:-1])) - if remote_url != "https://github.com/RockChinQ/QChatGPT" and remote_url != "https://gitee.com/RockChin/QChatGPT": - reply_str += "源码: "+remote_url+"\n" - - idx += 1 - - reply = [reply_str] - elif params[0] == 'update': - # 更新所有插件 - if is_admin: - def closure(): - import pkg.utils.context - updated = [] - for key in plugin_list: - plugin = plugin_list[key] - if updater.is_repo("/".join(plugin['path'].split('/')[:-1])): - success = updater.pull_latest("/".join(plugin['path'].split('/')[:-1])) - if success: - updated.append(plugin['name']) - - # 检查是否有requirements.txt - pkg.utils.context.get_qqbot_manager().notify_admin("正在安装依赖...") - for key in plugin_list: - plugin = plugin_list[key] - if os.path.exists("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt"): - logging.info("{}检测到requirements.txt,安装依赖".format(plugin['name'])) - import pkg.utils.pkgmgr - pkg.utils.pkgmgr.install_requirements("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt") - - import main - main.reset_logging() - - pkg.utils.context.get_qqbot_manager().notify_admin("已更新插件: {}".format(", ".join(updated))) - - threading.Thread(target=closure).start() - reply = ["[bot]正在更新所有插件,请勿重复发起..."] - else: - reply = ["[bot]err:权限不足"] - elif params[0].startswith("http"): - if is_admin: - - def closure(): - try: - plugin_host.install_plugin(params[0]) - pkg.utils.context.get_qqbot_manager().notify_admin("插件安装成功,请发送 !reload 指令重载插件") - except Exception as e: - logging.error("插件安装失败:{}".format(e)) - pkg.utils.context.get_qqbot_manager().notify_admin("插件安装失败:{}".format(e)) - - threading.Thread(target=closure, args=()).start() - reply = ["[bot]正在安装插件..."] - else: - reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"] - return reply - def process_command(session_name: str, text_message: str, mgr, config, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: @@ -170,216 +30,30 @@ def process_command(session_name: str, text_message: str, mgr, config, cmd = text_message[1:].strip().split(' ')[0] params = text_message[1:].strip().split(' ')[1:] - if cmd == 'help': - reply = ["[bot]" + config.help_message] - elif cmd == 'reset': - if len(params) == 0: - pkg.openai.session.get_session(session_name).reset(explicit=True) - reply = ["[bot]会话已重置"] - else: - pkg.openai.session.get_session(session_name).reset(explicit=True, use_prompt=params[0]) - reply = ["[bot]会话已重置,使用场景预设:{}".format(params[0])] - elif cmd == 'last': - result = pkg.openai.session.get_session(session_name).last_session() - if result is None: - reply = ["[bot]没有前一次的对话"] - else: - datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( - '%Y-%m-%d %H:%M:%S') - reply = ["[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str)] - elif cmd == 'next': - result = pkg.openai.session.get_session(session_name).next_session() - if result is None: - reply = ["[bot]没有后一次的对话"] - else: - datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( - '%Y-%m-%d %H:%M:%S') - reply = ["[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str)] - elif cmd == 'prompt': - msgs = "" - session:list = pkg.openai.session.get_session(session_name).prompt - for msg in session: - if len(params) != 0 and params[0] in ['-all', '-a']: - msgs = msgs + "{}: {}\n\n".format(msg['role'], msg['content']) - elif len(msg['content']) > 30: - msgs = msgs + "[{}]: {}...\n\n".format(msg['role'], msg['content'][:30]) - else: - msgs = msgs + "[{}]: {}\n\n".format(msg['role'], msg['content']) - reply = ["[bot]当前对话所有内容:\n{}".format(msgs)] - elif cmd == 'list': - pkg.openai.session.get_session(session_name).persistence() - page = 0 - if len(params) > 0: - try: - page = int(params[0]) - except ValueError: - pass + # 把!~开头的转换成!cfg + if cmd.startswith('~'): + params = [cmd[1:]] + params + cmd = 'cfg' - results = pkg.openai.session.get_session(session_name).list_history(page=page) - if len(results) == 0: - reply = ["[bot]第{}页没有历史会话".format(page)] - else: - reply_str = "[bot]历史会话 第{}页:\n".format(page) - current = -1 - for i in range(len(results)): - # 时间(使用create_timestamp转换) 序号 部分内容 - datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp']) - msg = "" - try: - msg = json.loads(results[i]['prompt']) - except json.decoder.JSONDecodeError: - msg = pkg.openai.session.reset_session_prompt(session_name, results[i]['prompt']) - # 持久化 - pkg.openai.session.get_session(session_name).persistence() - if len(msg) >= 2: - reply_str += "#{} 创建:{} {}\n".format(i + page * 10, - datetime_obj.strftime("%Y-%m-%d %H:%M:%S"), - msg[0]['content']) - else: - reply_str += "#{} 创建:{} {}\n".format(i + page * 10, - datetime_obj.strftime("%Y-%m-%d %H:%M:%S"), - "无内容") - if results[i]['create_timestamp'] == pkg.openai.session.get_session( - session_name).create_timestamp: - current = i + page * 10 - - reply_str += "\n以上信息倒序排列" - if current != -1: - reply_str += ",当前会话是 #{}\n".format(current) - else: - reply_str += ",当前处于全新会话或不在此页" - - reply = [reply_str] - elif cmd == 'resend': - session = pkg.openai.session.get_session(session_name) - to_send = session.undo() - - reply = pkg.qqbot.message.process_normal_message(to_send, mgr, config, - launcher_type, launcher_id, sender_id) - elif cmd == 'del': # 删除指定会话历史记录 - if len(params) == 0: - reply = ["[bot]参数不足, 格式: !del <序号>\n可以通过!list查看序号"] - else: - if params[0] == 'all': - pkg.openai.session.get_session(session_name).delete_all_history() - reply = ["[bot]已删除所有历史会话"] - elif params[0].isdigit(): - if pkg.openai.session.get_session(session_name).delete_history(int(params[0])): - reply = ["[bot]已删除历史会话 #{}".format(params[0])] - else: - reply = ["[bot]没有历史会话 #{}".format(params[0])] - else: - reply = ["[bot]参数错误, 格式: !del <序号>\n可以通过!list查看序号"] - elif cmd == 'usage': - reply_str = "[bot]各api-key使用情况:\n\n" - - api_keys = pkg.utils.context.get_openai_manager().key_mgr.api_key - for key_name in api_keys: - text_length = pkg.utils.context.get_openai_manager().audit_mgr \ - .get_text_length_of_key(api_keys[key_name]) - image_count = pkg.utils.context.get_openai_manager().audit_mgr \ - .get_image_count_of_key(api_keys[key_name]) - reply_str += "{}:\n - 文本长度:{}\n - 图片数量:{}\n".format(key_name, int(text_length), - int(image_count)) - # 获取此key的额度 - try: - http_proxy = config.openai_config["http_proxy"] if "http_proxy" in config.openai_config else None - credit_data = credit.fetch_credit_data(api_keys[key_name], http_proxy) - reply_str += " - 使用额度:{:.2f}/{:.2f}\n".format(credit_data['total_used'],credit_data['total_granted']) - except Exception as e: - logging.warning("获取额度失败:{}".format(e)) - - reply = [reply_str] - elif cmd == 'draw': - if len(params) == 0: - reply = ["[bot]err:请输入图片描述文字"] - else: - session = pkg.openai.session.get_session(session_name) - - res = session.draw_image(" ".join(params)) - - logging.debug("draw_image result:{}".format(res)) - reply = [Image(url=res['data'][0]['url'])] - if not (hasattr(config, 'include_image_description') - and not config.include_image_description): - reply.append(" ".join(params)) - elif cmd == 'version': - reply_str = "[bot]当前版本:\n{}\n".format(pkg.utils.updater.get_current_version_info()) - try: - if pkg.utils.updater.is_new_version_available(): - reply_str += "\n有新版本可用,请使用命令 !update 进行更新" - except: - pass - - reply = [reply_str] - - elif cmd == 'plugin': - reply = plugin_operation(cmd, params, is_admin) - - elif cmd == 'default': - if len(params) == 0: - # 输出目前所有情景预设 - import pkg.openai.dprompt as dprompt - reply_str = "[bot]当前所有情景预设:\n\n" - for key,value in dprompt.get_prompt_dict().items(): - reply_str += " - {}: {}\n".format(key,value) - - reply_str += "\n当前默认情景预设:{}\n".format(dprompt.get_current()) - reply_str += "请使用!default <情景预设>来设置默认情景预设" - reply = [reply_str] - elif len(params) >0 and is_admin: - # 设置默认情景 - import pkg.openai.dprompt as dprompt - try: - dprompt.set_current(params[0]) - reply = ["[bot]已设置默认情景预设为:{}".format(dprompt.get_current())] - except KeyError: - reply = ["[bot]err: 未找到情景预设:{}".format(params[0])] - else: - reply = ["[bot]err: 仅管理员可设置默认情景预设"] - elif cmd == "delhst" and is_admin: - if len(params) == 0: - reply = ["[bot]err:请输入要删除的会话名: group_<群号> 或者 person_, 或使用 !delhst all 删除所有会话的历史记录"] - else: - if params[0] == "all": - pkg.utils.context.get_database_manager().delete_all_session_history() - reply = ["[bot]已删除所有会话的历史记录"] - else: - if pkg.utils.context.get_database_manager().delete_all_history(params[0]): - reply = ["[bot]已删除会话 {} 的所有历史记录".format(params[0])] - else: - reply = ["[bot]未找到会话 {} 的历史记录".format(params[0])] - elif cmd == 'reload' and is_admin: - def reload_task(): - pkg.utils.reloader.reload_all() - - threading.Thread(target=reload_task, daemon=True).start() - elif cmd == 'update' and is_admin: - def update_task(): - try: - if pkg.utils.updater.update_all(): - pkg.utils.reloader.reload_all(notify=False) - pkg.utils.context.get_qqbot_manager().notify_admin("更新完成") - else: - pkg.utils.context.get_qqbot_manager().notify_admin("无新版本") - except Exception as e0: - traceback.print_exc() - pkg.utils.context.get_qqbot_manager().notify_admin("更新失败:{}".format(e0)) - return - - threading.Thread(target=update_task, daemon=True).start() - - reply = ["[bot]正在更新,请耐心等待,请勿重复发起更新..."] - elif cmd == 'cfg' and is_admin: - reply = config_operation(cmd, params) + # 选择指令处理函数 + cmd_obj = cmdmodel.search(cmd) + if cmd_obj is not None and (cmd_obj['admin_only'] is False or is_admin): + cmd_func = cmd_obj['func'] + reply = cmd_func( + cmd=cmd, + params=params, + session_name=session_name, + text_message=text_message, + launcher_type=launcher_type, + launcher_id=launcher_id, + sender_id=sender_id, + is_admin=is_admin, + ) else: - if cmd.startswith("~") and is_admin: - config_item = cmd[1:] - params = [config_item] + params - reply = config_operation("cfg", params) - else: - reply = ["[bot]err:未知的指令或权限不足: " + cmd] + reply = ["[bot]err:未知的指令或权限不足: " + cmd] + + return reply except Exception as e: mgr.notify_admin("{}指令执行失败:{}".format(session_name, e)) logging.exception(e)