mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-07 14:26:03 +00:00
* Optimize the plugin system * feat: enhance plugin installation process and improve task management * fix: linter err * feat: add Matrix adapter with multi-bridge support - MatrixAdapter with text/image/file message support - Multi-bridge architecture (BridgeState) for Discord, Telegram, etc. - Auto-login, QR forwarding, disconnect detection - Force logout+login on adapter start - Group/private chat detection excluding bridge bots - matrix-nio dependency added * docs: sync platform tables across all READMEs with Matrix bridge support - Add Matrix/Satori compatibility notes to all platforms - Add 21 Matrix-only platforms (Signal, WhatsApp, Messenger, etc.) - Keep international market ordering (Discord first) for non-CN READMEs * Update API base URL to localhost * fix: remove unused datetime import (ruff) * style: ruff format matrix.py * docs: collapse matrix platform list * docs: simplify platform compatibility notes --------- Co-authored-by: Junyan Qin <rockchinq@gmail.com>
694 lines
30 KiB
Python
694 lines
30 KiB
Python
from __future__ import annotations
|
||
|
||
import typing
|
||
import asyncio
|
||
import traceback
|
||
import base64
|
||
import json
|
||
|
||
import nio
|
||
|
||
from langbot.pkg.utils import httpclient
|
||
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
|
||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||
import langbot_plugin.api.entities.builtin.platform.events as platform_events
|
||
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
|
||
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
|
||
|
||
|
||
class MatrixMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
    """Convert between LangBot message chains and Matrix message payloads."""

    @staticmethod
    def _guess_image_mime(data: bytes) -> str:
        """Return an image MIME type guessed from the leading magic bytes.

        Falls back to ``image/png`` (the value that used to be hard-coded)
        when the signature is not recognised.
        """
        if data.startswith(b'\x89PNG\r\n\x1a\n'):
            return 'image/png'
        if data.startswith(b'\xff\xd8\xff'):
            return 'image/jpeg'
        if data.startswith((b'GIF87a', b'GIF89a')):
            return 'image/gif'
        if data[:4] == b'RIFF' and data[8:12] == b'WEBP':
            return 'image/webp'
        return 'image/png'

    @staticmethod
    async def _read_component_bytes(component: typing.Any) -> bytes | None:
        """Load the raw bytes of an Image/File component.

        Sources are tried in the order base64 -> url -> path; the first one
        that is set wins. Returns ``None`` when no source is set.
        """
        if component.base64:
            payload = component.base64
            # Accept both bare base64 and "data:<mime>;base64,<payload>" URIs.
            if ';base64,' in payload:
                payload = payload.split(';base64,', 1)[1]
            return base64.b64decode(payload)
        if component.url:
            session = httpclient.get_session()
            async with session.get(component.url) as response:
                return await response.read()
        if component.path:
            with open(component.path, 'rb') as f:
                return f.read()
        return None

    @staticmethod
    async def _upload_media(
        client: nio.AsyncClient,
        data: bytes,
        content_type: str,
        filename: str | None = None,
    ) -> str | None:
        """Upload ``data`` to the homeserver and return its ``mxc://`` URI.

        Returns ``None`` when the upload did not yield an ``UploadResponse``.
        """
        # matrix-nio documents data_provider as a callable (invoked with
        # got_429, got_timeouts) or a file object, so the bytes are handed
        # over through a callable; filesize supplies the content length.
        result = await client.upload(
            lambda *_retry_info: data,
            content_type=content_type,
            filename=filename,
            filesize=len(data),
        )
        # AsyncClient.upload returns a (response, decryption_info) tuple.
        # Checking the tuple itself with isinstance() never matches, which
        # silently dropped every attachment.
        resp = result[0] if isinstance(result, tuple) else result
        if isinstance(resp, nio.UploadResponse):
            return resp.content_uri
        return None

    @staticmethod
    async def yiri2target(message_chain: platform_message.MessageChain, client: nio.AsyncClient) -> list[dict]:
        """Flatten a LangBot message chain into a list of sendable components.

        Each returned dict has a ``type`` of ``text``, ``image`` or ``file``.
        Images and files are uploaded through ``client`` first; components
        whose upload fails, or that carry no data, are skipped. ``Forward``
        nodes are expanded recursively. Other component types are ignored.
        """
        components: list[dict] = []
        for component in message_chain:
            if isinstance(component, platform_message.Plain):
                components.append({'type': 'text', 'text': component.text})

            elif isinstance(component, platform_message.Image):
                image_bytes = await MatrixMessageConverter._read_component_bytes(component)
                if image_bytes:
                    mxc_url = await MatrixMessageConverter._upload_media(
                        client,
                        image_bytes,
                        MatrixMessageConverter._guess_image_mime(image_bytes),
                    )
                    if mxc_url is not None:
                        components.append({'type': 'image', 'mxc_url': mxc_url})

            elif isinstance(component, platform_message.File):
                file_bytes = await MatrixMessageConverter._read_component_bytes(component)
                if file_bytes:
                    file_name = getattr(component, 'name', None) or 'file'
                    mxc_url = await MatrixMessageConverter._upload_media(
                        client,
                        file_bytes,
                        'application/octet-stream',
                        file_name,
                    )
                    if mxc_url is not None:
                        components.append(
                            {
                                'type': 'file',
                                'mxc_url': mxc_url,
                                'filename': file_name,
                                'size': len(file_bytes),
                            }
                        )

            elif isinstance(component, platform_message.Forward):
                for node in component.node_list:
                    components.extend(await MatrixMessageConverter.yiri2target(node.message_chain, client))

        return components

    @staticmethod
    async def target2yiri(
        event: nio.RoomMessageText | nio.RoomMessageImage,
        client: nio.AsyncClient,
        bot_user_id: str,
    ) -> platform_message.MessageChain:
        """Build a LangBot message chain from a Matrix text or image event.

        For text events, an occurrence of ``bot_user_id`` in the body becomes
        an ``At`` component and is removed from the text. For image events
        the media is downloaded and embedded as a base64 data URI, followed
        by the event body (if any) as plain text.
        """
        message_components = []

        if isinstance(event, nio.RoomMessageText):
            text = event.body
            if bot_user_id and bot_user_id in text:
                message_components.append(platform_message.At(target=bot_user_id))
                text = text.replace(bot_user_id, '').strip()
            message_components.append(platform_message.Plain(text=text))

        elif isinstance(event, nio.RoomMessageImage):
            mxc_url = event.url
            if mxc_url:
                resp = await client.download(mxc_url)
                if isinstance(resp, nio.DownloadResponse):
                    b64 = base64.b64encode(resp.body).decode('utf-8')
                    content_type = resp.content_type or 'image/png'
                    message_components.append(platform_message.Image(base64=f'data:{content_type};base64,{b64}'))
            if event.body:
                message_components.append(platform_message.Plain(text=event.body))

        return platform_message.MessageChain(message_components)
