from __future__ import annotations
import typing
import time
import datetime
import json
import asyncio
import traceback
import re
import base64
import aiohttp
import pydantic
import websockets
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
class SatoriMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
"""Convert between LangBot MessageChain and Satori message format"""
@staticmethod
async def yiri2target(message_chain: platform_message.MessageChain, adapter: 'SatoriAdapter') -> str:
    """Convert a LangBot MessageChain into a Satori message content string.

    Each component of the chain is rendered as escaped text or as a
    self-closing Satori element, and the pieces are concatenated in order.
    Component types that are not handled below are skipped.

    Args:
        message_chain: The chain of LangBot message components to render.
        adapter: The Satori adapter; its ``upload_image`` coroutine is used
            for large base64 images and its ``logger`` for diagnostics.

    Returns:
        The Satori content string (empty if nothing could be rendered).
    """
    # Base64 payloads longer than this many characters are uploaded through
    # the adapter instead of being embedded as a data: URI.
    max_inline_size = 32 * 1024  # 32KB limit for inline base64

    def escape_text(value: object) -> str:
        # '&' must be escaped first, otherwise the '&' of the entities
        # produced for '<' and '>' would be escaped a second time.
        return str(value).replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')

    def escape_attr(value: object) -> str:
        # Attribute values are double-quoted, so '"' must be escaped too.
        return escape_text(value).replace('"', '&quot;')

    def element(tag: str, **attrs: object) -> str:
        rendered = ''.join(f' {name}="{escape_attr(value)}"' for name, value in attrs.items())
        return f'<{tag}{rendered}/>'

    # NOTE(review): the element markup inside the string literals of this
    # method was missing in the reviewed copy. The tag names below follow the
    # Satori standard elements (img/at/quote/audio/file) and the tags this
    # file's own parser recognises (reply/emoticon) - confirm against the
    # target Satori server.
    content_parts = []
    for component in message_chain:
        if isinstance(component, platform_message.Plain):
            content_parts.append(escape_text(component.text))

        elif isinstance(component, platform_message.Image):
            # Prefer URL over base64 to avoid buffer overflow issues with large images
            if component.url:
                content_parts.append(element('img', src=component.url))
                continue

            base64_data = getattr(component, 'base64', None)
            if not base64_data:
                continue

            # Remove whitespace that might corrupt the data
            base64_data = base64_data.replace('\n', '').replace('\r', '').replace(' ', '')

            # Split an optional "data:<mime>;base64," prefix from the payload.
            raw_b64 = base64_data
            mime_type = 'image/png'
            if base64_data.startswith('data:'):
                header, separator, payload = base64_data.partition(',')
                if separator:
                    raw_b64 = payload
                    if ';' in header:
                        mime_type = header.split(':')[1].split(';')[0] or mime_type

            if len(raw_b64) > max_inline_size:
                # Large image: try to upload it and reference the returned URL.
                try:
                    # Fix base64 padding if needed
                    raw_b64 += '=' * (-len(raw_b64) % 4)
                    image_bytes = base64.b64decode(raw_b64)
                    uploaded_url = await adapter.upload_image(image_bytes, mime_type)
                    if uploaded_url:
                        await adapter.logger.info(f'Satori 图片上传成功: {len(image_bytes)} 字节')
                        content_parts.append(element('img', src=uploaded_url))
                    else:
                        # Upload failed, use inline (may fail)
                        await adapter.logger.warning('Satori 图片上传失败,使用内联模式')
                        content_parts.append(element('img', src=f'data:{mime_type};base64,{raw_b64}'))
                except Exception as e:
                    # Best effort: log the failure and still try the inline form.
                    await adapter.logger.error(f'Satori 图片处理失败: {e}')
                    content_parts.append(element('img', src=f'data:{mime_type};base64,{raw_b64}'))
            else:
                # Small image, use inline
                content_parts.append(element('img', src=f'data:{mime_type};base64,{raw_b64}'))

        elif isinstance(component, platform_message.At):
            if component.target:
                content_parts.append(element('at', id=component.target))

        elif isinstance(component, platform_message.AtAll):
            content_parts.append(element('at', type='all'))

        elif isinstance(component, platform_message.Reply):
            # NOTE(review): assumes the referenced message id is exposed as
            # `id` - TODO confirm against the Reply component definition.
            reply_id = getattr(component, 'id', None)
            if reply_id is not None:
                content_parts.append(element('reply', id=reply_id))

        elif isinstance(component, platform_message.Quote):
            # NOTE(review): assumes the quoted message id is exposed as
            # `id` - TODO confirm against the Quote component definition.
            quote_id = getattr(component, 'id', None)
            if quote_id is not None:
                content_parts.append(element('quote', id=quote_id))

        elif isinstance(component, platform_message.Face):
            # Faces are sent as an emoticon element.
            face_id = getattr(component, 'face_id', 'unknown')
            content_parts.append(element('emoticon', id=face_id))

        elif isinstance(component, platform_message.Voice):
            voice_url = getattr(component, 'url', None)
            if voice_url:
                content_parts.append(element('audio', src=voice_url))

        elif isinstance(component, platform_message.File):
            file_url = getattr(component, 'url', None)
            if file_url:
                content_parts.append(element('file', src=file_url))

    return ''.join(content_parts)
@staticmethod
async def target2yiri(
message_data: dict, adapter: 'SatoriAdapter', bot_account_id: str = ''
) -> platform_message.MessageChain:
"""Convert Satori message to LangBot MessageChain
Parses Satori's XML-like message format and converts to LangBot MessageChain.
Handles text, images, mentions, replies, quotes, emoticons, audio, and files.
"""
content = message_data.get('content', '')
components = []
if content:
# HTML实体解码 - 注意顺序:先解码 & 再解码其他实体
# 这样可以正确处理 < -> < -> <
content = content.replace('&', '&').replace('<', '<').replace('>', '>')
# 定义各种消息组件的正则模式 - 支持更灵活的属性顺序
# 使用 (?:...) 非捕获组来支持可选属性
patterns = [
# 图片 - 支持 src 在任意位置
(r'
]*src=["\']([^"\']+)["\'][^>]*/?\s*>', 'image'),
# @提及用户 - id 属性
(r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', 'mention'),
# @全体 - type="all"
(r']*type=["\']all["\'][^>]*/?\s*>', 'mention_all'),
# 回复
(r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', 'reply'),
# 引用
(r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', 'quote'),
# 表情
(r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', 'emoticon'),
(r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', 'face'),
# 音频
(r'