From e0ea46e893b60af8045f15284a19f319cdc1a1f1 Mon Sep 17 00:00:00 2001 From: Rock Chin Date: Mon, 12 Dec 2022 22:04:38 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E4=BD=BF=E7=94=A8=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=E6=8F=90=E9=AB=98=E6=B6=88=E6=81=AF=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=95=88=E7=8E=87=20#18?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/openai/session.py | 32 ++++- pkg/qqbot/manager.py | 275 ++++++++++++++++++++++-------------------- 2 files changed, 175 insertions(+), 132 deletions(-) diff --git a/pkg/openai/session.py b/pkg/openai/session.py index 96d8bd6d..9f988210 100644 --- a/pkg/openai/session.py +++ b/pkg/openai/session.py @@ -57,6 +57,22 @@ def get_default_prompt(): config.default_prompt != "" else '' +# def blocked_func(lock: threading.Lock): +# +# def decorator(func): +# def wrapper(*args, **kwargs): +# print('lock acquire,{}'.format(lock)) +# lock.acquire() +# try: +# return func(*args, **kwargs) +# finally: +# lock.release() +# +# return wrapper +# +# return decorator + + # 通用的OpenAI API交互session # session内部保留了对话的上下文, # 收到用户消息后,将上下文提交给OpenAI API生成回复 @@ -74,6 +90,16 @@ class Session: just_switched_to_exist_session = False + response_lock = threading.Lock() + + # 加锁 + def acquire_response_lock(self): + self.response_lock.acquire() + + # 释放锁 + def release_response_lock(self): + self.response_lock.release() + def __init__(self, name: str): self.name = name self.create_timestamp = int(time.time()) @@ -188,6 +214,8 @@ class Session: self.last_interact_timestamp = int(time.time()) self.just_switched_to_exist_session = False + self.response_lock = threading.Lock() + if schedule_new: self.schedule() @@ -207,7 +235,7 @@ class Session: self.last_interact_timestamp = last_one['last_interact_timestamp'] self.prompt = last_one['prompt'] - just_switched = True + self.just_switched_to_exist_session = True return self # 切换到下一个session @@ -222,7 +250,7 @@ class Session: self.last_interact_timestamp = next_one['last_interact_timestamp'] self.prompt = next_one['prompt'] - just_switched = True + self.just_switched_to_exist_session = True return self def list_history(self, capacity: int = 10, page: int = 0): diff --git a/pkg/qqbot/manager.py b/pkg/qqbot/manager.py index e9ee62e3..479866e3 100644 --- a/pkg/qqbot/manager.py +++ b/pkg/qqbot/manager.py @@ -21,6 +21,12 @@ inst = None processing = [] +# 并行运行 +def go(func, args=()): + thread = threading.Thread(target=func, args=args, daemon=True) + thread.start() + + # 控制QQ消息输入输出的类 class QQBotManager: timeout = 60 @@ -54,15 +60,15 @@ class QQBotManager: @bot.on(FriendMessage) async def on_friend_message(event: FriendMessage): - return await self.on_person_message(event) + go(self.on_person_message, (event,)) @bot.on(StrangerMessage) async def on_stranger_message(event: StrangerMessage): - return await self.on_person_message(event) + go(self.on_person_message, (event,)) @bot.on(GroupMessage) async def on_group_message(event: GroupMessage): - return await self.on_group_message(event) + go(self.on_group_message, (event,)) self.bot = bot @@ -72,113 +78,132 @@ class QQBotManager: # 统一的消息处理函数 @func_set_timeout(timeout) def process_message(self, launcher_type: str, launcher_id: int, text_message: str) -> str: + global processing reply = '' session_name = "{}_{}".format(launcher_type, launcher_id) - if text_message.startswith('!') or text_message.startswith("!"): # 指令 + pkg.openai.session.get_session(session_name).acquire_response_lock() + + try: + if session_name in processing: + return "[bot]err:正在处理中,请稍后再试" + + processing.append(session_name) + try: - logging.info("[{}]发起指令:{}".format(session_name, text_message[:min(20, len(text_message))] + ( - "..." if len(text_message) > 20 else ""))) - cmd = text_message[1:].strip().split(' ')[0] + if text_message.startswith('!') or text_message.startswith("!"): # 指令 + try: + logging.info("[{}]发起指令:{}".format(session_name, text_message[:min(20, len(text_message))] + ( + "..." if len(text_message) > 20 else ""))) - params = text_message[1:].strip().split(' ')[1:] - if cmd == 'help': - reply = "[bot]" + help_text - elif cmd == 'reset': - pkg.openai.session.get_session(session_name).reset(explicit=True) - reply = "[bot]会话已重置" - elif cmd == 'last': - result = pkg.openai.session.get_session(session_name).last_session() - if result is None: - reply = "[bot]没有前一次的对话" - else: - datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( - '%Y-%m-%d %H:%M:%S') - reply = "[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str) + result.prompt[ - :min(100, - len(result.prompt))] + \ - ("..." if len(result.prompt) > 100 else "#END#") - elif cmd == 'next': - result = pkg.openai.session.get_session(session_name).next_session() - if result is None: - reply = "[bot]没有后一次的对话" - else: - datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( - '%Y-%m-%d %H:%M:%S') - reply = "[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str) + result.prompt[ - :min(100, - len(result.prompt))] + \ - ("..." if len(result.prompt) > 100 else "#END#") - elif cmd == 'prompt': - reply = "[bot]当前对话所有内容:\n" + pkg.openai.session.get_session(session_name).prompt - elif cmd == 'list': - pkg.openai.session.get_session(session_name).persistence() - page = 0 + cmd = text_message[1:].strip().split(' ')[0] - if len(params) > 0: - try: - page = int(params[0]) - except ValueError: - pass + params = text_message[1:].strip().split(' ')[1:] + if cmd == 'help': + reply = "[bot]" + help_text + elif cmd == 'reset': + pkg.openai.session.get_session(session_name).reset(explicit=True) + reply = "[bot]会话已重置" + elif cmd == 'last': + result = pkg.openai.session.get_session(session_name).last_session() + if result is None: + reply = "[bot]没有前一次的对话" + else: + datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( + '%Y-%m-%d %H:%M:%S') + reply = "[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str) + result.prompt[ + :min(100, + len(result.prompt))] + \ + ("..." if len(result.prompt) > 100 else "#END#") + elif cmd == 'next': + result = pkg.openai.session.get_session(session_name).next_session() + if result is None: + reply = "[bot]没有后一次的对话" + else: + datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime( + '%Y-%m-%d %H:%M:%S') + reply = "[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str) + result.prompt[ + :min(100, + len(result.prompt))] + \ + ("..." if len(result.prompt) > 100 else "#END#") + elif cmd == 'prompt': + reply = "[bot]当前对话所有内容:\n" + pkg.openai.session.get_session(session_name).prompt + elif cmd == 'list': + pkg.openai.session.get_session(session_name).persistence() + page = 0 - results = pkg.openai.session.get_session(session_name).list_history(page=page) - if len(results) == 0: - reply = "[bot]第{}页没有历史会话".format(page) - else: - reply = "[bot]历史会话 第{}页:\n".format(page) - current = -1 - for i in range(len(results)): - # 时间(使用create_timestamp转换) 序号 部分内容 - datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp']) - reply += "#{} 创建:{} {}\n".format(i + page * 10, - datetime_obj.strftime("%Y-%m-%d %H:%M:%S"), - results[i]['prompt'][ - :min(20, len(results[i]['prompt']))]) - if results[i]['create_timestamp'] == pkg.openai.session.get_session( - session_name).create_timestamp: - current = i + page * 10 + if len(params) > 0: + try: + page = int(params[0]) + except ValueError: + pass - reply += "\n以上信息倒序排列" - if current != -1: - reply += ",当前会话是 #{}\n".format(current) - else: - reply += ",当前处于全新会话或不在此页" - except Exception as e: - self.notify_admin("{}指令执行失败:{}".format(session_name, e)) - logging.exception(e) - reply = "[bot]err:{}".format(e) - else: # 消息 - logging.info("[{}]发送消息:{}".format(session_name, text_message[:min(20, len(text_message))] + ( - "..." if len(text_message) > 20 else ""))) + results = pkg.openai.session.get_session(session_name).list_history(page=page) + if len(results) == 0: + reply = "[bot]第{}页没有历史会话".format(page) + else: + reply = "[bot]历史会话 第{}页:\n".format(page) + current = -1 + for i in range(len(results)): + # 时间(使用create_timestamp转换) 序号 部分内容 + datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp']) + reply += "#{} 创建:{} {}\n".format(i + page * 10, + datetime_obj.strftime("%Y-%m-%d %H:%M:%S"), + results[i]['prompt'][ + :min(20, len(results[i]['prompt']))]) + if results[i]['create_timestamp'] == pkg.openai.session.get_session( + session_name).create_timestamp: + current = i + page * 10 - session = pkg.openai.session.get_session(session_name) - try: - reply = "[GPT]" + session.append(text_message) - except openai.error.APIConnectionError as e: - self.notify_admin("{}会话调用API失败:{}".format(session_name, e)) - reply = "[bot]err:调用API失败,请联系作者,或等待修复" - except openai.error.RateLimitError as e: - self.notify_admin("API调用额度超限,请向OpenAI账户充值或在config.py中更换api_key") - reply = "[bot]err:API调用额度超额,请联系作者,或等待修复" - except openai.error.InvalidRequestError as e: - self.notify_admin("{}API调用参数错误:{}\n\n这可能是由于config.py中的prompt_submit_length参数或" - "completion_api_params中的max_tokens参数数值过大导致的,请尝试将其降低".format(session_name, e)) - reply = "[bot]err:API调用参数错误,请联系作者,或等待修复" - except Exception as e: - logging.exception(e) - reply = "[bot]err:{}".format(e) + reply += "\n以上信息倒序排列" + if current != -1: + reply += ",当前会话是 #{}\n".format(current) + else: + reply += ",当前处于全新会话或不在此页" + except Exception as e: + self.notify_admin("{}指令执行失败:{}".format(session_name, e)) + logging.exception(e) + reply = "[bot]err:{}".format(e) + else: # 消息 + logging.info("[{}]发送消息:{}".format(session_name, text_message[:min(20, len(text_message))] + ( + "..." if len(text_message) > 20 else ""))) + + session = pkg.openai.session.get_session(session_name) + try: + reply = "[GPT]" + session.append(text_message) + except openai.error.APIConnectionError as e: + self.notify_admin("{}会话调用API失败:{}".format(session_name, e)) + reply = "[bot]err:调用API失败,请联系作者,或等待修复" + except openai.error.RateLimitError as e: + self.notify_admin("API调用额度超限,请向OpenAI账户充值或在config.py中更换api_key") + reply = "[bot]err:API调用额度超额,请联系作者,或等待修复" + except openai.error.InvalidRequestError as e: + self.notify_admin("{}API调用参数错误:{}\n\n这可能是由于config.py中的prompt_submit_length参数或" + "completion_api_params中的max_tokens参数数值过大导致的,请尝试将其降低".format( + session_name, e)) + reply = "[bot]err:API调用参数错误,请联系作者,或等待修复" + except Exception as e: + logging.exception(e) + reply = "[bot]err:{}".format(e) + + logging.info( + "回复[{}]消息:{}".format(session_name, reply[:min(100, len(reply))] + ("..." if len(reply) > 100 else ""))) + reply = self.reply_filter.process(reply) + + finally: + processing.remove(session_name) + finally: + pkg.openai.session.get_session(session_name).release_response_lock() - logging.info( - "回复[{}]消息:{}".format(session_name, reply[:min(100, len(reply))] + ("..." if len(reply) > 100 else ""))) - reply = self.reply_filter.process(reply) return reply + def send(self, event, msg): + asyncio.run(self.bot.send(event, msg)) + # 私聊消息处理 - async def on_person_message(self, event: MessageEvent): + def on_person_message(self, event: MessageEvent): global processing - if "person_{}".format(event.sender.id) in processing: - return await self.bot.send(event, "err:正在处理中,请稍后再试") reply = '' @@ -188,32 +213,25 @@ class QQBotManager: if Image in event.message_chain: pass else: - processing.append("person_{}".format(event.sender.id)) + # 超时则重试,重试超过次数则放弃 + failed = 0 + for i in range(self.retry): + try: + reply = self.process_message('person', event.sender.id, str(event.message_chain)) + break + except FunctionTimedOut: + failed += 1 + continue - try: - # 超时则重试,重试超过次数则放弃 - failed = 0 - for i in range(self.retry): - try: - reply = self.process_message('person', event.sender.id, str(event.message_chain)) - break - except FunctionTimedOut: - failed += 1 - continue - - if failed == self.retry: - reply = "[bot]err:请求超时" - finally: - processing.remove("person_{}".format(event.sender.id)) + if failed == self.retry: + reply = "[bot]err:请求超时" if reply != '': - return await self.bot.send(event, reply) + return self.send(event, reply) # 群消息处理 - async def on_group_message(self, event: GroupMessage): + def on_group_message(self, event: GroupMessage): global processing - if "group_{}".format(event.group.id) in processing: - return await self.bot.send(event, "err:正在处理中,请稍后再试") reply = '' @@ -226,24 +244,21 @@ class QQBotManager: processing.append("group_{}".format(event.sender.id)) - try: - # 超时则重试,重试超过次数则放弃 - failed = 0 - for i in range(self.retry): - try: - reply = self.process_message('group', event.group.id, str(event.message_chain).strip()) - break - except FunctionTimedOut: - failed += 1 - continue + # 超时则重试,重试超过次数则放弃 + failed = 0 + for i in range(self.retry): + try: + reply = self.process_message('group', event.group.id, str(event.message_chain).strip()) + break + except FunctionTimedOut: + failed += 1 + continue - if failed == self.retry: - reply = "err:请求超时" - finally: - processing.remove("group_{}".format(event.sender.id)) + if failed == self.retry: + reply = "err:请求超时" if reply != '': - return await self.bot.send(event, reply) + return self.send(event, reply) # 通知系统管理员 def notify_admin(self, message: str):