diff --git a/QChatGPT.wiki b/QChatGPT.wiki index 0379ac2e..68c4ef5d 160000 --- a/QChatGPT.wiki +++ b/QChatGPT.wiki @@ -1 +1 @@ -Subproject commit 0379ac2e14395bd66a0e3c4e45413cf0261d4725 +Subproject commit 68c4ef5d240877a871044e0b340db183453799bf diff --git a/README.md b/README.md index 51951f28..50fd2ca1 100644 --- a/README.md +++ b/README.md @@ -218,6 +218,9 @@ python3 main.py 详见[Wiki插件使用页](https://github.com/RockChinQ/QChatGPT/wiki/%E6%8F%92%E4%BB%B6%E4%BD%BF%E7%94%A8) 开发教程见[Wiki插件开发页](https://github.com/RockChinQ/QChatGPT/wiki/%E6%8F%92%E4%BB%B6%E5%BC%80%E5%8F%91) +
+查看插件列表 + ### 示例插件 在`tests/plugin_examples`目录下,将其整个目录复制到`plugins`目录下即可使用 @@ -237,6 +240,7 @@ python3 main.py - [oliverkirk-sudo/chat_voice](https://github.com/oliverkirk-sudo/chat_voice) - 文字转语音输出,使用HuggingFace上的[VITS-Umamusume-voice-synthesizer模型](https://huggingface.co/spaces/Plachta/VITS-Umamusume-voice-synthesizer) - [RockChinQ/WaitYiYan](https://github.com/RockChinQ/WaitYiYan) - 实时获取百度`文心一言`等待列表人数 - [QChartGPT_Emoticon_Plugin](https://github.com/chordfish-k/QChartGPT_Emoticon_Plugin) - 使机器人根据回复内容发送表情包 +
## 😘致谢 @@ -248,6 +252,6 @@ python3 main.py 以及所有[贡献者](https://github.com/RockChinQ/QChatGPT/graphs/contributors)和其他为本项目提供支持的朋友们。 -## 👍赞赏 + diff --git a/pkg/plugin/host.py b/pkg/plugin/host.py index a8163f16..ae3aee63 100644 --- a/pkg/plugin/host.py +++ b/pkg/plugin/host.py @@ -5,6 +5,7 @@ import importlib import os import pkgutil import sys +import shutil import traceback import pkg.utils.context as context @@ -160,6 +161,22 @@ def install_plugin(repo_url: str): main.reset_logging() +def uninstall_plugin(plugin_name: str) -> str: + """ 卸载插件 """ + if plugin_name not in __plugins__: + raise Exception("插件不存在") + + # 获取文件夹路径 + plugin_path = __plugins__[plugin_name]['path'].replace("\\", "/") + + # 剪切路径为plugins/插件名 + plugin_path = plugin_path.split("plugins/")[1].split("/")[0] + + # 删除文件夹 + shutil.rmtree("plugins/"+plugin_path) + return "plugins/"+plugin_path + + class EventContext: """ 事件上下文 """ eid = 0 diff --git a/pkg/qqbot/cmds/__init__.py b/pkg/qqbot/cmds/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkg/qqbot/cmds/func.py b/pkg/qqbot/cmds/func.py new file mode 100644 index 00000000..9ee73cfd --- /dev/null +++ b/pkg/qqbot/cmds/func.py @@ -0,0 +1,36 @@ +from pkg.qqbot.cmds.model import command + +import logging + +from mirai import Image + +import config +import pkg.openai.session + +@command( + "draw", + "使用DALL·E模型作画", + "!draw <图片提示语>", + [], + False +) +def cmd_draw(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """使用DALL·E模型作画""" + reply = [] + + if len(params) == 0: + reply = ["[bot]err:请输入图片描述文字"] + else: + session = pkg.openai.session.get_session(session_name) + + res = session.draw_image(" ".join(params)) + + logging.debug("draw_image result:{}".format(res)) + reply = [Image(url=res['data'][0]['url'])] + if not (hasattr(config, 'include_image_description') + and not config.include_image_description): + reply.append(" ".join(params)) + + return reply \ No newline at end of file diff --git a/pkg/qqbot/cmds/model.py b/pkg/qqbot/cmds/model.py new file mode 100644 index 00000000..b8bb2743 --- /dev/null +++ b/pkg/qqbot/cmds/model.py @@ -0,0 +1,45 @@ +# 指令模型 +import logging + +commands = [] +"""已注册的指令类 +{ + "name": "指令名", + "description": "指令描述", + "usage": "指令用法", + "aliases": ["别名1", "别名2"], + "admin_only": "是否仅管理员可用", + "func": "指令执行函数" +} +""" + + +def command(name: str, description: str, usage: str, aliases: list = None, admin_only: bool = False): + """指令装饰器""" + + def wrapper(fun): + commands.append({ + "name": name, + "description": description, + "usage": usage, + "aliases": aliases, + "admin_only": admin_only, + "func": fun + }) + return fun + + return wrapper + + +def search(cmd: str) -> dict: + """查找指令""" + for command in commands: + if (command["name"] == cmd) or (cmd in command["aliases"]): + return command + return None + + +import pkg.qqbot.cmds.func +import pkg.qqbot.cmds.system +import pkg.qqbot.cmds.session +import pkg.qqbot.cmds.plugin diff --git a/pkg/qqbot/cmds/plugin.py b/pkg/qqbot/cmds/plugin.py new file mode 100644 index 00000000..0e400400 --- /dev/null +++ b/pkg/qqbot/cmds/plugin.py @@ -0,0 +1,129 @@ +from pkg.qqbot.cmds.model import command +import pkg.utils.context +import pkg.plugin.switch as plugin_switch + +import os +import threading +import logging + + +def plugin_operation(cmd, params, is_admin): + reply = [] + + import pkg.plugin.host as plugin_host + import pkg.utils.updater as updater + + plugin_list = plugin_host.__plugins__ + + if len(params) == 0: + reply_str = "[bot]所有插件({}):\n".format(len(plugin_host.__plugins__)) + idx = 0 + for key in plugin_host.iter_plugins_name(): + plugin = plugin_list[key] + reply_str += "\n#{} {} {}\n{}\nv{}\n作者: {}\n"\ + .format((idx+1), plugin['name'], + "[已禁用]" if not plugin['enabled'] else "", + plugin['description'], + plugin['version'], plugin['author']) + + if updater.is_repo("/".join(plugin['path'].split('/')[:-1])): + remote_url = updater.get_remote_url("/".join(plugin['path'].split('/')[:-1])) + if remote_url != "https://github.com/RockChinQ/QChatGPT" and remote_url != "https://gitee.com/RockChin/QChatGPT": + reply_str += "源码: "+remote_url+"\n" + + idx += 1 + + reply = [reply_str] + elif params[0] == 'update': + # 更新所有插件 + if is_admin: + def closure(): + import pkg.utils.context + updated = [] + for key in plugin_list: + plugin = plugin_list[key] + if updater.is_repo("/".join(plugin['path'].split('/')[:-1])): + success = updater.pull_latest("/".join(plugin['path'].split('/')[:-1])) + if success: + updated.append(plugin['name']) + + # 检查是否有requirements.txt + pkg.utils.context.get_qqbot_manager().notify_admin("正在安装依赖...") + for key in plugin_list: + plugin = plugin_list[key] + if os.path.exists("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt"): + logging.info("{}检测到requirements.txt,安装依赖".format(plugin['name'])) + import pkg.utils.pkgmgr + pkg.utils.pkgmgr.install_requirements("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt") + + import main + main.reset_logging() + + pkg.utils.context.get_qqbot_manager().notify_admin("已更新插件: {}".format(", ".join(updated))) + + threading.Thread(target=closure).start() + reply = ["[bot]正在更新所有插件,请勿重复发起..."] + else: + reply = ["[bot]err:权限不足"] + elif params[0] == 'del' or params[0] == 'delete': + if is_admin: + if len(params) < 2: + reply = ["[bot]err:未指定插件名"] + else: + plugin_name = params[1] + if plugin_name in plugin_list: + unin_path = plugin_host.uninstall_plugin(plugin_name) + reply = ["[bot]已删除插件: {} ({}), 请发送 !reload 重载插件".format(plugin_name, unin_path)] + else: + reply = ["[bot]err:未找到插件: {}, 请使用!plugin指令查看插件列表".format(plugin_name)] + else: + reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"] + elif params[0] == 'on' or params[0] == 'off' : + new_status = params[0] == 'on' + if is_admin: + if len(params) < 2: + reply = ["[bot]err:未指定插件名"] + else: + plugin_name = params[1] + if plugin_name in plugin_list: + plugin_list[plugin_name]['enabled'] = new_status + plugin_switch.dump_switch() + reply = ["[bot]已{}插件: {}".format("启用" if new_status else "禁用", plugin_name)] + else: + reply = ["[bot]err:未找到插件: {}, 请使用!plugin指令查看插件列表".format(plugin_name)] + else: + reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"] + elif params[0].startswith("http"): + if is_admin: + + def closure(): + try: + plugin_host.install_plugin(params[0]) + pkg.utils.context.get_qqbot_manager().notify_admin("插件安装成功,请发送 !reload 指令重载插件") + except Exception as e: + logging.error("插件安装失败:{}".format(e)) + pkg.utils.context.get_qqbot_manager().notify_admin("插件安装失败:{}".format(e)) + + threading.Thread(target=closure, args=()).start() + reply = ["[bot]正在安装插件..."] + else: + reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"] + else: + reply = ["[bot]err:未知参数: {}".format(params)] + + return reply + + +@command( + "plugin", + "插件相关操作", + "!plugin\n!plugin <插件仓库地址>\!plugin update\n!plugin del <插件名>\n!plugin on <插件名>\n!plugin off <插件名>", + [], + False +) +def cmd_plugin(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """插件相关操作""" + reply = plugin_operation(cmd, params, is_admin) + return reply \ No newline at end of file diff --git a/pkg/qqbot/cmds/session.py b/pkg/qqbot/cmds/session.py new file mode 100644 index 00000000..8693dfd7 --- /dev/null +++ b/pkg/qqbot/cmds/session.py @@ -0,0 +1,282 @@ +# 会话管理相关指令 +import datetime +import json + +from pkg.qqbot.cmds.model import command +import pkg.openai.session +import pkg.utils.context +import config + +@command( + "reset", + "重置当前会话", + "!reset\n!reset [使用情景预设名称]", + [], + False +) +def cmd_reset(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """重置会话""" + reply = [] + + if len(params) == 0: + pkg.openai.session.get_session(session_name).reset(explicit=True) + reply = ["[bot]会话已重置"] + else: + pkg.openai.session.get_session(session_name).reset(explicit=True, use_prompt=params[0]) + reply = ["[bot]会话已重置,使用场景预设:{}".format(params[0])] + + return reply + + +@command( + "last", + "切换到前一次会话", + "!last", + [], + False +) +def cmd_last(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """切换到前一次会话""" + reply = [] + result = pkg.openai.session.get_session(session_name).last_session() + if result is None: + reply = ["[bot]没有前一次的对话"] + else: + datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( + '%Y-%m-%d %H:%M:%S') + reply = ["[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str)] + + return reply + +@command( + "next", + "切换到后一次会话", + "!next", + [], + False +) +def cmd_next(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: int, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """切换到后一次会话""" + reply = [] + + result = pkg.openai.session.get_session(session_name).next_session() + if result is None: + reply = ["[bot]没有后一次的对话"] + else: + datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( + '%Y-%m-%d %H:%M:%S') + reply = ["[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str)] + + return reply + + +@command( + "prompt", + "获取当前会话的前文", + "!prompt", + [], + False +) +def cmd_prompt(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """获取当前会话的前文""" + reply = [] + + msgs = "" + session:list = pkg.openai.session.get_session(session_name).prompt + for msg in session: + if len(params) != 0 and params[0] in ['-all', '-a']: + msgs = msgs + "{}: {}\n\n".format(msg['role'], msg['content']) + elif len(msg['content']) > 30: + msgs = msgs + "[{}]: {}...\n\n".format(msg['role'], msg['content'][:30]) + else: + msgs = msgs + "[{}]: {}\n\n".format(msg['role'], msg['content']) + reply = ["[bot]当前对话所有内容:\n{}".format(msgs)] + + return reply + + +@command( + "list", + "列出当前会话的所有历史记录", + "!list\n!list [页数]", + [], + False +) +def cmd_list(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """列出当前会话的所有历史记录""" + reply = [] + + pkg.openai.session.get_session(session_name).persistence() + page = 0 + + if len(params) > 0: + try: + page = int(params[0]) + except ValueError: + pass + + results = pkg.openai.session.get_session(session_name).list_history(page=page) + if len(results) == 0: + reply = ["[bot]第{}页没有历史会话".format(page)] + else: + reply_str = "[bot]历史会话 第{}页:\n".format(page) + current = -1 + for i in range(len(results)): + # 时间(使用create_timestamp转换) 序号 部分内容 + datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp']) + msg = "" + try: + msg = json.loads(results[i]['prompt']) + except json.decoder.JSONDecodeError: + msg = pkg.openai.session.reset_session_prompt(session_name, results[i]['prompt']) + # 持久化 + pkg.openai.session.get_session(session_name).persistence() + if len(msg) >= 2: + reply_str += "#{} 创建:{} {}\n".format(i + page * 10, + datetime_obj.strftime("%Y-%m-%d %H:%M:%S"), + msg[0]['content']) + else: + reply_str += "#{} 创建:{} {}\n".format(i + page * 10, + datetime_obj.strftime("%Y-%m-%d %H:%M:%S"), + "无内容") + if results[i]['create_timestamp'] == pkg.openai.session.get_session( + session_name).create_timestamp: + current = i + page * 10 + + reply_str += "\n以上信息倒序排列" + if current != -1: + reply_str += ",当前会话是 #{}\n".format(current) + else: + reply_str += ",当前处于全新会话或不在此页" + + reply = [reply_str] + + return reply + + +@command( + "resend", + "重新获取上一次问题的回复", + "!resend", + [], + False +) +def cmd_resend(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """重新获取上一次问题的回复""" + reply = [] + + session = pkg.openai.session.get_session(session_name) + to_send = session.undo() + + mgr = pkg.utils.context.get_qqbot_manager() + + reply = pkg.qqbot.message.process_normal_message(to_send, mgr, config, + launcher_type, launcher_id, sender_id) + + return reply + + +@command( + "del", + "删除当前会话的历史记录", + "!del <序号>\n!del all", + [], + False +) +def cmd_del(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """删除当前会话的历史记录""" + reply = [] + + if len(params) == 0: + reply = ["[bot]参数不足, 格式: !del <序号>\n可以通过!list查看序号"] + else: + if params[0] == 'all': + pkg.openai.session.get_session(session_name).delete_all_history() + reply = ["[bot]已删除所有历史会话"] + elif params[0].isdigit(): + if pkg.openai.session.get_session(session_name).delete_history(int(params[0])): + reply = ["[bot]已删除历史会话 #{}".format(params[0])] + else: + reply = ["[bot]没有历史会话 #{}".format(params[0])] + else: + reply = ["[bot]参数错误, 格式: !del <序号>\n可以通过!list查看序号"] + return reply + + +@command( + "default", + "操作情景预设", + "!default\n!default [指定情景预设为默认]", + [], + False +) +def cmd_default(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """操作情景预设""" + reply = [] + + if len(params) == 0: + # 输出目前所有情景预设 + import pkg.openai.dprompt as dprompt + reply_str = "[bot]当前所有情景预设:\n\n" + for key,value in dprompt.get_prompt_dict().items(): + reply_str += " - {}: {}\n".format(key,value) + + reply_str += "\n当前默认情景预设:{}\n".format(dprompt.get_current()) + reply_str += "请使用!default <情景预设>来设置默认情景预设" + reply = [reply_str] + elif len(params) >0 and is_admin: + # 设置默认情景 + import pkg.openai.dprompt as dprompt + try: + dprompt.set_current(params[0]) + reply = ["[bot]已设置默认情景预设为:{}".format(dprompt.get_current())] + except KeyError: + reply = ["[bot]err: 未找到情景预设:{}".format(params[0])] + else: + reply = ["[bot]err: 仅管理员可设置默认情景预设"] + + return reply + + +@command( + "delhst", + "删除指定会话的所有历史记录", + "!delhst <会话名称>\n!delhst all", + [], + True +) +def cmd_delhst(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """删除指定会话的所有历史记录""" + reply = [] + + if len(params) == 0: + reply = ["[bot]err:请输入要删除的会话名: group_<群号> 或者 person_, 或使用 !delhst all 删除所有会话的历史记录"] + else: + if params[0] == "all": + pkg.utils.context.get_database_manager().delete_all_session_history() + reply = ["[bot]已删除所有会话的历史记录"] + else: + if pkg.utils.context.get_database_manager().delete_all_history(params[0]): + reply = ["[bot]已删除会话 {} 的所有历史记录".format(params[0])] + else: + reply = ["[bot]未找到会话 {} 的历史记录".format(params[0])] + + return reply diff --git a/pkg/qqbot/cmds/system.py b/pkg/qqbot/cmds/system.py new file mode 100644 index 00000000..1d6c6717 --- /dev/null +++ b/pkg/qqbot/cmds/system.py @@ -0,0 +1,216 @@ +from pkg.qqbot.cmds.model import command +import pkg.utils.context +import pkg.utils.updater +import pkg.utils.credit as credit +import config + +import logging +import os +import threading +import traceback +import json + +@command( + "help", + "获取帮助信息", + "!help", + [], + False +) +def cmd_help(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """获取帮助信息""" + return ["[bot]" + config.help_message] + + +@command( + "usage", + "获取使用情况", + "!usage", + [], + False +) +def cmd_usage(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """获取使用情况""" + reply = [] + + reply_str = "[bot]各api-key使用情况:\n\n" + + api_keys = pkg.utils.context.get_openai_manager().key_mgr.api_key + for key_name in api_keys: + text_length = pkg.utils.context.get_openai_manager().audit_mgr \ + .get_text_length_of_key(api_keys[key_name]) + image_count = pkg.utils.context.get_openai_manager().audit_mgr \ + .get_image_count_of_key(api_keys[key_name]) + reply_str += "{}:\n - 文本长度:{}\n - 图片数量:{}\n".format(key_name, int(text_length), + int(image_count)) + # 获取此key的额度 + try: + http_proxy = config.openai_config["http_proxy"] if "http_proxy" in config.openai_config else None + credit_data = credit.fetch_credit_data(api_keys[key_name], http_proxy) + reply_str += " - 使用额度:{:.2f}/{:.2f}\n".format(credit_data['total_used'],credit_data['total_granted']) + except Exception as e: + logging.warning("获取额度失败:{}".format(e)) + + reply = [reply_str] + return reply + + +@command( + "version", + "查看版本信息", + "!version", + [], + False +) +def cmd_version(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """查看版本信息""" + reply = [] + + reply_str = "[bot]当前版本:\n{}\n".format(pkg.utils.updater.get_current_version_info()) + try: + if pkg.utils.updater.is_new_version_available(): + reply_str += "\n有新版本可用,请使用命令 !update 进行更新" + except: + pass + + reply = [reply_str] + + return reply + + +@command( + "reload", + "执行热重载", + "!reload", + [], + True +) +def cmd_reload(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """执行热重载""" + import pkg.utils.reloader + def reload_task(): + pkg.utils.reloader.reload_all() + + threading.Thread(target=reload_task, daemon=True).start() + + +@command( + "update", + "更新程序", + "!update", + [], + True +) +def cmd_update(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """更新程序""" + reply = [] + import pkg.utils.updater + import pkg.utils.reloader + import pkg.utils.context + + def update_task(): + try: + if pkg.utils.updater.update_all(): + pkg.utils.reloader.reload_all(notify=False) + pkg.utils.context.get_qqbot_manager().notify_admin("更新完成") + else: + pkg.utils.context.get_qqbot_manager().notify_admin("无新版本") + except Exception as e0: + traceback.print_exc() + pkg.utils.context.get_qqbot_manager().notify_admin("更新失败:{}".format(e0)) + return + + threading.Thread(target=update_task, daemon=True).start() + + reply = ["[bot]正在更新,请耐心等待,请勿重复发起更新..."] + + +def config_operation(cmd, params): + reply = [] + config = pkg.utils.context.get_config() + reply_str = "" + if len(params) == 0: + reply = ["[bot]err:请输入配置项"] + else: + cfg_name = params[0] + if cfg_name == 'all': + reply_str = "[bot]所有配置项:\n\n" + for cfg in dir(config): + if not cfg.startswith('__') and not cfg == 'logging': + # 根据配置项类型进行格式化,如果是字典则转换为json并格式化 + if isinstance(getattr(config, cfg), str): + reply_str += "{}: \"{}\"\n".format(cfg, getattr(config, cfg)) + elif isinstance(getattr(config, cfg), dict): + # 不进行unicode转义,并格式化 + reply_str += "{}: {}\n".format(cfg, + json.dumps(getattr(config, cfg), + ensure_ascii=False, indent=4)) + else: + reply_str += "{}: {}\n".format(cfg, getattr(config, cfg)) + reply = [reply_str] + elif cfg_name in dir(config): + if len(params) == 1: + # 按照配置项类型进行格式化 + if isinstance(getattr(config, cfg_name), str): + reply_str = "[bot]配置项{}: \"{}\"\n".format(cfg_name, getattr(config, cfg_name)) + elif isinstance(getattr(config, cfg_name), dict): + reply_str = "[bot]配置项{}: {}\n".format(cfg_name, + json.dumps(getattr(config, cfg_name), + ensure_ascii=False, indent=4)) + else: + reply_str = "[bot]配置项{}: {}\n".format(cfg_name, getattr(config, cfg_name)) + reply = [reply_str] + else: + cfg_value = " ".join(params[1:]) + # 类型转换,如果是json则转换为字典 + if cfg_value == 'true': + cfg_value = True + elif cfg_value == 'false': + cfg_value = False + elif cfg_value.isdigit(): + cfg_value = int(cfg_value) + elif cfg_value.startswith('{') and cfg_value.endswith('}'): + cfg_value = json.loads(cfg_value) + else: + try: + cfg_value = float(cfg_value) + except ValueError: + pass + + # 检查类型是否匹配 + if isinstance(getattr(config, cfg_name), type(cfg_value)): + setattr(config, cfg_name, cfg_value) + pkg.utils.context.set_config(config) + reply = ["[bot]配置项{}修改成功".format(cfg_name)] + else: + reply = ["[bot]err:配置项{}类型不匹配".format(cfg_name)] + + else: + reply = ["[bot]err:未找到配置项 {}".format(cfg_name)] + + return reply + + +@command( + "cfg", + "配置文件相关操作", + "!cfg all\n!cfg <配置项名称>\n!cfg <配置项名称> <配置项新值>", + [], + True +) +def cmd_cfg(cmd: str, params: list, session_name: str, + text_message: str, launcher_type: str, launcher_id: int, + sender_id: int, is_admin: bool) -> list: + """配置文件相关操作""" + reply = config_operation(cmd, params) + return reply diff --git a/pkg/qqbot/command.py b/pkg/qqbot/command.py index b6d7651f..dddc43e1 100644 --- a/pkg/qqbot/command.py +++ b/pkg/qqbot/command.py @@ -13,151 +13,11 @@ import pkg.utils.updater import pkg.utils.context import pkg.qqbot.message import pkg.utils.credit as credit +import pkg.qqbot.cmds.model as cmdmodel from mirai import Image -def config_operation(cmd, params): - reply = [] - config = pkg.utils.context.get_config() - reply_str = "" - if len(params) == 0: - reply = ["[bot]err:请输入配置项"] - else: - cfg_name = params[0] - if cfg_name == 'all': - reply_str = "[bot]所有配置项:\n\n" - for cfg in dir(config): - if not cfg.startswith('__') and not cfg == 'logging': - # 根据配置项类型进行格式化,如果是字典则转换为json并格式化 - if isinstance(getattr(config, cfg), str): - reply_str += "{}: \"{}\"\n".format(cfg, getattr(config, cfg)) - elif isinstance(getattr(config, cfg), dict): - # 不进行unicode转义,并格式化 - reply_str += "{}: {}\n".format(cfg, - json.dumps(getattr(config, cfg), - ensure_ascii=False, indent=4)) - else: - reply_str += "{}: {}\n".format(cfg, getattr(config, cfg)) - reply = [reply_str] - elif cfg_name in dir(config): - if len(params) == 1: - # 按照配置项类型进行格式化 - if isinstance(getattr(config, cfg_name), str): - reply_str = "[bot]配置项{}: \"{}\"\n".format(cfg_name, getattr(config, cfg_name)) - elif isinstance(getattr(config, cfg_name), dict): - reply_str = "[bot]配置项{}: {}\n".format(cfg_name, - json.dumps(getattr(config, cfg_name), - ensure_ascii=False, indent=4)) - else: - reply_str = "[bot]配置项{}: {}\n".format(cfg_name, getattr(config, cfg_name)) - reply = [reply_str] - else: - cfg_value = " ".join(params[1:]) - # 类型转换,如果是json则转换为字典 - if cfg_value == 'true': - cfg_value = True - elif cfg_value == 'false': - cfg_value = False - elif cfg_value.isdigit(): - cfg_value = int(cfg_value) - elif cfg_value.startswith('{') and cfg_value.endswith('}'): - cfg_value = json.loads(cfg_value) - else: - try: - cfg_value = float(cfg_value) - except ValueError: - pass - - # 检查类型是否匹配 - if isinstance(getattr(config, cfg_name), type(cfg_value)): - setattr(config, cfg_name, cfg_value) - pkg.utils.context.set_config(config) - reply = ["[bot]配置项{}修改成功".format(cfg_name)] - else: - reply = ["[bot]err:配置项{}类型不匹配".format(cfg_name)] - - else: - reply = ["[bot]err:未找到配置项 {}".format(cfg_name)] - - return reply - - -def plugin_operation(cmd, params, is_admin): - reply = [] - - import pkg.plugin.host as plugin_host - import pkg.utils.updater as updater - - plugin_list = plugin_host.__plugins__ - - if len(params) == 0: - reply_str = "[bot]所有插件({}):\n".format(len(plugin_host.__plugins__)) - idx = 0 - for key in plugin_host.iter_plugins_name(): - plugin = plugin_list[key] - reply_str += "\n#{} {} {}\n{}\nv{}\n作者: {}\n"\ - .format((idx+1), plugin['name'], - "[已禁用]" if not plugin['enabled'] else "", - plugin['description'], - plugin['version'], plugin['author']) - - if updater.is_repo("/".join(plugin['path'].split('/')[:-1])): - remote_url = updater.get_remote_url("/".join(plugin['path'].split('/')[:-1])) - if remote_url != "https://github.com/RockChinQ/QChatGPT" and remote_url != "https://gitee.com/RockChin/QChatGPT": - reply_str += "源码: "+remote_url+"\n" - - idx += 1 - - reply = [reply_str] - elif params[0] == 'update': - # 更新所有插件 - if is_admin: - def closure(): - import pkg.utils.context - updated = [] - for key in plugin_list: - plugin = plugin_list[key] - if updater.is_repo("/".join(plugin['path'].split('/')[:-1])): - success = updater.pull_latest("/".join(plugin['path'].split('/')[:-1])) - if success: - updated.append(plugin['name']) - - # 检查是否有requirements.txt - pkg.utils.context.get_qqbot_manager().notify_admin("正在安装依赖...") - for key in plugin_list: - plugin = plugin_list[key] - if os.path.exists("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt"): - logging.info("{}检测到requirements.txt,安装依赖".format(plugin['name'])) - import pkg.utils.pkgmgr - pkg.utils.pkgmgr.install_requirements("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt") - - import main - main.reset_logging() - - pkg.utils.context.get_qqbot_manager().notify_admin("已更新插件: {}".format(", ".join(updated))) - - threading.Thread(target=closure).start() - reply = ["[bot]正在更新所有插件,请勿重复发起..."] - else: - reply = ["[bot]err:权限不足"] - elif params[0].startswith("http"): - if is_admin: - - def closure(): - try: - plugin_host.install_plugin(params[0]) - pkg.utils.context.get_qqbot_manager().notify_admin("插件安装成功,请发送 !reload 指令重载插件") - except Exception as e: - logging.error("插件安装失败:{}".format(e)) - pkg.utils.context.get_qqbot_manager().notify_admin("插件安装失败:{}".format(e)) - - threading.Thread(target=closure, args=()).start() - reply = ["[bot]正在安装插件..."] - else: - reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"] - return reply - def process_command(session_name: str, text_message: str, mgr, config, launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: @@ -170,216 +30,30 @@ def process_command(session_name: str, text_message: str, mgr, config, cmd = text_message[1:].strip().split(' ')[0] params = text_message[1:].strip().split(' ')[1:] - if cmd == 'help': - reply = ["[bot]" + config.help_message] - elif cmd == 'reset': - if len(params) == 0: - pkg.openai.session.get_session(session_name).reset(explicit=True) - reply = ["[bot]会话已重置"] - else: - pkg.openai.session.get_session(session_name).reset(explicit=True, use_prompt=params[0]) - reply = ["[bot]会话已重置,使用场景预设:{}".format(params[0])] - elif cmd == 'last': - result = pkg.openai.session.get_session(session_name).last_session() - if result is None: - reply = ["[bot]没有前一次的对话"] - else: - datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( - '%Y-%m-%d %H:%M:%S') - reply = ["[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str)] - elif cmd == 'next': - result = pkg.openai.session.get_session(session_name).next_session() - if result is None: - reply = ["[bot]没有后一次的对话"] - else: - datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( - '%Y-%m-%d %H:%M:%S') - reply = ["[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str)] - elif cmd == 'prompt': - msgs = "" - session:list = pkg.openai.session.get_session(session_name).prompt - for msg in session: - if len(params) != 0 and params[0] in ['-all', '-a']: - msgs = msgs + "{}: {}\n\n".format(msg['role'], msg['content']) - elif len(msg['content']) > 30: - msgs = msgs + "[{}]: {}...\n\n".format(msg['role'], msg['content'][:30]) - else: - msgs = msgs + "[{}]: {}\n\n".format(msg['role'], msg['content']) - reply = ["[bot]当前对话所有内容:\n{}".format(msgs)] - elif cmd == 'list': - pkg.openai.session.get_session(session_name).persistence() - page = 0 - if len(params) > 0: - try: - page = int(params[0]) - except ValueError: - pass + # 把!~开头的转换成!cfg + if cmd.startswith('~'): + params = [cmd[1:]] + params + cmd = 'cfg' - results = pkg.openai.session.get_session(session_name).list_history(page=page) - if len(results) == 0: - reply = ["[bot]第{}页没有历史会话".format(page)] - else: - reply_str = "[bot]历史会话 第{}页:\n".format(page) - current = -1 - for i in range(len(results)): - # 时间(使用create_timestamp转换) 序号 部分内容 - datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp']) - msg = "" - try: - msg = json.loads(results[i]['prompt']) - except json.decoder.JSONDecodeError: - msg = pkg.openai.session.reset_session_prompt(session_name, results[i]['prompt']) - # 持久化 - pkg.openai.session.get_session(session_name).persistence() - if len(msg) >= 2: - reply_str += "#{} 创建:{} {}\n".format(i + page * 10, - datetime_obj.strftime("%Y-%m-%d %H:%M:%S"), - msg[0]['content']) - else: - reply_str += "#{} 创建:{} {}\n".format(i + page * 10, - datetime_obj.strftime("%Y-%m-%d %H:%M:%S"), - "无内容") - if results[i]['create_timestamp'] == pkg.openai.session.get_session( - session_name).create_timestamp: - current = i + page * 10 - - reply_str += "\n以上信息倒序排列" - if current != -1: - reply_str += ",当前会话是 #{}\n".format(current) - else: - reply_str += ",当前处于全新会话或不在此页" - - reply = [reply_str] - elif cmd == 'resend': - session = pkg.openai.session.get_session(session_name) - to_send = session.undo() - - reply = pkg.qqbot.message.process_normal_message(to_send, mgr, config, - launcher_type, launcher_id, sender_id) - elif cmd == 'del': # 删除指定会话历史记录 - if len(params) == 0: - reply = ["[bot]参数不足, 格式: !del <序号>\n可以通过!list查看序号"] - else: - if params[0] == 'all': - pkg.openai.session.get_session(session_name).delete_all_history() - reply = ["[bot]已删除所有历史会话"] - elif params[0].isdigit(): - if pkg.openai.session.get_session(session_name).delete_history(int(params[0])): - reply = ["[bot]已删除历史会话 #{}".format(params[0])] - else: - reply = ["[bot]没有历史会话 #{}".format(params[0])] - else: - reply = ["[bot]参数错误, 格式: !del <序号>\n可以通过!list查看序号"] - elif cmd == 'usage': - reply_str = "[bot]各api-key使用情况:\n\n" - - api_keys = pkg.utils.context.get_openai_manager().key_mgr.api_key - for key_name in api_keys: - text_length = pkg.utils.context.get_openai_manager().audit_mgr \ - .get_text_length_of_key(api_keys[key_name]) - image_count = pkg.utils.context.get_openai_manager().audit_mgr \ - .get_image_count_of_key(api_keys[key_name]) - reply_str += "{}:\n - 文本长度:{}\n - 图片数量:{}\n".format(key_name, int(text_length), - int(image_count)) - # 获取此key的额度 - try: - http_proxy = config.openai_config["http_proxy"] if "http_proxy" in config.openai_config else None - credit_data = credit.fetch_credit_data(api_keys[key_name], http_proxy) - reply_str += " - 使用额度:{:.2f}/{:.2f}\n".format(credit_data['total_used'],credit_data['total_granted']) - except Exception as e: - logging.warning("获取额度失败:{}".format(e)) - - reply = [reply_str] - elif cmd == 'draw': - if len(params) == 0: - reply = ["[bot]err:请输入图片描述文字"] - else: - session = pkg.openai.session.get_session(session_name) - - res = session.draw_image(" ".join(params)) - - logging.debug("draw_image result:{}".format(res)) - reply = [Image(url=res['data'][0]['url'])] - if not (hasattr(config, 'include_image_description') - and not config.include_image_description): - reply.append(" ".join(params)) - elif cmd == 'version': - reply_str = "[bot]当前版本:\n{}\n".format(pkg.utils.updater.get_current_version_info()) - try: - if pkg.utils.updater.is_new_version_available(): - reply_str += "\n有新版本可用,请使用命令 !update 进行更新" - except: - pass - - reply = [reply_str] - - elif cmd == 'plugin': - reply = plugin_operation(cmd, params, is_admin) - - elif cmd == 'default': - if len(params) == 0: - # 输出目前所有情景预设 - import pkg.openai.dprompt as dprompt - reply_str = "[bot]当前所有情景预设:\n\n" - for key,value in dprompt.get_prompt_dict().items(): - reply_str += " - {}: {}\n".format(key,value) - - reply_str += "\n当前默认情景预设:{}\n".format(dprompt.get_current()) - reply_str += "请使用!default <情景预设>来设置默认情景预设" - reply = [reply_str] - elif len(params) >0 and is_admin: - # 设置默认情景 - import pkg.openai.dprompt as dprompt - try: - dprompt.set_current(params[0]) - reply = ["[bot]已设置默认情景预设为:{}".format(dprompt.get_current())] - except KeyError: - reply = ["[bot]err: 未找到情景预设:{}".format(params[0])] - else: - reply = ["[bot]err: 仅管理员可设置默认情景预设"] - elif cmd == "delhst" and is_admin: - if len(params) == 0: - reply = ["[bot]err:请输入要删除的会话名: group_<群号> 或者 person_, 或使用 !delhst all 删除所有会话的历史记录"] - else: - if params[0] == "all": - pkg.utils.context.get_database_manager().delete_all_session_history() - reply = ["[bot]已删除所有会话的历史记录"] - else: - if pkg.utils.context.get_database_manager().delete_all_history(params[0]): - reply = ["[bot]已删除会话 {} 的所有历史记录".format(params[0])] - else: - reply = ["[bot]未找到会话 {} 的历史记录".format(params[0])] - elif cmd == 'reload' and is_admin: - def reload_task(): - pkg.utils.reloader.reload_all() - - threading.Thread(target=reload_task, daemon=True).start() - elif cmd == 'update' and is_admin: - def update_task(): - try: - if pkg.utils.updater.update_all(): - pkg.utils.reloader.reload_all(notify=False) - pkg.utils.context.get_qqbot_manager().notify_admin("更新完成") - else: - pkg.utils.context.get_qqbot_manager().notify_admin("无新版本") - except Exception as e0: - traceback.print_exc() - pkg.utils.context.get_qqbot_manager().notify_admin("更新失败:{}".format(e0)) - return - - threading.Thread(target=update_task, daemon=True).start() - - reply = ["[bot]正在更新,请耐心等待,请勿重复发起更新..."] - elif cmd == 'cfg' and is_admin: - reply = config_operation(cmd, params) + # 选择指令处理函数 + cmd_obj = cmdmodel.search(cmd) + if cmd_obj is not None and (cmd_obj['admin_only'] is False or is_admin): + cmd_func = cmd_obj['func'] + reply = cmd_func( + cmd=cmd, + params=params, + session_name=session_name, + text_message=text_message, + launcher_type=launcher_type, + launcher_id=launcher_id, + sender_id=sender_id, + is_admin=is_admin, + ) else: - if cmd.startswith("~") and is_admin: - config_item = cmd[1:] - params = [config_item] + params - reply = config_operation("cfg", params) - else: - reply = ["[bot]err:未知的指令或权限不足: " + cmd] + reply = ["[bot]err:未知的指令或权限不足: " + cmd] + + return reply except Exception as e: mgr.notify_admin("{}指令执行失败:{}".format(session_name, e)) logging.exception(e)