mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-11 08:16:03 +00:00
238 lines
8.6 KiB
Python
238 lines
8.6 KiB
Python
import mirai
|
|
|
|
from ..adapter import MessageSourceAdapter, MessageConverter, EventConverter
|
|
import nakuru
|
|
import nakuru.entities.components as nkc
|
|
|
|
import asyncio
|
|
import typing
|
|
import traceback
|
|
import logging
|
|
import json
|
|
|
|
|
|
class NakuruProjectMessageConverter(MessageConverter):
    """Convert message chains between YiriMirai and nakuru-project formats."""

    @staticmethod
    def yiri2target(message_chain: mirai.MessageChain) -> list:
        """Convert a YiriMirai message chain into a list of nakuru components.

        Args:
            message_chain: Either a ``mirai.MessageChain`` or a plain ``list``
                of mirai message components.

        Returns:
            A list of nakuru components. ``Plain``, ``Image``, ``Face``,
            ``At`` and ``AtAll`` are converted; every other component
            (including ``Voice``) is dropped.

        Raises:
            Exception: If ``message_chain`` is neither a ``mirai.MessageChain``
                nor a ``list``.
        """
        # Exact type checks are kept on purpose: switching to isinstance()
        # would start matching subclasses and change what gets converted.
        if type(message_chain) is mirai.MessageChain:
            msg_list = message_chain.__root__
        elif type(message_chain) is list:
            msg_list = message_chain
        else:
            # The type must be stringified before concatenation; adding a
            # ``type`` object to a ``str`` raises TypeError and hides the
            # real problem from the caller.
            raise Exception(
                "Unknown message type: " + str(message_chain) + str(type(message_chain))
            )

        nakuru_msg_list = []

        # Walk the chain and convert each supported component.
        for component in msg_list:
            if type(component) is mirai.Plain:
                nakuru_msg_list.append(nkc.Plain(component.text))
            elif type(component) is mirai.Image:
                # Use the first available image source: URL, then base64,
                # then local path. An image with none of them is skipped.
                if component.url is not None:
                    nakuru_msg_list.append(nkc.Image.fromURL(component.url))
                elif component.base64 is not None:
                    nakuru_msg_list.append(nkc.Image.fromBase64(component.base64))
                elif component.path is not None:
                    nakuru_msg_list.append(nkc.Image.fromFileSystem(component.path))
            elif type(component) is mirai.Face:
                nakuru_msg_list.append(nkc.Face(id=component.face_id))
            elif type(component) is mirai.At:
                nakuru_msg_list.append(nkc.At(qq=component.target))
            elif type(component) is mirai.AtAll:
                nakuru_msg_list.append(nkc.AtAll())
            # mirai.Voice and any other component type are not converted and
            # are silently left out of the result.

        return nakuru_msg_list

    @staticmethod
    def target2yiri(message_chain: typing.Any) -> mirai.MessageChain:
        """Convert a list of nakuru components into a YiriMirai message chain.

        Args:
            message_chain: A ``list`` of nakuru components.

        Returns:
            A ``mirai.MessageChain`` built from the ``Plain``, ``Image``,
            ``Face``, ``At`` and ``AtAll`` components; other component types
            are dropped.

        Raises:
            AssertionError: If ``message_chain`` is not exactly a ``list``.
        """
        assert type(message_chain) is list

        yiri_msg_list = []
        for component in message_chain:
            if type(component) is nkc.Plain:
                yiri_msg_list.append(mirai.Plain(text=component.text))
            elif type(component) is nkc.Image:
                yiri_msg_list.append(mirai.Image(url=component.url))
            elif type(component) is nkc.Face:
                yiri_msg_list.append(mirai.Face(face_id=component.id))
            elif type(component) is nkc.At:
                yiri_msg_list.append(mirai.At(target=component.qq))
            elif type(component) is nkc.AtAll:
                yiri_msg_list.append(mirai.AtAll())
            # Unsupported nakuru components are ignored.

        logging.debug("转换后的消息链: %s", yiri_msg_list)
        return mirai.MessageChain(yiri_msg_list)
