from __future__ import annotations
import typing
import time
import datetime
import json
import asyncio
import traceback
import re
import base64
import aiohttp
import pydantic
import websockets
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
class SatoriMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
"""Convert between LangBot MessageChain and Satori message format"""
@staticmethod
async def yiri2target(
    message_chain: platform_message.MessageChain, adapter: "SatoriAdapter"
) -> str:
    """Convert a LangBot MessageChain to a Satori message content string.

    Plain text is entity-escaped; every other supported component is
    rendered as a self-closing Satori element with escaped attribute
    values. Components of unsupported types are skipped.

    Images are sent by URL when one is available. Otherwise the base64
    payload is inlined as a ``data:`` URI, unless it is longer than 32 KiB
    of base64 text, in which case it is decoded and handed to
    ``adapter.upload_image`` first; if the upload fails or returns nothing,
    the inline form is used as a best-effort fallback.

    Args:
        message_chain: The LangBot message chain to serialize.
        adapter: Adapter providing ``upload_image`` and an async ``logger``.

    Returns:
        The concatenated Satori message content.
    """
    # Base64 payloads longer than this many characters are uploaded
    # instead of being inlined.
    max_inline_size = 32 * 1024

    def escape(value: object, *, quote: bool = False) -> str:
        # "&" must be replaced first so the entities produced by the later
        # replacements are not escaped a second time.
        escaped = (
            str(value)
            .replace("&", "&amp;")
            .replace("<", "&lt;")
            .replace(">", "&gt;")
        )
        return escaped.replace('"', "&quot;") if quote else escaped

    def element(tag: str, **attrs: object) -> str:
        # Render a self-closing element such as <img src="..."/>.
        rendered = "".join(
            f' {key}="{escape(value, quote=True)}"' for key, value in attrs.items()
        )
        return f"<{tag}{rendered}/>"

    content_parts = []
    for component in message_chain:
        if isinstance(component, platform_message.Plain):
            content_parts.append(escape(component.text))

        elif isinstance(component, platform_message.Image):
            # Prefer URL over base64 to avoid buffer overflow issues with
            # large images.
            if component.url:
                content_parts.append(element("img", src=component.url))
                continue

            encoded = getattr(component, "base64", None)
            if not encoded:
                continue
            # Whitespace inside the payload would corrupt the data.
            encoded = "".join(encoded.split())

            # Split an optional "data:<mime>;base64," prefix off the payload.
            raw_b64 = encoded
            mime_type = "image/png"
            if encoded.startswith("data:"):
                header, comma, payload = encoded.partition(",")
                if comma:
                    raw_b64 = payload
                    if ";" in header:
                        mime_type = header[len("data:"):].split(";")[0] or mime_type

            # NOTE(review): inline form assumed to be a data: URI built from
            # the parsed mime type and payload — confirm against the server.
            inline_img = element("img", src=f"data:{mime_type};base64,{raw_b64}")

            if len(raw_b64) <= max_inline_size:
                # Small image, use inline.
                content_parts.append(inline_img)
                continue

            # Large image: try to upload it and reference it by URL.
            try:
                # Restore any "=" padding dropped by the producer.
                padded = raw_b64 + "=" * (-len(raw_b64) % 4)
                image_bytes = base64.b64decode(padded)
                uploaded_url = await adapter.upload_image(image_bytes, mime_type)
            except Exception as e:
                # Best-effort: fall back to inline rather than drop the image.
                await adapter.logger.error(f"Satori 图片处理失败: {e}")
                content_parts.append(inline_img)
                continue

            if uploaded_url:
                await adapter.logger.info(
                    f"Satori 图片上传成功: {len(image_bytes)} 字节"
                )
                content_parts.append(element("img", src=uploaded_url))
            else:
                # Upload failed, use inline (may fail).
                await adapter.logger.warning("Satori 图片上传失败,使用内联模式")
                content_parts.append(inline_img)

        elif isinstance(component, platform_message.At):
            if component.target:
                content_parts.append(element("at", id=component.target))

        elif isinstance(component, platform_message.AtAll):
            content_parts.append(element("at", type="all"))

        elif isinstance(component, platform_message.Reply):
            # NOTE(review): assumes the replied-to message id lives on
            # `component.id` — confirm against the Reply entity.
            content_parts.append(element("reply", id=getattr(component, "id", "")))

        elif isinstance(component, platform_message.Quote):
            # NOTE(review): assumes the quoted message id lives on
            # `component.id` — confirm against the Quote entity.
            content_parts.append(element("quote", id=getattr(component, "id", "")))

        elif isinstance(component, platform_message.Face):
            # Faces are represented with the emoticon element in Satori.
            face_id = getattr(component, "face_id", "unknown")
            content_parts.append(element("emoticon", id=face_id))

        elif isinstance(component, platform_message.Voice):
            voice_url = getattr(component, "url", None)
            if voice_url:
                content_parts.append(element("audio", src=voice_url))

        elif isinstance(component, platform_message.File):
            file_url = getattr(component, "url", None)
            if file_url:
                file_attrs = {"src": file_url}
                # NOTE(review): the file name is forwarded only when the
                # component exposes a non-empty `name` — confirm the
                # attribute the server expects.
                file_name = getattr(component, "name", None)
                if file_name:
                    file_attrs["name"] = file_name
                content_parts.append(element("file", **file_attrs))

    return "".join(content_parts)
@staticmethod
async def target2yiri(
message_data: dict, adapter: "SatoriAdapter", bot_account_id: str = ""
) -> platform_message.MessageChain:
"""Convert Satori message to LangBot MessageChain
Parses Satori's XML-like message format and converts to LangBot MessageChain.
Handles text, images, mentions, replies, quotes, emoticons, audio, and files.
"""
content = message_data.get("content", "")
components = []
if content:
# HTML实体解码 - 注意顺序:先解码 & 再解码其他实体
# 这样可以正确处理 < -> < -> <
content = (
content.replace("&", "&").replace("<", "<").replace(">", ">")
)
# 定义各种消息组件的正则模式 - 支持更灵活的属性顺序
# 使用 (?:...) 非捕获组来支持可选属性
patterns = [
# 图片 - 支持 src 在任意位置
(r'
]*src=["\']([^"\']+)["\'][^>]*/?\s*>', "image"),
# @提及用户 - id 属性
(r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', "mention"),
# @全体 - type="all"
(r']*type=["\']all["\'][^>]*/?\s*>', "mention_all"),
# 回复
(r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', "reply"),
# 引用
(r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', "quote"),
# 表情
(r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', "emoticon"),
(r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', "face"),
# 音频
(r'