perf: 使用异步流程提高消息处理效率 #18

This commit is contained in:
Rock Chin
2022-12-12 22:04:38 +08:00
parent d6b9994c3b
commit e0ea46e893
2 changed files with 175 additions and 132 deletions

View File

@@ -57,6 +57,22 @@ def get_default_prompt():
config.default_prompt != "" else ''
# def blocked_func(lock: threading.Lock):
#
# def decorator(func):
# def wrapper(*args, **kwargs):
# print('lock acquire,{}'.format(lock))
# lock.acquire()
# try:
# return func(*args, **kwargs)
# finally:
# lock.release()
#
# return wrapper
#
# return decorator
# 通用的OpenAI API交互session
# session内部保留了对话的上下文
# 收到用户消息后将上下文提交给OpenAI API生成回复
@@ -74,6 +90,16 @@ class Session:
just_switched_to_exist_session = False
response_lock = threading.Lock()
# 加锁
def acquire_response_lock(self):
self.response_lock.acquire()
# 释放锁
def release_response_lock(self):
self.response_lock.release()
def __init__(self, name: str):
self.name = name
self.create_timestamp = int(time.time())
@@ -188,6 +214,8 @@ class Session:
self.last_interact_timestamp = int(time.time())
self.just_switched_to_exist_session = False
self.response_lock = threading.Lock()
if schedule_new:
self.schedule()
@@ -207,7 +235,7 @@ class Session:
self.last_interact_timestamp = last_one['last_interact_timestamp']
self.prompt = last_one['prompt']
just_switched = True
self.just_switched_to_exist_session = True
return self
# 切换到下一个session
@@ -222,7 +250,7 @@ class Session:
self.last_interact_timestamp = next_one['last_interact_timestamp']
self.prompt = next_one['prompt']
just_switched = True
self.just_switched_to_exist_session = True
return self
def list_history(self, capacity: int = 10, page: int = 0):

View File