|
|
|
|
|
|
class NakuruProjectEventConverter(EventConverter):
    """Translate event types and event objects between YiriMirai and nakuru."""

    @staticmethod
    def yiri2target(event: typing.Type[mirai.Event]):
        """Return the nakuru event class matching a YiriMirai event class.

        Args:
            event: The YiriMirai event class (not an instance).

        Returns:
            ``nakuru.GroupMessage`` or ``nakuru.FriendMessage``.

        Raises:
            Exception: If ``event`` is any other event class.
        """
        if event is mirai.GroupMessage:
            return nakuru.GroupMessage
        if event is mirai.FriendMessage:
            return nakuru.FriendMessage
        raise Exception("未支持转换的事件类型: " + str(event))

    @staticmethod
    def target2yiri(event: typing.Any) -> mirai.Event:
        """Build a YiriMirai event object from a nakuru event object.

        Args:
            event: A ``nakuru.FriendMessage`` or ``nakuru.GroupMessage``
                instance.

        Returns:
            The equivalent ``mirai.FriendMessage`` or ``mirai.GroupMessage``.

        Raises:
            Exception: If ``event`` is of any other type.
        """
        event_kind = type(event)

        if event_kind is nakuru.FriendMessage:
            # The sender's nickname is used for both nickname and remark.
            friend = mirai.models.entities.Friend(
                id=event.sender.user_id,
                nickname=event.sender.nickname,
                remark=event.sender.nickname,
            )
            return mirai.FriendMessage(
                sender=friend,
                message_chain=NakuruProjectMessageConverter.target2yiri(event.message),
                time=event.time,
            )

        if event_kind is nakuru.GroupMessage:
            # Translate the sender's role string into the permission string
            # passed to mirai; anything other than admin/owner is a member.
            role_to_permission = {"admin": "ADMINISTRATOR", "owner": "OWNER"}
            permission = role_to_permission.get(event.sender.role, "MEMBER")

            import mirai.models.entities as entities

            member = entities.GroupMember(
                id=event.sender.user_id,
                member_name=event.sender.nickname,
                permission=permission,
                # NOTE(review): the group name is filled with the sender's
                # nickname and the group permission is fixed to Member --
                # presumably placeholders; confirm against the nakuru event.
                group=entities.Group(
                    id=event.group_id,
                    name=event.sender.nickname,
                    permission=entities.Permission.Member,
                ),
                special_title=event.sender.title,
                # Timing fields are always set to zero here.
                join_timestamp=0,
                last_speak_timestamp=0,
                mute_time_remaining=0,
            )
            return mirai.GroupMessage(
                sender=member,
                message_chain=NakuruProjectMessageConverter.target2yiri(event.message),
                time=event.time,
            )

        raise Exception("未支持转换的事件类型: " + str(event))