|
||
|
||
|
||
class MatrixEventConverter(abstract_platform_adapter.AbstractEventConverter):
    """Convert between Matrix room events and LangBot message events."""

    @staticmethod
    async def yiri2target(event: platform_events.MessageEvent):
        """Return the platform object stored on a LangBot event."""
        return event.source_platform_object

    @staticmethod
    async def target2yiri(
        event: nio.RoomMessageText | nio.RoomMessageImage,
        room: nio.MatrixRoom,
        client: nio.AsyncClient,
        bot_user_id: str,
        bridge_user_ids: list[str] | None = None,
    ):
        """Wrap a Matrix event as a ``FriendMessage`` or ``GroupMessage``.

        The room counts as a private chat when at most one member remains
        after removing the bot itself and every id in ``bridge_user_ids``;
        otherwise it is reported as a group chat.
        """
        chain = await MatrixMessageConverter.target2yiri(event, client, bot_user_id)

        # Members that never count as "real" participants.
        ignored = {bot_user_id}
        if bridge_user_ids:
            ignored |= set(bridge_user_ids)
        human_count = sum(1 for uid in room.users if uid not in ignored)

        if human_count > 1:
            group = platform_entities.Group(
                id=room.room_id,
                name=room.display_name or room.room_id,
                permission=platform_entities.Permission.Member,
            )
            member = platform_entities.GroupMember(
                id=event.sender,
                member_name=room.user_name(event.sender) or event.sender,
                permission=platform_entities.Permission.Member,
                group=group,
                special_title='',
            )
            return platform_events.GroupMessage(
                sender=member,
                message_chain=chain,
                # Matrix timestamps are in milliseconds.
                time=event.server_timestamp / 1000.0,
                source_platform_object={'event': event, 'room': room},
            )

        friend = platform_entities.Friend(
            id=event.sender,
            nickname=room.user_name(event.sender) or event.sender,
            remark='',
        )
        return platform_events.FriendMessage(
            sender=friend,
            message_chain=chain,
            # Matrix timestamps are in milliseconds.
            time=event.server_timestamp / 1000.0,
            source_platform_object={'event': event, 'room': room},
        )