@@ -21,6 +21,12 @@ inst = None
processing = []
# 并行运行
def go(func, args=()):
thread = threading.Thread(target=func, args=args, daemon=True)
thread.start()
# 控制QQ消息输入输出的类
class QQBotManager:
timeout = 60
@@ -54,15 +60,15 @@ class QQBotManager:
@bot.on(FriendMessage)
async def on_friend_message(event: FriendMessage):
return await self.on_person_message(event)
go(self.on_person_message, (event,))
@bot.on(StrangerMessage)
async def on_stranger_message(event: StrangerMessage):
return await self.on_person_message(event)
go(self.on_person_message, (event,))
@bot.on(GroupMessage)
async def on_group_message(event: GroupMessage):
return await self.on_group_message(event)
go(self.on_group_message, (event,))
self.bot = bot
@@ -72,113 +78,132 @@ class QQBotManager:
# 统一的消息处理函数
@func_set_timeout(timeout)
def process_message(self, launcher_type: str, launcher_id: int, text_message: str) -> str:
global processing
reply = ''
session_name = "{}_{}".format(launcher_type, launcher_id)
if text_message.startswith('!') or text_message.startswith(""): # 指令
pkg.openai.session.get_session(session_name).acquire_response_lock()
try:
if session_name in processing:
return "[bot]err:正在处理中,请稍后再试"
processing.append(session_name)
try:
logging.info("[{}]发起指令:{}".format(session_name, text_message[:min(20, len(text_message))] + (
"..." if len(text_message) > 20 else "")))
cmd = text_message[1:].strip().split(' ')[0]
if text_message.startswith('!') or text_message.startswith(""): # 指令
try:
logging.info("[{}]发起指令:{}".format(session_name, text_message[:min(20, len(text_message))] + (
"..." if len(text_message) > 20 else "")))
params = text_message[1:].strip().split(' ')[1:]
if cmd == 'help':
reply = "[bot]" + help_text
elif cmd == 'reset':
pkg.openai.session.get_session(session_name).reset(explicit=True)
reply = "[bot]会话已重置"
elif cmd == 'last':
result = pkg.openai.session.get_session(session_name).last_session()
if result is None:
reply = "[bot]没有前一次的对话"
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime(
'%Y-%m-%d %H:%M:%S')
reply = "[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str) + result.prompt[
:min(100,
len(result.prompt))] + \
("..." if len(result.prompt) > 100 else "#END#")
elif cmd == 'next':
result = pkg.openai.session.get_session(session_name).next_session()
if result is None:
reply = "[bot]没有后一次的对话"
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime(
'%Y-%m-%d %H:%M:%S')
reply = "[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str) + result.prompt[
:min(100,
len(result.prompt))] + \
("..." if len(result.prompt) > 100 else "#END#")
elif cmd == 'prompt':
reply = "[bot]当前对话所有内容:\n" + pkg.openai.session.get_session(session_name).prompt
elif cmd == 'list':
pkg.openai.session.get_session(session_name).persistence()
page = 0
cmd = text_message[1:].strip().split(' ')[0]
if len(params) > 0:
try:
page = int(params[0])
except ValueError:
pass
params = text_message[1:].strip().split(' ')[1:]
if cmd == 'help':
reply = "[bot]" + help_text
elif cmd == 'reset':
pkg.openai.session.get_session(session_name).reset(explicit=True)
reply = "[bot]会话已重置"
elif cmd == 'last':
result = pkg.openai.session.get_session(session_name).last_session()
if result is None:
reply = "[bot]没有前一次的对话"
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime(
'%Y-%m-%d %H:%M:%S')
reply = "[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str) + result.prompt[
:min(100,
len(result.prompt))] + \
("..." if len(result.prompt) > 100 else "#END#")
elif cmd == 'next':
result = pkg.openai.session.get_session(session_name).next_session()
if result is None:
reply = "[bot]没有后一次的对话"
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime(
'%Y-%m-%d %H:%M:%S')
reply = "[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str) + result.prompt[
:min(100,
len(result.prompt))] + \
("..." if len(result.prompt) > 100 else "#END#")
elif cmd == 'prompt':
reply = "[bot]当前对话所有内容:\n" + pkg.openai.session.get_session(session_name).prompt
elif cmd == 'list':
pkg.openai.session.get_session(session_name).persistence()
page = 0
results = pkg.openai.session.get_session(session_name).list_history(page=page)
if len(results) == 0:
reply = "[bot]第{}页没有历史会话".format(page)
else:
reply = "[bot]历史会话 第{}页:\n".format(page)
current = -1
for i in range(len(results)):
# 时间(使用create_timestamp转换) 序号 部分内容
datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp'])
reply += "#{} 创建:{} {}\n".format(i + page * 10,
datetime_obj.strftime("%Y-%m-%d %H:%M:%S"),
results[i]['prompt'][
:min(20, len(results[i]['prompt']))])
if results[i]['create_timestamp'] == pkg.openai.session.get_session(
session_name).create_timestamp:
current = i + page * 10
if len(params) > 0:
try:
page = int(params[0])
except ValueError:
pass
reply += "\n以上信息倒序排列"
if current != -1:
reply += ",当前会话是 #{}\n".format(current)
else:
reply += ",当前处于全新会话或不在此页"
except Exception as e:
self.notify_admin("{}指令执行失败:{}".format(session_name, e))
logging.exception(e)
reply = "[bot]err:{}".format(e)
else: # 消息
logging.info("[{}]发送消息:{}".format(session_name, text_message[:min(20, len(text_message))] + (
"..." if len(text_message) > 20 else "")))
results = pkg.openai.session.get_session(session_name).list_history(page=page)
if len(results) == 0:
reply = "[bot]第{}页没有历史会话".format(page)
else:
reply = "[bot]历史会话 第{}页:\n".format(page)
current = -1
for i in range(len(results)):
# 时间(使用create_timestamp转换) 序号 部分内容
datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp'])
reply += "#{} 创建:{} {}\n".format(i + page * 10,
datetime_obj.strftime("%Y-%m-%d %H:%M:%S"),
results[i]['prompt'][
:min(20, len(results[i]['prompt']))])
if results[i]['create_timestamp'] == pkg.openai.session.get_session(
session_name).create_timestamp:
current = i + page * 10
session = pkg.openai.session.get_session(session_name)
try:
reply = "[GPT]" + session.append(text_message)
except openai.error.APIConnectionError as e:
self.notify_admin("{}会话调用API失败:{}".format(session_name, e))
reply = "[bot]err:调用API失败请联系作者或等待修复"
except openai.error.RateLimitError as e:
self.notify_admin("API调用额度超限,请向OpenAI账户充值或在config.py中更换api_key")
reply = "[bot]err:API调用额度超额请联系作者或等待修复"
except openai.error.InvalidRequestError as e:
self.notify_admin("{}API调用参数错误:{}\n\n这可能是由于config.py中的prompt_submit_length参数或"
"completion_api_params中的max_tokens参数数值过大导致的请尝试将其降低".format(session_name, e))
reply = "[bot]err:API调用参数错误请联系作者或等待修复"
except Exception as e:
logging.exception(e)
reply = "[bot]err:{}".format(e)
reply += "\n以上信息倒序排列"
if current != -1:
reply += ",当前会话是 #{}\n".format(current)
else:
reply += ",当前处于全新会话或不在此页"
except Exception as e:
self.notify_admin("{}指令执行失败:{}".format(session_name, e))
logging.exception(e)
reply = "[bot]err:{}".format(e)
else: # 消息
logging.info("[{}]发送消息:{}".format(session_name, text_message[:min(20, len(text_message))] + (
"..." if len(text_message) > 20 else "")))
session = pkg.openai.session.get_session(session_name)
try:
reply = "[GPT]" + session.append(text_message)
except openai.error.APIConnectionError as e:
self.notify_admin("{}会话调用API失败:{}".format(session_name, e))
reply = "[bot]err:调用API失败请联系作者或等待修复"
except openai.error.RateLimitError as e:
self.notify_admin("API调用额度超限,请向OpenAI账户充值或在config.py中更换api_key")
reply = "[bot]err:API调用额度超额请联系作者或等待修复"
except openai.error.InvalidRequestError as e:
self.notify_admin("{}API调用参数错误:{}\n\n这可能是由于config.py中的prompt_submit_length参数或"
"completion_api_params中的max_tokens参数数值过大导致的请尝试将其降低".format(
session_name, e))
reply = "[bot]err:API调用参数错误请联系作者或等待修复"
except Exception as e:
logging.exception(e)
reply = "[bot]err:{}".format(e)
logging.info(
"回复[{}]消息:{}".format(session_name, reply[:min(100, len(reply))] + ("..." if len(reply) > 100 else "")))
reply = self.reply_filter.process(reply)
finally:
processing.remove(session_name)
finally:
pkg.openai.session.get_session(session_name).release_response_lock()
logging.info(
"回复[{}]消息:{}".format(session_name, reply[:min(100, len(reply))] + ("..." if len(reply) > 100 else "")))
reply = self.reply_filter.process(reply)
return reply
def send(self, event, msg):
asyncio.run(self.bot.send(event, msg))
# 私聊消息处理
async def on_person_message(self, event: MessageEvent):
def on_person_message(self, event: MessageEvent):
global processing
if "person_{}".format(event.sender.id) in processing:
return await self.bot.send(event, "err:正在处理中,请稍后再试")
reply = ''
@@ -188,32 +213,25 @@ class QQBotManager:
if Image in event.message_chain:
pass
else:
processing.append("person_{}".format(event.sender.id))
# 超时则重试,重试超过次数则放弃
failed = 0
for i in range(self.retry):
try:
reply = self.process_message('person', event.sender.id, str(event.message_chain))
break
except FunctionTimedOut:
failed += 1
continue
try:
# 超时则重试,重试超过次数则放弃
failed = 0
for i in range(self.retry):
try:
reply = self.process_message('person', event.sender.id, str(event.message_chain))
break
except FunctionTimedOut:
failed += 1
continue
if failed == self.retry:
reply = "[bot]err:请求超时"
finally:
processing.remove("person_{}".format(event.sender.id))
if failed == self.retry:
reply = "[bot]err:请求超时"
if reply != '':
return await self.bot.send(event, reply)
return self.send(event, reply)
# 群消息处理
async def on_group_message(self, event: GroupMessage):
def on_group_message(self, event: GroupMessage):
global processing
if "group_{}".format(event.group.id) in processing:
return await self.bot.send(event, "err:正在处理中,请稍后再试")
reply = ''
@@ -226,24 +244,21 @@ class QQBotManager:
processing.append("group_{}".format(event.sender.id))
try:
# 超时则重试,重试超过次数则放弃
failed = 0
for i in range(self.retry):
try:
reply = self.process_message('group', event.group.id, str(event.message_chain).strip())
break
except FunctionTimedOut:
failed += 1
continue
# 超时则重试,重试超过次数则放弃
failed = 0
for i in range(self.retry):
try:
reply = self.process_message('group', event.group.id, str(event.message_chain).strip())
break
except FunctionTimedOut:
failed += 1
continue
if failed == self.retry:
reply = "err:请求超时"
finally:
processing.remove("group_{}".format(event.sender.id))
if failed == self.retry:
reply = "err:请求超时"
if reply != '':
return await self.bot.send(event, reply)
return self.send(event, reply)
# 通知系统管理员
def notify_admin(self, message: str):