|
|
|
|
|
|
|
|
class NakuruProjectAdapter(MessageSourceAdapter):
    """Adapter for nakuru-project."""

    # The underlying nakuru client.
    bot: nakuru.CQHTTP

    # Converters shared by all instances.
    message_converter: NakuruProjectMessageConverter = NakuruProjectMessageConverter()
    event_converter: NakuruProjectEventConverter = NakuruProjectEventConverter()

    # One dict per registered listener, with the keys "event_type",
    # "callable" (the user callback) and "wrapper" (the coroutine that was
    # handed to nakuru).
    listener_list: list[dict]

    def __init__(self, config: dict):
        """Create the nakuru-project client.

        Args:
            config: Keyword arguments forwarded to ``nakuru.CQHTTP``.
        """
        self.bot = nakuru.CQHTTP(**config)
        self.listener_list = []

    def send_message(
        self,
        target_type: str,
        target_id: str,
        message: mirai.MessageChain
    ):
        """Send a message to a group or a person.

        Args:
            target_type: ``"group"`` or ``"person"``.
            target_id: Target id; converted with ``int()`` before sending.
            message: The YiriMirai message chain to send.

        Raises:
            Exception: If ``target_type`` is neither ``"group"`` nor
                ``"person"``.
        """
        if target_type == "group":
            task = self.bot.sendGroupMessage(int(target_id), self.message_converter.yiri2target(message))
        elif target_type == "person":
            task = self.bot.sendFriendMessage(int(target_id), self.message_converter.yiri2target(message))
        else:
            raise Exception("Unknown target type: " + target_type)

        # The send call is driven to completion with asyncio.run().
        asyncio.run(task)

    def reply_message(
        self,
        message_source: mirai.MessageEvent,
        message: mirai.MessageChain,
        quote_origin: bool = False
    ):
        """Reply to the sender (or group) of a received message event.

        Args:
            message_source: The ``mirai.GroupMessage`` or
                ``mirai.FriendMessage`` being replied to.
            message: The YiriMirai message chain to send.
            quote_origin: Accepted but currently not used; quoting the
                original message is not implemented.

        Raises:
            Exception: If ``message_source`` is of any other type.
        """
        if type(message_source) is mirai.GroupMessage:
            task = self.bot.sendGroupMessage(
                message_source.sender.group.id,
                self.message_converter.yiri2target(message),
            )
        elif type(message_source) is mirai.FriendMessage:
            task = self.bot.sendFriendMessage(
                message_source.sender.id,
                self.message_converter.yiri2target(message),
            )
        else:
            raise Exception("Unknown message source type: " + str(type(message_source)))

        asyncio.run(task)

    def is_muted(self, group_id: int) -> bool:
        """Report whether the bot is muted in a group; always ``False``."""
        return False

    def register_listener(
        self,
        event_type: typing.Type[mirai.Event],
        callback: typing.Callable[[mirai.Event], None]
    ):
        """Register ``callback`` for a YiriMirai event type.

        The callback is wrapped in a coroutine that converts the incoming
        nakuru event to its YiriMirai form before calling ``callback``. The
        wrapper is recorded in ``listener_list`` so it can be found again by
        ``unregister_listener``.

        Args:
            event_type: The YiriMirai event class to listen for.
            callback: Called with the converted event.

        Raises:
            Exception: Any error raised during registration is printed with
                its traceback and re-raised.
        """
        try:
            logging.debug("注册监听器: " + str(event_type) + " -> " + str(callback))

            nakuru_event_type = self.event_converter.yiri2target(event_type)

            # The annotation on ``source`` is the nakuru event class itself,
            # evaluated when the wrapper is defined.
            async def listener_wrapper(app: nakuru.CQHTTP, source: nakuru_event_type):
                callback(self.event_converter.target2yiri(source))

            self.listener_list.append(
                {
                    "event_type": event_type,
                    "callable": callback,
                    "wrapper": listener_wrapper,
                }
            )

            # nakuru receivers are registered under the event class name.
            self.bot.receiver(nakuru_event_type.__name__)(listener_wrapper)
            logging.debug("注册完成")
        except Exception:
            traceback.print_exc()
            raise

    def unregister_listener(
        self,
        event_type: typing.Type[mirai.Event],
        callback: typing.Callable[[mirai.Event], None]
    ):
        """Remove a listener previously added with ``register_listener``.

        Args:
            event_type: The YiriMirai event class the listener was
                registered for.
            callback: The callback that was registered.

        Raises:
            Exception: If no matching listener is recorded.
        """
        # Receivers are registered under the nakuru event class *name*
        # (see register_listener), so the same string key must be used here;
        # indexing with the class object itself would miss the entry.
        nakuru_event_name = self.event_converter.yiri2target(event_type).__name__

        # Find and remove the matching entry from this adapter's own list.
        target_wrapper = None
        for listener in self.listener_list:
            if listener["event_type"] == event_type and listener["callable"] == callback:
                target_wrapper = listener["wrapper"]
                # Safe despite iterating the same list: we break right away.
                self.listener_list.remove(listener)
                break

        if target_wrapper is None:
            raise Exception("未找到对应的监听器")

        # Rebuild nakuru's receiver list for this event without the wrapper.
        # NOTE(review): assumes each stored entry exposes the registered
        # coroutine as ``.callable`` -- confirm against nakuru.
        self.bot.event[nakuru_event_name] = [
            func
            for func in self.bot.event[nakuru_event_name]
            if func.callable != target_wrapper
        ]

    def run_sync(self):
        """Install a fresh event loop for this thread and run the bot."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.bot.run()

    def kill(self) -> bool:
        """Stop the adapter; not implemented, always returns ``False``."""
        return False
|