|
||
|
||
|
||
class BridgeState:
    """Configuration and mutable runtime state for one bridge bot."""

    def __init__(self, user_id: str, login_command: str, logout_command: str, success_keyword: str, check_command: str):
        """Store the bridge settings and reset all runtime state.

        An empty ``check_command`` falls back to ``login_command``.
        """
        # Settings taken from the adapter configuration.
        self.user_id = user_id
        self.success_keyword = success_keyword
        self.login_command = login_command
        self.logout_command = logout_command
        self.check_command = check_command if check_command else login_command

        # Runtime state, updated while the adapter is running.
        self.logged_in = False
        self.check_responded = False
        self.dm_room_id: str | None = None
        self.login_task: asyncio.Task | None = None
        self.check_task: asyncio.Task | None = None
|
||
|
||
|
||
class MatrixAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
    """LangBot platform adapter for Matrix, built on matrix-nio.

    Besides relaying text/image messages to the registered listeners, the
    adapter can drive any number of "bridge" bots (see ``BridgeState``): it
    sends their login command until a success keyword is seen, forwards QR
    code images to the logger, watches for disconnect notices and
    periodically checks that the bridge still answers.
    """

    client: typing.Any = None
    message_converter: MatrixMessageConverter = MatrixMessageConverter()
    event_converter: MatrixEventConverter = MatrixEventConverter()
    config: dict
    listeners: typing.Dict[typing.Type[platform_events.Event], typing.Callable] = {}
    _running: bool = False
    _initial_sync_done: bool = False
    _bridges: list = []

    def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger):
        """Create the nio client and parse the bridge configuration.

        Raises:
            ValueError: if ``homeserver_url``, ``user_id`` or ``access_token``
                is missing, or if the ``bridges`` setting cannot be parsed.
        """
        homeserver_url = config.get('homeserver_url', '')
        access_token = config.get('access_token', '')
        user_id = config.get('user_id', '')

        if not homeserver_url or not access_token or not user_id:
            raise ValueError('Matrix 机器人缺少必要配置项 (homeserver_url, user_id, access_token)')

        client = nio.AsyncClient(homeserver_url, user_id)
        client.access_token = access_token
        client.user_id = user_id

        super().__init__(
            config=config,
            logger=logger,
            bot_account_id=user_id,
            client=client,
            listeners={},
        )

        # Parse bridges config AFTER super().__init__() to avoid Pydantic resetting _bridges
        self._bridges = self._parse_bridges(config)

    # ------------------------------------------------------------------
    # Configuration helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _config_text(value: typing.Any, default: str = '') -> str:
        """Return ``value`` as a stripped string, or ``default`` for ``None``.

        Config values may be missing, null or non-string; calling
        ``.strip()`` on them directly raises ``AttributeError``.
        """
        if value is None:
            return default
        return str(value).strip()

    @classmethod
    def _parse_bridges(cls, config: dict) -> list[BridgeState]:
        """Build the ``BridgeState`` list from the adapter configuration.

        ``bridges`` may be a JSON string or an already-decoded list of dicts
        (a single dict is accepted as a one-element list). Entries without a
        ``user_id`` are skipped. When no bridge results from ``bridges``, the
        legacy single-bridge keys (``bridge_user_id`` etc.) are consulted.

        Raises:
            ValueError: if ``bridges`` is invalid JSON or not a list.
        """
        default_keyword = 'Successfully logged in'
        bridges: list[BridgeState] = []

        bridges_raw = config.get('bridges', '')
        if bridges_raw:
            if isinstance(bridges_raw, str):
                try:
                    bridges_list = json.loads(bridges_raw)
                except (json.JSONDecodeError, TypeError) as e:
                    raise ValueError(f'bridges 配置 JSON 解析失败: {e}\n原始值: {bridges_raw}') from e
            else:
                bridges_list = bridges_raw

            if isinstance(bridges_list, dict):
                bridges_list = [bridges_list]
            elif not isinstance(bridges_list, (list, tuple)):
                raise ValueError(f'bridges 配置必须是列表: {bridges_raw!r}')

            for entry in bridges_list:
                if not isinstance(entry, dict):
                    continue
                bridge_user_id = cls._config_text(entry.get('user_id'))
                if not bridge_user_id:
                    continue
                bridges.append(
                    BridgeState(
                        user_id=bridge_user_id,
                        login_command=cls._config_text(entry.get('login_command')),
                        logout_command=cls._config_text(entry.get('logout_command')),
                        # An empty keyword would match every bridge message
                        # ('' in body is always True), so fall back to the default.
                        success_keyword=cls._config_text(entry.get('success_keyword')) or default_keyword,
                        check_command=cls._config_text(entry.get('check_command')),
                    )
                )

        # Backward compatibility: old single-bridge config
        if not bridges:
            old_user_id = cls._config_text(config.get('bridge_user_id'))
            if old_user_id:
                bridges.append(
                    BridgeState(
                        user_id=old_user_id,
                        login_command=cls._config_text(config.get('bridge_login_command')),
                        logout_command=cls._config_text(config.get('bridge_logout_command')),
                        success_keyword=cls._config_text(config.get('bridge_login_success_keyword'))
                        or default_keyword,
                        check_command=cls._config_text(config.get('bridge_check_command')),
                    )
                )

        return bridges

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
        """Send ``message`` to the room ``target_id`` (``target_type`` is unused)."""
        components = await self.message_converter.yiri2target(message, self.client)
        for component in components:
            await self._send_component(target_id, component)

    async def reply_message(
        self,
        message_source: platform_events.MessageEvent,
        message: platform_message.MessageChain,
        quote_origin: bool = False,
    ):
        """Reply in the room the source event came from.

        With ``quote_origin`` every component is sent as a reply to the
        original event.
        """
        source_obj = message_source.source_platform_object
        room_id = source_obj['room'].room_id
        components = await self.message_converter.yiri2target(message, self.client)

        reply_to = source_obj['event'].event_id if quote_origin else None
        for component in components:
            await self._send_component(room_id, component, reply_to=reply_to)

    async def _send_text(self, room_id: str, body: str):
        """Send a plain ``m.text`` message and return nio's response."""
        return await self.client.room_send(
            room_id=room_id,
            message_type='m.room.message',
            content={'msgtype': 'm.text', 'body': body},
        )

    async def _send_component(self, room_id: str, component: dict, reply_to: str | None = None):
        """Send one converted component (text, image or file) to a room.

        Components of an unknown type are ignored. A send that nio reports
        as failed is logged instead of being dropped silently.
        """
        content = {}
        if component['type'] == 'text':
            content = {
                'msgtype': 'm.text',
                'body': component['text'],
            }
        elif component['type'] == 'image':
            content = {
                'msgtype': 'm.image',
                'body': 'image.png',
                'url': component['mxc_url'],
            }
        elif component['type'] == 'file':
            content = {
                'msgtype': 'm.file',
                'body': component.get('filename', 'file'),
                'url': component['mxc_url'],
                'info': {'size': component.get('size', 0)},
            }

        if not content:
            return

        if reply_to:
            content['m.relates_to'] = {
                'm.in_reply_to': {'event_id': reply_to},
            }

        resp = await self.client.room_send(
            room_id=room_id,
            message_type='m.room.message',
            content=content,
        )
        if isinstance(resp, nio.RoomSendError):
            await self.logger.error(f'Failed to send Matrix {component["type"]} message to {room_id}: {resp}')

    # ------------------------------------------------------------------
    # Listener registry
    # ------------------------------------------------------------------

    def register_listener(
        self,
        event_type: typing.Type[platform_events.Event],
        callback: typing.Callable[
            [platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
        ],
    ):
        """Register ``callback`` for ``event_type``, replacing any previous one."""
        self.listeners[event_type] = callback

    async def unregister_listener(
        self,
        event_type: typing.Type[platform_events.Event],
        callback: typing.Callable[
            [platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
        ],
    ):
        """Remove the listener for ``event_type`` (``callback`` is not compared)."""
        if event_type in self.listeners:
            del self.listeners[event_type]

    # ------------------------------------------------------------------
    # Incoming event handling
    # ------------------------------------------------------------------

    async def _dispatch_incoming(self, room: typing.Any, event: typing.Any, bridge_user_ids: list, label: str):
        """Convert a Matrix event and hand it to the matching listener.

        ``label`` ('message' or 'image') only appears in the error log line.
        Any exception is logged rather than propagated into the sync loop.
        """
        try:
            lb_event = await self.event_converter.target2yiri(
                event, room, self.client, self.bot_account_id, bridge_user_ids
            )
            if type(lb_event) in self.listeners:
                result = self.listeners[type(lb_event)](lb_event, self)
                if asyncio.iscoroutine(result):
                    await result
        except Exception:
            await self.logger.error(f'Error handling Matrix {label}: {traceback.format_exc()}')

    async def _on_bridge_status_message(self, bridge: BridgeState, event: typing.Any):
        """Track login success / disconnect from a bridge's text or notice."""
        if not self._initial_sync_done:
            return
        if event.sender != bridge.user_id:
            return

        disconnect_keywords = ('disconnected', 'logged out', 'connection lost', 'session expired', 'token expired')

        # Any message from the bridge counts as a reply to the status check.
        bridge.check_responded = True
        body = event.body or ''
        if bridge.success_keyword in body:
            bridge.logged_in = True
            await self.logger.info(f'[{bridge.user_id}] Bridge login succeeded.')

        # Disconnect detection only matters while we believe we are logged in.
        if not bridge.logged_in:
            return
        body_lower = body.lower()
        matched = next((kw for kw in disconnect_keywords if kw in body_lower), None)
        if matched is not None:
            bridge.logged_in = False
            await self.logger.info(f'[{bridge.user_id}] Bridge 账号掉线 (检测到: "{matched}"), 将自动重新登录...')
            self._restart_bridge_login(bridge)

    async def _on_bridge_image(self, bridge: BridgeState, event: typing.Any):
        """Forward an image sent by a bridge (e.g. a login QR code) to the logger."""
        if not self._initial_sync_done:
            return
        if event.sender != bridge.user_id:
            return
        mxc_url = event.url
        if not mxc_url:
            return
        try:
            resp = await self.client.download(mxc_url)
            if isinstance(resp, nio.DownloadResponse):
                b64 = base64.b64encode(resp.body).decode('utf-8')
                content_type = resp.content_type or 'image/png'
                await self.logger.info(
                    f'[{bridge.user_id}] Bridge 发送了二维码,请扫码登录:',
                    images=[platform_message.Image(base64=f'data:{content_type};base64,{b64}')],
                )
        except Exception:
            await self.logger.error(
                f'[{bridge.user_id}] Failed to download bridge QR image: {traceback.format_exc()}'
            )

    def _register_callbacks(self):
        """Attach all nio event callbacks used by the adapter."""
        # Collect all bridge bot user IDs for filtering
        bridge_user_ids = [b.user_id for b in self._bridges]
        bridge_user_id_set = set(bridge_user_ids)

        # Auto-join invited rooms
        async def on_invite(room: nio.MatrixRoom, event: nio.InviteMemberEvent):
            if event.membership == 'invite' and event.state_key == self.client.user_id:
                await self.client.join(room.room_id)
                await self.logger.debug(f'Auto-joined room: {room.display_name or room.room_id}')

        self.client.add_event_callback(on_invite, nio.InviteMemberEvent)

        # Handle text messages
        async def on_message(room: nio.MatrixRoom, event: nio.RoomMessageText):
            if not self._initial_sync_done:
                return
            if event.sender == self.client.user_id:
                return
            if event.sender in bridge_user_id_set:
                return

            # Admin commands.
            # NOTE(review): these are accepted from ANY non-bridge user in any
            # joined room; consider restricting them to trusted users.
            body = (event.body or '').strip()
            if body == '!relogin':
                await self._handle_relogin_command(room.room_id)
                return
            if body == '!status':
                await self._handle_status_command(room.room_id)
                return

            await self._dispatch_incoming(room, event, bridge_user_ids, 'message')

        self.client.add_event_callback(on_message, nio.RoomMessageText)

        # Handle image messages
        async def on_image(room: nio.MatrixRoom, event: nio.RoomMessageImage):
            if not self._initial_sync_done:
                return
            if event.sender == self.client.user_id:
                return
            if event.sender in bridge_user_id_set:
                return
            await self._dispatch_incoming(room, event, bridge_user_ids, 'image')

        self.client.add_event_callback(on_image, nio.RoomMessageImage)

        # Set up bridge-specific callbacks for each bridge. The bridge is
        # bound through a default argument to avoid late-binding closures.
        for bridge in self._bridges:

            async def on_bridge_status(room: nio.MatrixRoom, event: typing.Any, _b=bridge):
                await self._on_bridge_status_message(_b, event)

            # Login success + disconnect detection (notice and text)
            self.client.add_event_callback(on_bridge_status, nio.RoomMessageNotice)
            self.client.add_event_callback(on_bridge_status, nio.RoomMessageText)

            # QR code image forwarding
            async def on_bridge_image(room: nio.MatrixRoom, event: nio.RoomMessageImage, _b=bridge):
                await self._on_bridge_image(_b, event)

            self.client.add_event_callback(on_bridge_image, nio.RoomMessageImage)

    async def _log_account_summary(self):
        """Log the account, homeserver, joined-room count and bridge list."""
        display_name = self.client.user_id
        try:
            profile_resp = await self.client.get_displayname(self.client.user_id)
            if isinstance(profile_resp, nio.ProfileGetDisplayNameResponse) and profile_resp.displayname:
                display_name = profile_resp.displayname
        except Exception:
            # Best effort: the display name is cosmetic, keep the user id.
            pass

        joined_rooms = len(self.client.rooms)
        homeserver = self.config.get('homeserver_url', '')
        bridge_info = ''
        if self._bridges:
            bridge_names = ', '.join(b.user_id for b in self._bridges)
            bridge_info = f' | 桥接: [{bridge_names}]'
        await self.logger.info(
            f'Matrix 账号: {display_name} ({self.client.user_id}) | '
            f'服务器: {homeserver} | 已加入 {joined_rooms} 个房间{bridge_info}'
        )

    async def run_async(self):
        """Register callbacks, perform the initial sync and run the sync loop.

        Returns once ``kill()`` has cleared the running flag.
        """
        self._running = True
        await self.logger.info('Matrix adapter starting...')

        # Debug: log bridge parsing result
        bridges_raw = self.config.get('bridges', '')
        await self.logger.debug(f'bridges config raw: type={type(bridges_raw).__name__}, repr={repr(bridges_raw)}')
        await self.logger.debug(
            f'parsed _bridges count: {len(self._bridges)}, ids: {[b.user_id for b in self._bridges]}'
        )

        self._register_callbacks()

        await self.logger.debug('Matrix adapter running, starting sync...')

        # Initial sync to skip old messages: callbacks ignore every event
        # until _initial_sync_done is set.
        resp = await self.client.sync(timeout=10000)
        if isinstance(resp, nio.SyncResponse):
            await self.logger.debug(f'Matrix initial sync done, next_batch: {resp.next_batch}')
        else:
            await self.logger.error(f'Matrix initial sync failed: {resp}')
        self._initial_sync_done = True

        # Display account info
        await self._log_account_summary()

        # Start bridge login and status check tasks for each bridge
        for bridge in self._bridges:
            if bridge.login_command:
                await self.logger.info(
                    f'[{bridge.user_id}] Bridge login enabled (命令: "{bridge.login_command}", '
                    f'关键词: "{bridge.success_keyword}")'
                )
                bridge.login_task = asyncio.create_task(self._periodic_bridge_login(bridge))
                bridge.check_task = asyncio.create_task(self._periodic_bridge_check(bridge))
            else:
                await self.logger.debug(f'[{bridge.user_id}] Bridge login not configured (no login_command)')

        # Main sync loop
        while self._running:
            try:
                sync_resp = await self.client.sync(timeout=30000)
            except Exception:
                if not self._running:
                    # The client was closed by kill(); not an error.
                    break
                await self.logger.error(f'Matrix sync error: {traceback.format_exc()}')
                await asyncio.sleep(5)
                continue

            # nio returns (rather than raises) a SyncError for failed
            # requests; back off so a persistent failure does not busy-loop.
            if isinstance(sync_resp, nio.SyncError) and self._running:
                await self.logger.error(f'Matrix sync error: {sync_resp}')
                await asyncio.sleep(5)

    # ------------------------------------------------------------------
    # Bridge login / health checking
    # ------------------------------------------------------------------

    @staticmethod
    def _logout_command(bridge: BridgeState) -> str:
        """Return the configured logout command, or derive one from the login command."""
        return bridge.logout_command or bridge.login_command.replace('login', 'logout')

    @staticmethod
    def _cancel_task(task: asyncio.Task | None):
        """Cancel ``task`` if it exists and has not finished."""
        if task and not task.done():
            task.cancel()

    async def _find_or_create_bridge_dm(self, bridge: BridgeState) -> str | None:
        """Return the id of a two-member room shared with the bridge.

        A new direct room is created (inviting the bridge) when none exists.
        Returns ``None`` if room creation fails.
        """
        for room_id, room in self.client.rooms.items():
            if room.member_count == 2 and bridge.user_id in room.users:
                return room_id

        resp = await self.client.room_create(
            is_direct=True,
            invite=[bridge.user_id],
        )
        if isinstance(resp, nio.RoomCreateResponse):
            await self.logger.debug(f'[{bridge.user_id}] Created DM room: {resp.room_id}')
            return resp.room_id

        await self.logger.error(f'[{bridge.user_id}] Failed to create DM room: {resp}')
        return None

    async def _periodic_bridge_login(self, bridge: BridgeState):
        """Periodically send login command to a bridge bot until login succeeds."""
        try:
            await self.logger.info(f'[{bridge.user_id}] Bridge login task started, looking for DM room...')
            dm_room_id = await self._find_or_create_bridge_dm(bridge)
            if not dm_room_id:
                return

            bridge.dm_room_id = dm_room_id

            # Force logout first on every adapter start
            logout_cmd = self._logout_command(bridge)
            await self.logger.info(f'[{bridge.user_id}] 强制登出: "{logout_cmd}"')
            await self._send_text(dm_room_id, logout_cmd)
            await asyncio.sleep(3)

            while self._running and not bridge.logged_in:
                await self.logger.debug(f'[{bridge.user_id}] Sending "{bridge.login_command}" in room {dm_room_id}')
                await self._send_text(dm_room_id, bridge.login_command)
                # Wait up to 60s, polling every second so that shutdown and
                # a successful login are noticed promptly.
                for _ in range(60):
                    if not self._running or bridge.logged_in:
                        break
                    await asyncio.sleep(1)

            if bridge.logged_in:
                await self.logger.debug(f'[{bridge.user_id}] Bridge login confirmed, periodic login stopped.')
        except asyncio.CancelledError:
            pass
        except Exception:
            await self.logger.error(f'[{bridge.user_id}] Bridge periodic login error: {traceback.format_exc()}')

    def _restart_bridge_login(self, bridge: BridgeState):
        """Cancel existing login task and start a new one."""
        self._cancel_task(bridge.login_task)
        bridge.login_task = asyncio.create_task(self._periodic_bridge_login(bridge))

    def _restart_bridge_check(self, bridge: BridgeState):
        """Cancel existing status-check task and start a new one."""
        self._cancel_task(bridge.check_task)
        bridge.check_task = asyncio.create_task(self._periodic_bridge_check(bridge))

    async def _periodic_bridge_check(self, bridge: BridgeState):
        """Periodically check a bridge's login status."""
        try:
            # Nothing to check until the first login has succeeded.
            while self._running and not bridge.logged_in:
                await asyncio.sleep(5)

            check_interval = 300  # 5 minutes
            response_timeout = 30
            await self.logger.debug(f'[{bridge.user_id}] Bridge status check started (interval: {check_interval}s)')

            while self._running:
                for _ in range(check_interval):
                    if not self._running:
                        return
                    await asyncio.sleep(1)

                if not bridge.logged_in or not bridge.dm_room_id:
                    continue

                try:
                    bridge.check_responded = False
                    await self._send_text(bridge.dm_room_id, bridge.check_command)
                    await self.logger.debug(f'[{bridge.user_id}] Bridge status check: sent "{bridge.check_command}"')

                    # check_responded is set by the bridge message callback.
                    for _ in range(response_timeout):
                        if bridge.check_responded or not self._running:
                            break
                        await asyncio.sleep(1)

                    if bridge.check_responded:
                        await self.logger.debug(f'[{bridge.user_id}] Bridge status check: OK')
                    else:
                        await self.logger.info(
                            f'[{bridge.user_id}] Bridge status check: 无响应, 可能已掉线, 尝试重新登录...'
                        )
                        bridge.logged_in = False
                        self._restart_bridge_login(bridge)
                except Exception:
                    await self.logger.error(f'[{bridge.user_id}] Bridge status check error: {traceback.format_exc()}')
        except asyncio.CancelledError:
            pass
        except Exception:
            await self.logger.error(f'[{bridge.user_id}] Bridge status check fatal error: {traceback.format_exc()}')

    # ------------------------------------------------------------------
    # Admin commands
    # ------------------------------------------------------------------

    async def _handle_relogin_command(self, room_id: str):
        """Handle !relogin command: logout then re-login all bridges."""
        if not self._bridges:
            await self._send_text(room_id, '没有配置任何桥。')
            return

        lines = ['开始重新登录所有桥...']
        for bridge in self._bridges:
            if not bridge.login_command or not bridge.dm_room_id:
                lines.append(f'[{bridge.user_id}] 跳过(未配置登录命令或无DM房间)')
                continue

            # Use configured logout command, fallback to deriving from login command
            logout_cmd = self._logout_command(bridge)
            lines.append(f'[{bridge.user_id}] 发送 "{logout_cmd}"...')

            # Cancel existing tasks
            self._cancel_task(bridge.login_task)
            self._cancel_task(bridge.check_task)

            # Send logout
            try:
                await self._send_text(bridge.dm_room_id, logout_cmd)
            except Exception as e:
                lines.append(f'[{bridge.user_id}] logout 发送失败: {e}')

            await asyncio.sleep(2)

            # Reset state and restart login. The status check task was
            # cancelled above and must be restarted too, otherwise the bridge
            # is never health-checked again after a !relogin.
            bridge.logged_in = False
            self._restart_bridge_login(bridge)
            self._restart_bridge_check(bridge)
            lines.append(f'[{bridge.user_id}] 已触发重新登录')

        await self._send_text(room_id, '\n'.join(lines))

    async def _handle_status_command(self, room_id: str):
        """Handle !status command: show bridge states."""
        if not self._bridges:
            await self._send_text(room_id, '没有配置任何桥。')
            return

        lines = ['桥状态:']
        for bridge in self._bridges:
            status = '已登录 ✓' if bridge.logged_in else '未登录 ✗'
            dm = bridge.dm_room_id or '无'
            lines.append(f'• {bridge.user_id}: {status} (DM: {dm})')
        await self._send_text(room_id, '\n'.join(lines))

    # ------------------------------------------------------------------
    # Shutdown
    # ------------------------------------------------------------------

    async def kill(self) -> bool:
        """Stop the sync loop, cancel bridge tasks and close the client."""
        self._running = False
        for bridge in self._bridges:
            self._cancel_task(bridge.login_task)
            self._cancel_task(bridge.check_task)
        if self.client:
            await self.client.close()
        await self.logger.debug('Matrix adapter stopped')
        return True
|