Files
LangBot/pkg/qqbot/manager.py
2022-12-09 16:25:00 +08:00

217 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from mirai import At, GroupMessage, MessageEvent, Mirai, Plain, StrangerMessage, WebSocketAdapter, FriendMessage, Image
import config
import pkg.openai.session
from func_timeout import func_set_timeout, FunctionTimedOut
import datetime
import logging
help_text = """此机器人通过调用OpenAI的GPT-3大型语言模型生成回复不具有情感。
你可以用自然语言与其交流,回复的消息中[GPT]开头的为模型生成的语言,[bot]开头的为程序提示。
你可以通过QQ 1010553892 联系作者
请不要用其生成整篇文章,因为每次只会向模型提交少部分文字,生成文章可能会导致偏题、前后矛盾等问题
每次会话最后一次交互后{}分钟后会自动结束,结束后将开启新会话,如需继续前一次会话请发送 !last 重新开启
欢迎到github.com/RockChinQ/QChatGPT 给个star
帮助信息:
!help - 显示帮助
!reset - 重置会话
!last - 切换到前一次的对话
!next - 切换到后一次的对话
!prompt - 显示当前对话所有内容
!list - 列出所有历史会话""".format(config.session_expire_time // 60)
inst = None
processing = []
class QQBotManager:
timeout = 60
retry = 3
bot = None
def __init__(self, mirai_http_api_config: dict, timeout: int = 60, retry: int = 3):
self.timeout = timeout
self.retry = retry
bot = Mirai(
qq=mirai_http_api_config['qq'],
adapter=WebSocketAdapter(
verify_key=mirai_http_api_config['verifyKey'],
host=mirai_http_api_config['host'],
port=mirai_http_api_config['port']
)
)
@bot.on(FriendMessage)
async def on_friend_message(event: FriendMessage):
return await self.on_person_message(event)
@bot.on(StrangerMessage)
async def on_stranger_message(event: StrangerMessage):
return await self.on_person_message(event)
@bot.on(GroupMessage)
async def on_group_message(event: GroupMessage):
return await self.on_group_message(event)
self.bot = bot
global inst
inst = self
# 统一的消息处理函数
@func_set_timeout(timeout)
def process_message(self, launcher_type: str, launcher_id: int, text_message: str) -> str:
reply = ''
session_name = "{}_{}".format(launcher_type, launcher_id)
if text_message.startswith('!') or text_message.startswith(""): # 指令
logging.info("[{}]发起指令:{}".format(session_name, text_message[:min(20, len(text_message))]+("..." if len(text_message) > 20 else "")))
cmd = text_message[1:].strip().split(' ')[0]
params = text_message[1:].strip().split(' ')[1:]
if cmd == 'help':
reply = "[bot]" + help_text
elif cmd == 'reset':
pkg.openai.session.get_session(session_name).reset(explicit=True)
reply = "[bot]会话已重置"
elif cmd == 'last':
result = pkg.openai.session.get_session(session_name).last_session()
if result is None:
reply = "[bot]没有前一次的对话"
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime('%Y-%m-%d %H:%M:%S')
reply = "[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str) + result.prompt[:min(100, len(result.prompt))] + \
("..." if len(result.prompt) > 100 else "#END#")
elif cmd == 'next':
result = pkg.openai.session.get_session(session_name).next_session()
if result is None:
reply = "[bot]没有后一次的对话"
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime('%Y-%m-%d %H:%M:%S')
reply = "[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str) + result.prompt[:min(100, len(result.prompt))] + \
("..." if len(result.prompt) > 100 else "#END#")
elif cmd == 'prompt':
reply = "[bot]当前对话所有内容:\n" + pkg.openai.session.get_session(session_name).prompt
elif cmd == 'list':
pkg.openai.session.get_session(session_name).persistence()
page = 0
if len(params) > 0:
try:
page = int(params[0])
except ValueError:
pass
results = pkg.openai.session.get_session(session_name).list_history(page=page)
if len(results) == 0:
reply = "[bot]第{}页没有历史会话".format(page)
else:
reply = "[bot]历史会话 第{}页:\n".format(page)
current = -1
for i in range(len(results)):
# 时间(使用create_timestamp转换) 序号 部分内容
datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp'])
reply += "#{} 创建:{} {}\n".format(i + page * 10, datetime_obj.strftime("%Y-%m-%d %H:%M:%S"),
results[i]['prompt'][:min(20, len(results[i]['prompt']))])
if results[i]['create_timestamp'] == pkg.openai.session.get_session(session_name).create_timestamp:
current = i + page * 10
reply += "\n以上信息倒序排列"
if current != -1:
reply += ",当前会话是 #{}\n".format(current)
else:
reply += ",当前处于全新会话或不在此页"
else: # 消息
logging.info("[{}]发送消息:{}".format(session_name, text_message[:min(20, len(text_message))]+("..." if len(text_message) > 20 else "")))
session = pkg.openai.session.get_session(session_name)
try:
reply = "[GPT]" + session.append(text_message)
except Exception as e:
logging.exception(e)
reply = "[bot]err:{}".format(e)
logging.info("回复[{}]消息:{}".format(session_name, reply[:min(100, len(reply))]+("..." if len(reply) > 100 else "")))
return reply
async def on_person_message(self, event: MessageEvent):
global processing
if "person_{}".format(event.sender.id) in processing:
return await self.bot.send(event, "err:正在处理中,请稍后再试")
reply = ''
if event.sender.id == self.bot.qq:
pass
else:
if Image in event.message_chain:
pass
else:
processing.append("person_{}".format(event.sender.id))
# 超时则重试,重试超过次数则放弃
failed = 0
for i in range(self.retry):
try:
reply = self.process_message('person', event.sender.id, str(event.message_chain))
break
except FunctionTimedOut:
failed += 1
continue
if failed == self.retry:
reply = "[bot]err:请求超时"
processing.remove("person_{}".format(event.sender.id))
if reply != '':
return await self.bot.send(event, reply)
async def on_group_message(self, event: GroupMessage):
global processing
if "group_{}".format(event.group.id) in processing:
return await self.bot.send(event, "err:正在处理中,请稍后再试")
reply = ''
if Image in event.message_chain:
pass
elif At(self.bot.qq) not in event.message_chain:
pass
else:
event.message_chain.remove(At(self.bot.qq))
processing.append("group_{}".format(event.sender.id))
# 超时则重试,重试超过次数则放弃
failed = 0
for i in range(self.retry):
try:
reply = self.process_message('group', event.group.id, str(event.message_chain).strip())
break
except FunctionTimedOut:
failed += 1
continue
if failed == self.retry:
reply = "err:请求超时"
processing.remove("group_{}".format(event.sender.id))
if reply != '':
return await self.bot.send(event, reply)
def get_inst() -> QQBotManager:
global inst
return inst