diff --git a/src/langbot/pkg/platform/sources/satori.py b/src/langbot/pkg/platform/sources/satori.py
index 32760bf9..096770cc 100644
--- a/src/langbot/pkg/platform/sources/satori.py
+++ b/src/langbot/pkg/platform/sources/satori.py
@@ -1,6 +1,6 @@
from __future__ import annotations
-import typing
+import typing
import time
import datetime
import json
@@ -24,35 +24,43 @@ class SatoriMessageConverter(abstract_platform_adapter.AbstractMessageConverter)
"""Convert between LangBot MessageChain and Satori message format"""
@staticmethod
- async def yiri2target(message_chain: platform_message.MessageChain, adapter: 'SatoriAdapter') -> str:
+ async def yiri2target(
+ message_chain: platform_message.MessageChain, adapter: "SatoriAdapter"
+ ) -> str:
"""Convert LangBot MessageChain to Satori message format"""
content_parts = []
for component in message_chain:
if isinstance(component, platform_message.Plain):
- text = component.text.replace('&', '&').replace('<', '<').replace('>', '>')
+ text = (
+ component.text.replace("&", "&")
+ .replace("<", "<")
+ .replace(">", ">")
+ )
content_parts.append(text)
elif isinstance(component, platform_message.Image):
# Prefer URL over base64 to avoid buffer overflow issues with large images
if component.url:
content_parts.append(f'
')
- elif hasattr(component, 'base64') and component.base64:
+ elif hasattr(component, "base64") and component.base64:
# Process base64 data
base64_data = component.base64
# Remove whitespace that might corrupt the data
- base64_data = base64_data.replace('\n', '').replace('\r', '').replace(' ', '')
+ base64_data = (
+ base64_data.replace("\n", "").replace("\r", "").replace(" ", "")
+ )
# Check size - if too large, try to upload
MAX_INLINE_SIZE = 32 * 1024 # 32KB limit for inline base64
# Extract raw base64 and mime type
raw_b64 = base64_data
- mime_type = 'image/png'
- if base64_data.startswith('data:'):
+ mime_type = "image/png"
+ if base64_data.startswith("data:"):
try:
- header, raw_b64 = base64_data.split(',', 1)
- if ';' in header:
- mime_type = header.split(':')[1].split(';')[0]
+ header, raw_b64 = base64_data.split(",", 1)
+ if ";" in header:
+ mime_type = header.split(":")[1].split(";")[0]
except (ValueError, IndexError):
pass
@@ -62,22 +70,34 @@ class SatoriMessageConverter(abstract_platform_adapter.AbstractMessageConverter)
# Fix base64 padding if needed
padding = 4 - len(raw_b64) % 4
if padding != 4:
- raw_b64 += '=' * padding
+ raw_b64 += "=" * padding
image_bytes = base64.b64decode(raw_b64)
- uploaded_url = await adapter.upload_image(image_bytes, mime_type)
+ uploaded_url = await adapter.upload_image(
+ image_bytes, mime_type
+ )
if uploaded_url:
- await adapter.logger.info(f'Satori 图片上传成功: {len(image_bytes)} 字节')
+ await adapter.logger.info(
+ f"Satori 图片上传成功: {len(image_bytes)} 字节"
+ )
content_parts.append(f'
')
else:
# Upload failed, use inline (may fail)
- await adapter.logger.warning('Satori 图片上传失败,使用内联模式')
- content_parts.append(f'
')
+ await adapter.logger.warning(
+ "Satori 图片上传失败,使用内联模式"
+ )
+ content_parts.append(
+ f'
'
+ )
except Exception as e:
- await adapter.logger.error(f'Satori 图片处理失败: {e}')
- content_parts.append(f'
')
+ await adapter.logger.error(f"Satori 图片处理失败: {e}")
+ content_parts.append(
+ f'
'
+ )
else:
# Small image, use inline
- content_parts.append(f'
')
+ content_parts.append(
+ f'
'
+ )
elif isinstance(component, platform_message.At):
if component.target:
content_parts.append(f'')
@@ -89,62 +109,66 @@ class SatoriMessageConverter(abstract_platform_adapter.AbstractMessageConverter)
content_parts.append(f'
')
elif isinstance(component, platform_message.Face):
# Satori中的表情可以使用emoticon元素
- face_id = getattr(component, 'face_id', 'unknown')
+ face_id = getattr(component, "face_id", "unknown")
content_parts.append(f'')
elif isinstance(component, platform_message.Voice):
- if hasattr(component, 'url') and component.url:
+ if hasattr(component, "url") and component.url:
content_parts.append(f'')
elif isinstance(component, platform_message.File):
- if hasattr(component, 'url') and component.url:
- content_parts.append(f'')
+ if hasattr(component, "url") and component.url:
+ content_parts.append(
+ f''
+ )
- return ''.join(content_parts)
+ return "".join(content_parts)
@staticmethod
async def target2yiri(
- message_data: dict, adapter: 'SatoriAdapter', bot_account_id: str = ''
+ message_data: dict, adapter: "SatoriAdapter", bot_account_id: str = ""
) -> platform_message.MessageChain:
"""Convert Satori message to LangBot MessageChain
Parses Satori's XML-like message format and converts to LangBot MessageChain.
Handles text, images, mentions, replies, quotes, emoticons, audio, and files.
"""
- content = message_data.get('content', '')
+ content = message_data.get("content", "")
components = []
if content:
# HTML实体解码 - 注意顺序:先解码 & 再解码其他实体
# 这样可以正确处理 < -> < -> <
- content = content.replace('&', '&').replace('<', '<').replace('>', '>')
+ content = (
+ content.replace("&", "&").replace("<", "<").replace(">", ">")
+ )
# 定义各种消息组件的正则模式 - 支持更灵活的属性顺序
# 使用 (?:...) 非捕获组来支持可选属性
patterns = [
# 图片 - 支持 src 在任意位置
- (r'
]*src=["\']([^"\']+)["\'][^>]*/?\s*>', 'image'),
+ (r'
]*src=["\']([^"\']+)["\'][^>]*/?\s*>', "image"),
# @提及用户 - id 属性
- (r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', 'mention'),
+ (r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', "mention"),
# @全体 - type="all"
- (r']*type=["\']all["\'][^>]*/?\s*>', 'mention_all'),
+ (r']*type=["\']all["\'][^>]*/?\s*>', "mention_all"),
# 回复
- (r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', 'reply'),
+ (r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', "reply"),
# 引用
- (r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', 'quote'),
+ (r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', "quote"),
# 表情
- (r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', 'emoticon'),
- (r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', 'face'),
+ (r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', "emoticon"),
+ (r']*id=["\']([^"\']+)["\'][^>]*/?\s*>', "face"),
# 音频
- (r'