diff --git a/pkg/qqbot/manager.py b/pkg/qqbot/manager.py index b1fdcd14..ed362705 100644 --- a/pkg/qqbot/manager.py +++ b/pkg/qqbot/manager.py @@ -10,18 +10,14 @@ from mirai import At, GroupMessage, MessageEvent, Mirai, Plain, StrangerMessage, import config import pkg.openai.session import pkg.openai.manager -from func_timeout import func_set_timeout, FunctionTimedOut -import datetime +from func_timeout import FunctionTimedOut import logging import pkg.qqbot.filter - -help_text = config.help_message +import pkg.qqbot.process as processor inst = None -processing = [] - # 并行运行 def go(func, args=()): @@ -54,7 +50,6 @@ def check_response_rule(text: str) -> (bool, str): # 控制QQ消息输入输出的类 class QQBotManager: - timeout = 60 retry = 3 bot = None @@ -93,6 +88,9 @@ class QQBotManager: ) ) + else: + raise Exception("未知的适配器类型") + @bot.on(FriendMessage) async def on_friend_message(event: FriendMessage): go(self.on_person_message, (event,)) @@ -110,163 +108,6 @@ class QQBotManager: global inst inst = self - # 统一的消息处理函数 - @func_set_timeout(timeout) - def process_message(self, launcher_type: str, launcher_id: int, text_message: str) -> str: - global processing - reply = '' - session_name = "{}_{}".format(launcher_type, launcher_id) - - pkg.openai.session.get_session(session_name).acquire_response_lock() - - try: - if session_name in processing: - pkg.openai.session.get_session(session_name).release_response_lock() - return "[bot]err:正在处理中,请稍后再试" - - processing.append(session_name) - - try: - - if text_message.startswith('!') or text_message.startswith("!"): # 指令 - try: - logging.info( - "[{}]发起指令:{}".format(session_name, text_message[:min(20, len(text_message))] + ( - "..." if len(text_message) > 20 else ""))) - - cmd = text_message[1:].strip().split(' ')[0] - - params = text_message[1:].strip().split(' ')[1:] - if cmd == 'help': - reply = "[bot]" + help_text - elif cmd == 'reset': - pkg.openai.session.get_session(session_name).reset(explicit=True) - reply = "[bot]会话已重置" - elif cmd == 'last': - result = pkg.openai.session.get_session(session_name).last_session() - if result is None: - reply = "[bot]没有前一次的对话" - else: - datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( - '%Y-%m-%d %H:%M:%S') - reply = "[bot]已切换到前一次的对话:\n创建时间:{}\n".format( - datetime_str) + result.prompt[ - :min(100, - len(result.prompt))] + \ - ("..." if len(result.prompt) > 100 else "#END#") - elif cmd == 'next': - result = pkg.openai.session.get_session(session_name).next_session() - if result is None: - reply = "[bot]没有后一次的对话" - else: - datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( - '%Y-%m-%d %H:%M:%S') - reply = "[bot]已切换到后一次的对话:\n创建时间:{}\n".format( - datetime_str) + result.prompt[ - :min(100, - len(result.prompt))] + \ - ("..." if len(result.prompt) > 100 else "#END#") - elif cmd == 'prompt': - reply = "[bot]当前对话所有内容:\n" + pkg.openai.session.get_session(session_name).prompt - elif cmd == 'list': - pkg.openai.session.get_session(session_name).persistence() - page = 0 - - if len(params) > 0: - try: - page = int(params[0]) - except ValueError: - pass - - results = pkg.openai.session.get_session(session_name).list_history(page=page) - if len(results) == 0: - reply = "[bot]第{}页没有历史会话".format(page) - else: - reply = "[bot]历史会话 第{}页:\n".format(page) - current = -1 - for i in range(len(results)): - # 时间(使用create_timestamp转换) 序号 部分内容 - datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp']) - reply += "#{} 创建:{} {}\n".format(i + page * 10, - datetime_obj.strftime("%Y-%m-%d %H:%M:%S"), - results[i]['prompt'][ - :min(20, len(results[i]['prompt']))]) - if results[i]['create_timestamp'] == pkg.openai.session.get_session( - session_name).create_timestamp: - current = i + page * 10 - - reply += "\n以上信息倒序排列" - if current != -1: - reply += ",当前会话是 #{}\n".format(current) - else: - reply += ",当前处于全新会话或不在此页" - elif cmd == 'usage': - api_keys = pkg.openai.manager.get_inst().key_mgr.api_key - reply = "[bot]api-key使用情况:(阈值:{})\n\n".format( - pkg.openai.manager.get_inst().key_mgr.api_key_usage_threshold) - - using_key_name = "" - for api_key in api_keys: - reply += "{}:\n - {}字 {}%\n".format(api_key, - pkg.openai.manager.get_inst().key_mgr.get_usage( - api_keys[api_key]), - round( - pkg.openai.manager.get_inst().key_mgr.get_usage( - api_keys[ - api_key]) / pkg.openai.manager.get_inst().key_mgr.api_key_usage_threshold * 100, - 3)) - if api_keys[api_key] == pkg.openai.manager.get_inst().key_mgr.using_key: - using_key_name = api_key - reply += "\n当前使用:{}".format(using_key_name) - except Exception as e: - self.notify_admin("{}指令执行失败:{}".format(session_name, e)) - logging.exception(e) - reply = "[bot]err:{}".format(e) - else: # 消息 - logging.info("[{}]发送消息:{}".format(session_name, text_message[:min(20, len(text_message))] + ( - "..." if len(text_message) > 20 else ""))) - - session = pkg.openai.session.get_session(session_name) - try: - prefix = "[GPT]" if hasattr(config, "show_prefix") and config.show_prefix else "" - reply = prefix + session.append(text_message) - except openai.error.APIConnectionError as e: - self.notify_admin("{}会话调用API失败:{}".format(session_name, e)) - reply = "[bot]err:调用API失败,请重试或联系作者,或等待修复" - except openai.error.RateLimitError as e: - # 尝试切换api-key - current_tokens_amt = pkg.openai.manager.get_inst().key_mgr.get_usage(pkg.openai.manager.get_inst().key_mgr.get_using_key()) - pkg.openai.manager.get_inst().key_mgr.set_current_exceeded() - switched, name = pkg.openai.manager.get_inst().key_mgr.auto_switch() - - if not switched: - self.notify_admin("API调用额度超限({}),请向OpenAI账户充值或在config.py中更换api_key".format(current_tokens_amt)) - reply = "[bot]err:API调用额度超额,请联系作者,或等待修复" - else: - openai.api_key = pkg.openai.manager.get_inst().key_mgr.get_using_key() - self.notify_admin("API调用额度超限({}),已切换到{}".format(current_tokens_amt, name)) - reply = "[bot]err:API调用额度超额,已自动切换,请重新发送消息" - except openai.error.InvalidRequestError as e: - self.notify_admin("{}API调用参数错误:{}\n\n这可能是由于config.py中的prompt_submit_length参数或" - "completion_api_params中的max_tokens参数数值过大导致的,请尝试将其降低".format( - session_name, e)) - reply = "[bot]err:API调用参数错误,请联系作者,或等待修复" - except Exception as e: - logging.exception(e) - reply = "[bot]err:{}".format(e) - - logging.info( - "回复[{}]消息:{}".format(session_name, - reply[:min(100, len(reply))] + ("..." if len(reply) > 100 else ""))) - reply = self.reply_filter.process(reply) - - finally: - processing.remove(session_name) - finally: - pkg.openai.session.get_session(session_name).release_response_lock() - - return reply - def send(self, event, msg): asyncio.run(self.bot.send(event, msg)) @@ -286,7 +127,7 @@ class QQBotManager: failed = 0 for i in range(self.retry): try: - reply = self.process_message('person', event.sender.id, str(event.message_chain)) + reply = processor.process_message('person', event.sender.id, str(event.message_chain)) break except FunctionTimedOut: pkg.openai.session.get_session('person_{}'.format(event.sender.id)).release_response_lock() @@ -312,13 +153,11 @@ class QQBotManager: if At(self.bot.qq) in event.message_chain: event.message_chain.remove(At(self.bot.qq)) - processing.append("group_{}".format(event.sender.id)) - # 超时则重试,重试超过次数则放弃 failed = 0 for i in range(self.retry): try: - replys = self.process_message('group', event.group.id, + replys = processor.process_message('group', event.group.id, str(event.message_chain).strip() if text is None else text) break except FunctionTimedOut: diff --git a/pkg/qqbot/process.py b/pkg/qqbot/process.py new file mode 100644 index 00000000..82cf0f5c --- /dev/null +++ b/pkg/qqbot/process.py @@ -0,0 +1,176 @@ +# 此模块提供了消息处理的具体逻辑的接口 +import datetime + +import pkg.qqbot.manager as manager +from func_timeout import func_set_timeout +import logging +import openai + +import config + +import pkg.openai.session +import pkg.openai.manager + +processing = [] + + +@func_set_timeout(config.process_message_timeout) +def process_message(launcher_type: str, launcher_id: int, text_message: str) -> str: + global processing + + mgr = pkg.openai.manager.get_inst() + + reply = '' + session_name = "{}_{}".format(launcher_type, launcher_id) + + pkg.openai.session.get_session(session_name).acquire_response_lock() + + try: + if session_name in processing: + pkg.openai.session.get_session(session_name).release_response_lock() + return "[bot]err:正在处理中,请稍后再试" + + processing.append(session_name) + + try: + + if text_message.startswith('!') or text_message.startswith("!"): # 指令 + try: + logging.info( + "[{}]发起指令:{}".format(session_name, text_message[:min(20, len(text_message))] + ( + "..." if len(text_message) > 20 else ""))) + + cmd = text_message[1:].strip().split(' ')[0] + + params = text_message[1:].strip().split(' ')[1:] + if cmd == 'help': + reply = "[bot]" + config.help_message + elif cmd == 'reset': + pkg.openai.session.get_session(session_name).reset(explicit=True) + reply = "[bot]会话已重置" + elif cmd == 'last': + result = pkg.openai.session.get_session(session_name).last_session() + if result is None: + reply = "[bot]没有前一次的对话" + else: + datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( + '%Y-%m-%d %H:%M:%S') + reply = "[bot]已切换到前一次的对话:\n创建时间:{}\n".format( + datetime_str) + result.prompt[ + :min(100, + len(result.prompt))] + \ + ("..." if len(result.prompt) > 100 else "#END#") + elif cmd == 'next': + result = pkg.openai.session.get_session(session_name).next_session() + if result is None: + reply = "[bot]没有后一次的对话" + else: + datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( + '%Y-%m-%d %H:%M:%S') + reply = "[bot]已切换到后一次的对话:\n创建时间:{}\n".format( + datetime_str) + result.prompt[ + :min(100, + len(result.prompt))] + \ + ("..." if len(result.prompt) > 100 else "#END#") + elif cmd == 'prompt': + reply = "[bot]当前对话所有内容:\n" + pkg.openai.session.get_session(session_name).prompt + elif cmd == 'list': + pkg.openai.session.get_session(session_name).persistence() + page = 0 + + if len(params) > 0: + try: + page = int(params[0]) + except ValueError: + pass + + results = pkg.openai.session.get_session(session_name).list_history(page=page) + if len(results) == 0: + reply = "[bot]第{}页没有历史会话".format(page) + else: + reply = "[bot]历史会话 第{}页:\n".format(page) + current = -1 + for i in range(len(results)): + # 时间(使用create_timestamp转换) 序号 部分内容 + datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp']) + reply += "#{} 创建:{} {}\n".format(i + page * 10, + datetime_obj.strftime("%Y-%m-%d %H:%M:%S"), + results[i]['prompt'][ + :min(20, len(results[i]['prompt']))]) + if results[i]['create_timestamp'] == pkg.openai.session.get_session( + session_name).create_timestamp: + current = i + page * 10 + + reply += "\n以上信息倒序排列" + if current != -1: + reply += ",当前会话是 #{}\n".format(current) + else: + reply += ",当前处于全新会话或不在此页" + elif cmd == 'usage': + api_keys = pkg.openai.manager.get_inst().key_mgr.api_key + reply = "[bot]api-key使用情况:(阈值:{})\n\n".format( + pkg.openai.manager.get_inst().key_mgr.api_key_usage_threshold) + + using_key_name = "" + for api_key in api_keys: + reply += "{}:\n - {}字 {}%\n".format(api_key, + pkg.openai.manager.get_inst().key_mgr.get_usage( + api_keys[api_key]), + round( + pkg.openai.manager.get_inst().key_mgr.get_usage( + api_keys[ + api_key]) / pkg.openai.manager.get_inst().key_mgr.api_key_usage_threshold * 100, + 3)) + if api_keys[api_key] == pkg.openai.manager.get_inst().key_mgr.using_key: + using_key_name = api_key + reply += "\n当前使用:{}".format(using_key_name) + except Exception as e: + mgr.notify_admin("{}指令执行失败:{}".format(session_name, e)) + logging.exception(e) + reply = "[bot]err:{}".format(e) + else: # 消息 + logging.info("[{}]发送消息:{}".format(session_name, text_message[:min(20, len(text_message))] + ( + "..." if len(text_message) > 20 else ""))) + + session = pkg.openai.session.get_session(session_name) + try: + prefix = "[GPT]" if hasattr(config, "show_prefix") and config.show_prefix else "" + reply = prefix + session.append(text_message) + except openai.error.APIConnectionError as e: + mgr.notify_admin("{}会话调用API失败:{}".format(session_name, e)) + reply = "[bot]err:调用API失败,请重试或联系作者,或等待修复" + except openai.error.RateLimitError as e: + # 尝试切换api-key + current_tokens_amt = pkg.openai.manager.get_inst().key_mgr.get_usage( + pkg.openai.manager.get_inst().key_mgr.get_using_key()) + pkg.openai.manager.get_inst().key_mgr.set_current_exceeded() + switched, name = pkg.openai.manager.get_inst().key_mgr.auto_switch() + + if not switched: + mgr.notify_admin("API调用额度超限({}),请向OpenAI账户充值或在config.py中更换api_key".format( + current_tokens_amt)) + reply = "[bot]err:API调用额度超额,请联系作者,或等待修复" + else: + openai.api_key = pkg.openai.manager.get_inst().key_mgr.get_using_key() + mgr.notify_admin("API调用额度超限({}),已切换到{}".format(current_tokens_amt, name)) + reply = "[bot]err:API调用额度超额,已自动切换,请重新发送消息" + except openai.error.InvalidRequestError as e: + mgr.notify_admin("{}API调用参数错误:{}\n\n这可能是由于config.py中的prompt_submit_length参数或" + "completion_api_params中的max_tokens参数数值过大导致的,请尝试将其降低".format( + session_name, e)) + reply = "[bot]err:API调用参数错误,请联系作者,或等待修复" + except Exception as e: + logging.exception(e) + reply = "[bot]err:{}".format(e) + + logging.info( + "回复[{}]消息:{}".format(session_name, + reply[:min(100, len(reply))] + ("..." if len(reply) > 100 else ""))) + reply = mgr.reply_filter.process(reply) + + finally: + processing.remove(session_name) + finally: + pkg.openai.session.get_session(session_name).release_response_lock() + + return reply