Files
LangBot/pkg/platform/manager.py
2024-02-11 23:07:38 +08:00

194 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import os
import logging
import asyncio
import traceback
from mirai import At, GroupMessage, MessageEvent, StrangerMessage, \
FriendMessage, Image, MessageChain, Plain
import mirai
from ..platform import adapter as msadapter
from ..core import app, entities as core_entities
from ..plugin import events
# 控制QQ消息输入输出的类
class PlatformManager:
# adapter: msadapter.MessageSourceAdapter = None
adapters: list[msadapter.MessageSourceAdapter] = []
# modern
ap: app.Application = None
def __init__(self, ap: app.Application = None):
self.ap = ap
self.adapters = []
async def initialize(self):
from .sources import yirimirai, nakuru, aiocqhttp, qqbotpy
async def on_friend_message(event: FriendMessage, adapter: msadapter.MessageSourceAdapter):
event_ctx = await self.ap.plugin_mgr.emit_event(
event=events.PersonMessageReceived(
launcher_type='person',
launcher_id=event.sender.id,
sender_id=event.sender.id,
message_chain=event.message_chain,
query=None
)
)
if not event_ctx.is_prevented_default():
await self.ap.query_pool.add_query(
launcher_type=core_entities.LauncherTypes.PERSON,
launcher_id=event.sender.id,
sender_id=event.sender.id,
message_event=event,
message_chain=event.message_chain,
adapter=adapter
)
async def on_stranger_message(event: StrangerMessage, adapter: msadapter.MessageSourceAdapter):
event_ctx = await self.ap.plugin_mgr.emit_event(
event=events.PersonMessageReceived(
launcher_type='person',
launcher_id=event.sender.id,
sender_id=event.sender.id,
message_chain=event.message_chain,
query=None
)
)
if not event_ctx.is_prevented_default():
await self.ap.query_pool.add_query(
launcher_type=core_entities.LauncherTypes.PERSON,
launcher_id=event.sender.id,
sender_id=event.sender.id,
message_event=event,
message_chain=event.message_chain,
adapter=adapter
)
# nakuru不区分好友和陌生人故仅为yirimirai注册陌生人事件
# if self.ap.platform_cfg.data['platform-adapter'] == 'yiri-mirai':
# self.adapter.register_listener(
# StrangerMessage,
# on_stranger_message
# )
async def on_group_message(event: GroupMessage, adapter: msadapter.MessageSourceAdapter):
event_ctx = await self.ap.plugin_mgr.emit_event(
event=events.GroupMessageReceived(
launcher_type='person',
launcher_id=event.sender.id,
sender_id=event.sender.id,
message_chain=event.message_chain,
query=None
)
)
if not event_ctx.is_prevented_default():
await self.ap.query_pool.add_query(
launcher_type=core_entities.LauncherTypes.GROUP,
launcher_id=event.group.id,
sender_id=event.sender.id,
message_event=event,
message_chain=event.message_chain,
adapter=adapter
)
for adap_cfg in self.ap.platform_cfg.data['platform-adapters']:
if adap_cfg['enable']:
cfg_copy = adap_cfg.copy()
del cfg_copy['enable']
adapter_name = cfg_copy['adapter']
del cfg_copy['adapter']
found = False
for adapter in msadapter.preregistered_adapters:
if adapter.name == adapter_name:
found = True
adapter_cls = adapter
adapter_inst = adapter_cls(
cfg_copy,
self.ap
)
self.adapters.append(adapter_inst)
if adapter_name == 'yiri-mirai':
adapter_inst.register_listener(
StrangerMessage,
on_stranger_message
)
adapter_inst.register_listener(
FriendMessage,
on_friend_message
)
adapter_inst.register_listener(
GroupMessage,
on_group_message
)
if not found:
raise Exception('platform.json 中启用了未知的平台适配器: ' + adapter_name)
async def send(self, event, msg, adapter: msadapter.MessageSourceAdapter, check_quote=True, check_at_sender=True):
if check_at_sender and self.ap.platform_cfg.data['at-sender'] and isinstance(event, GroupMessage):
msg.insert(
0,
At(
event.sender.id
)
)
await adapter.reply_message(
event,
msg,
quote_origin=True if self.ap.platform_cfg.data['quote-origin'] and check_quote else False
)
# 通知系统管理员
# TODO delete
# async def notify_admin(self, message: str):
# await self.notify_admin_message_chain(MessageChain([Plain("[bot]{}".format(message))]))
# async def notify_admin_message_chain(self, message: mirai.MessageChain):
# if self.ap.system_cfg.data['admin-sessions'] != []:
# admin_list = []
# for admin in self.ap.system_cfg.data['admin-sessions']:
# admin_list.append(admin)
# for adm in admin_list:
# self.adapter.send_message(
# adm.split("_")[0],
# adm.split("_")[1],
# message
# )
async def run(self):
try:
tasks = []
for adapter in self.adapters:
tasks.append(adapter.run_async())
await asyncio.gather(*tasks)
except Exception as e:
self.ap.logger.error('平台适配器运行出错: ' + str(e))
self.ap.logger.debug(f"Traceback: {traceback.format_exc()}")