feat(gewechat): 重构target2yiri代码+引用消息展开 (#1352)

* feat(gewechat): 重构target2yiri代码+引用消息展开

* feat(gewe): 引用消息,图片视频音频是单独的类型
This commit is contained in:
shinelin
2025-04-29 13:18:19 +08:00
committed by GitHub
parent 96183eb3e0
commit 3554702054

View File

@@ -1,5 +1,3 @@
from __future__ import annotations
import gewechat_client
import typing
@@ -27,6 +25,7 @@ from ..types import entities as platform_entities
from ...utils import image
import xml.etree.ElementTree as ET
from typing import Optional, List, Tuple
from functools import partial
class GewechatMessageConverter(adapter.MessageConverter):
@@ -75,210 +74,332 @@ class GewechatMessageConverter(adapter.MessageConverter):
return content_list
async def target2yiri(
self,
message: dict,
bot_account_id: str
) -> platform_message.MessageChain:
# 预处理
content_list = []
ats_bot = False
raw_content = message["Data"]["Content"]["string"]
is_group_message = self.__is_group_message(message)
"""外部消息转平台消息"""
# 数据预处理
message_list = []
ats_bot = False # 是否被@
content = message["Data"]["Content"]["string"]
content_no_preifx = content # 群消息则去掉前缀
is_group_message = self._is_group_message(message)
if is_group_message:
ats_bot = self.__ats_bot(message, bot_account_id)
# 优先处理艾特全体成员,
if "@所有人" in raw_content: ## at全员时候传入atll不当作at自己
content_list.append(platform_message.AtAll())
ats_bot = self._ats_bot(message, bot_account_id)
if "@所有人" in content:
message_list.append(platform_message.AtAll())
elif ats_bot:
content_list.append(platform_message.At(target=bot_account_id))
raw_content, sender_id = self.__extract_content_and_sender(raw_content)
message_list.append(platform_message.At(target=bot_account_id))
content_no_preifx, _ = self._extract_content_and_sender(content)
# 消息类型
msg_type = message["Data"]["MsgType"]
# 映射消息类型到处理器方法
handler_map = {
1: self._handler_text,
3: self._handler_image,
34: self._handler_voice,
49: self._handler_compound, # 复合类型
}
# 分派处理
handler = handler_map.get(msg_type, self._handler_default)
handler_result = await handler(
message = message, # 原始的message
content_no_preifx = content_no_preifx, # 处理后的content
)
# 文本消息
if msg_type == 1:
# 文本清洗,仅替换群文本中的@文本[空格],的文本
if is_group_message and ats_bot:
pattern = r'@\S+'
raw_content = re.sub(pattern, '',raw_content)
content_list.append(platform_message.Plain(raw_content))
return platform_message.MessageChain(content_list)
if handler_result and len(handler_result) > 0:
message_list.extend(handler_result)
return platform_message.MessageChain(message_list)
# 图像
elif msg_type == 3:
image_xml = raw_content # 已经去除群聊消息前缀
async def _handler_text(
self,
message: Optional[dict],
content_no_preifx: str
) -> platform_message.MessageChain:
"""处理文本消息 (msg_type=1)"""
if message and self._is_group_message(message):
pattern = r'@\S+'
content_no_preifx = re.sub(pattern, '', content_no_preifx)
return platform_message.MessageChain([platform_message.Plain(content_no_preifx)])
async def _handler_image(
self,
message: Optional[dict],
content_no_preifx: str
) -> platform_message.MessageChain:
"""处理图像消息 (msg_type=3)"""
try:
image_xml = content_no_preifx
if not image_xml:
content_list.append(platform_message.Plain(text="[图片内容为空]"))
return platform_message.MessageChain(content_list)
try:
base64_str, image_format = await image.get_gewechat_image_base64(
gewechat_url=self.config["gewechat_url"],
gewechat_file_url=self.config["gewechat_file_url"],
app_id=self.config["app_id"],
xml_content=image_xml,
token=self.config["token"],
image_type=2,
)
return platform_message.MessageChain([platform_message.Unknown("[图片内容为空]")])
base64_str, image_format = await image.get_gewechat_image_base64(
gewechat_url=self.config["gewechat_url"],
gewechat_file_url=self.config["gewechat_file_url"],
app_id=self.config["app_id"],
xml_content=image_xml,
token=self.config["token"],
image_type=2,
)
content_list.append(platform_message.Image(
base64=f"data:image/{image_format};base64,{base64_str}"
))
# 消息链中加一个WeChatForwardImage的xml用于转发
content_list.append(platform_message.WeChatForwardImage(
xml_data = image_xml
))
return platform_message.MessageChain(content_list)
except Exception as e:
print(f"处理图片消息失败: {str(e)}")
content_list.append(platform_message.Plain(text=f"[图片处理失败]"))
return platform_message.MessageChain(content_list)
# 语音消息
elif msg_type == 34:
try:
audio_base64 = message["Data"]["ImgBuf"]["buffer"]
return platform_message.MessageChain(
[platform_message.Voice(base64=f"data:audio/silk;base64,{audio_base64}")]
)
except Exception as e:
return platform_message.MessageChain(
[platform_message.Plain(text="[无法解析群聊语音的消息]")] # 小测了一下免费版拿不到群聊语音消息的base64或者用什么办法解析xml里的url?
)
finally:
return platform_message.MessageChain(content_list)
elif msg_type == 49:
# 支持微信聊天记录的消息类型,将 XML 内容转换为 MessageChain 传递
try:
# 下方是移除<?xml,
xml_data = raw_content
if raw_content.startswith('<?xml'):
xml_list = raw_content.split('\n')[1:]
xml_data = '\n'.join(xml_list)
content_data = ET.fromstring(xml_data)
# raw_content已经不会是wxid开头了
#print(xml_data)
# 拿到细分消息类型按照gewe接口中描述
'''
小程序33/36
引用消息57
转账消息2000
红包消息2001
视频号消息51
文件发送完成: 6
'''
appmsg_data = content_data.find('.//appmsg')
data_type = appmsg_data.findtext('.//type')
if data_type == '57':
user_data = appmsg_data.findtext('.//title') or "" # 用户消息
quote_data = appmsg_data.find('.//refermsg').findtext('.//content') # 引用原文
sender_id = content_data.findtext('.//fromusername') # 发送方:单聊用户/群member
tousername = message['Wxid'] # 接收方: 所属微信的wxid
quote_id = appmsg_data.find('.//refermsg').findtext('.//chatusr') # 引用消息的原发送者
elements = [
platform_message.Image(base64=f"data:image/{image_format};base64,{base64_str}"),
platform_message.WeChatForwardImage(xml_data=image_xml) # 微信消息转发
]
return platform_message.MessageChain(elements)
except Exception as e:
print(f"处理图片失败: {str(e)}")
return platform_message.MessageChain([platform_message.Unknown("[图片处理失败]")])
async def _handler_voice(
self,
message: Optional[dict],
content_no_preifx: str
) -> platform_message.MessageChain:
"""处理语音消息 (msg_type=34)"""
message_List = []
try:
# 从消息中提取语音数据(需根据实际数据结构调整字段名)
audio_base64 = message["Data"]["ImgBuf"]["buffer"]
# 验证语音数据有效性
if not audio_base64:
message_List.append(platform_message.Unknown(text="[语音内容为空]"))
return platform_message.MessageChain(message_List)
# 转换为平台支持的语音格式(如 Silk 格式)
voice_element = platform_message.Voice(
base64=f"data:audio/silk;base64,{audio_base64}"
)
message_List.append(voice_element)
except KeyError as e:
print(f"语音数据字段缺失: {str(e)}")
message_List.append(platform_message.Unknown(text="[语音数据解析失败]"))
except Exception as e:
print(f"处理语音消息异常: {str(e)}")
message_List.append(platform_message.Unknown(text="[语音处理失败]"))
return platform_message.MessageChain(message_List)
# 群特殊处理引用消息的原发送者是bot or @bot
# 引用判断:quote_id == tousername
if is_group_message and (quote_id == tousername):
if not platform_message.MessageChain(content_list).has(platform_message.At):
content_list.append(platform_message.At(target=bot_account_id))
content_list.append(platform_message.Quote(
sender_id=sender_id,
origin=platform_message.MessageChain(
# 这里是文本或者xml, 历史原因用了plain
# TODO: 后面需要重构一下,根据type解析具体的消息类型
[platform_message.Plain(quote_data)]
)))
content_list.append(platform_message.Plain(user_data)) # FIXME: 这里还有wxid
return platform_message.MessageChain(content_list)
elif data_type == '51':
return platform_message.MessageChain(
[ # platform_message.Plain(text=f'[视频号消息]'),
platform_message.Unknown(text=raw_content)]
)
# print(content_data)
elif data_type == '2000':
return platform_message.MessageChain(
[ # platform_message.Plain(text=f'[转账消息]'),
platform_message.Unknown(text=raw_content)]
)
elif data_type == '2001':
return platform_message.MessageChain(
[ # platform_message.Plain(text=f'[红包消息]'),
platform_message.Unknown(text=raw_content)]
)
elif data_type == '5':
content_list.append(
# platform_message.Plain(text=f'[公众号消息]'),
platform_message.WeChatForwardLink(xml_data=raw_content)
)
return platform_message.MessageChain(content_list)
elif data_type == '33' or data_type == '36':
return platform_message.MessageChain(
[ # platform_message.Plain(text=f'[小程序消息]'),
platform_message.Unknown(text=raw_content)]
)
elif data_type == "6":
# 文件消息
content_list.append(
# platform_message.Plain(text=f'[文件消息]'),
platform_message.WeChatForwardFile(xml_data=raw_content)
)
return platform_message.MessageChain(content_list)
# print(data_type.text)
async def _handler_compound(
self,
message: Optional[dict],
content_no_preifx: str
) -> platform_message.MessageChain:
"""处理复合消息 (msg_type=49),根据子类型分派"""
try:
xml_data = ET.fromstring(content_no_preifx)
appmsg_data = xml_data.find('.//appmsg')
data_type = appmsg_data.findtext('.//type', "")
# 二次分派处理器
sub_handler_map = {
'57': self._handler_compound_quote,
'5': self._handler_compound_link,
'6': self._handler_compound_file,
'33': self._handler_compound_mini_program,
'36': self._handler_compound_mini_program,
'2000': partial(self._handler_compound_unsupported, text="[转账消息]"),
'2001': partial(self._handler_compound_unsupported, text="[红包消息]"),
'51': partial(self._handler_compound_unsupported, text="[视频号消息]"),
}
handler = sub_handler_map.get(data_type, self._handler_compound_unsupported)
return await handler(
message=message, #原始msg
xml_data=xml_data, # xml数据
)
except Exception as e:
print(f"解析复合消息失败: {str(e)}")
return platform_message.MessageChain([platform_message.Unknown(text=content_no_preifx)])
async def _handler_compound_quote(
self,
message: Optional[dict],
xml_data: ET.Element
) -> platform_message.MessageChain:
"""处理引用消息 (data_type=57)"""
# print("_handler_compound_quote", ET.tostring(xml_data, encoding='unicode'))
appmsg_data = xml_data.find('.//appmsg')
quote_data = "" # 引用原文
quote_id = None # 引用消息的原发送者
tousername = None # 接收方: 所属微信的wxid
user_data = "" # 用户消息
sender_id = xml_data.findtext('.//fromusername') # 发送方:单聊用户/群member
if appmsg_data:
user_data = appmsg_data.findtext('.//title') or ""
quote_data = appmsg_data.find('.//refermsg').findtext('.//content')
quote_id = appmsg_data.find('.//refermsg').findtext('.//chatusr')
if message:
tousername = message['Wxid']
message_list = []
# quote_data原始的消息
if quote_data:
quote_data_message_list = platform_message.MessageChain()
# 文本消息
try:
if "<msg>" not in quote_data:
quote_data_message_list.append(platform_message.Plain(quote_data))
else:
return platform_message.MessageChain(
[platform_message.Unknown(text=raw_content)]
)
# try:
# content_bytes = content.encode('utf-8')
# decoded_content = base64.b64decode(content_bytes)
# return platform_message.MessageChain(
# [platform_message.Unknown(content=decoded_content)]
# ) # unknown中没有content
# except Exception as e:
# return platform_message.MessageChain(
# [platform_message.Plain(text=content)]
# )
# 引用消息展开
quote_data_xml = ET.fromstring(quote_data)
if quote_data_xml.find("img"):
quote_data_message_list.extend(await self._handler_image(None, quote_data))
elif quote_data_xml.find("voicemsg"):
quote_data_message_list.extend(await self._handler_voice(None, quote_data))
elif quote_data_xml.find("videomsg"):
quote_data_message_list.extend(await self._handler_default(None, quote_data)) # 先不处理
else:
# appmsg
quote_data_message_list.extend(await self._handler_compound(None, quote_data))
except Exception as e:
print(f"Error processing type 49 message: {str(e)}")
return platform_message.MessageChain(
[ # platform_message.Plain(text="[无法解析的消息]"),
platform_message.Unknown(text=raw_content)]
print(f"处理引用消息异常 expcetion:{e}")
quote_data_message_list.append(platform_message.Plain(quote_data))
message_list.append(
platform_message.Quote(
sender_id=sender_id,
origin=quote_data_message_list,
)
finally:
return platform_message.MessageChain(content_list)
)
if len(user_data) > 0:
pattern = r'^@\S+'
user_data = re.sub(pattern, '', user_data)
message_list.append(platform_message.Plain(user_data))
# print(f"**_handler_compound_quote message_list len={len(message_list)}")
# for comp in message_list:
# if isinstance(comp, platform_message.Quote):
# print(f"**_handler_compound_quote send_id {comp.sender_id}" )
# for quote_item in comp.origin:
# print(f"******* _handler_compound_quote item {quote_item.type} + message: {quote_item}" )
# else:
# print(f"_handler_compound_quote type: {comp.type} + message: {comp}")
return platform_message.MessageChain(message_list)
async def _handler_compound_file(
self,
message: dict,
xml_data: ET.Element
) -> platform_message.MessageChain:
"""处理文件消息 (data_type=6)"""
xml_data_str = ET.tostring(xml_data, encoding='unicode')
return platform_message.MessageChain([
platform_message.WeChatForwardFile(xml_data=xml_data_str)
])
async def _handler_compound_link(
self,
message: dict,
xml_data: ET.Element
) -> platform_message.MessageChain:
"""处理链接消息(如公众号文章、外部网页)"""
message_list = []
try:
# 解析 XML 中的链接参数
appmsg = xml_data.find('.//appmsg')
if appmsg is None:
return platform_message.MessageChain()
message_list.append(
platform_message.WeChatLink(
link_title = appmsg.findtext('title', ''),
link_desc = appmsg.findtext('des', ''),
link_url = appmsg.findtext('url', ''),
link_thumb_url = appmsg.findtext("thumburl", '') # 这个字段拿不到
)
)
# 转发消息
xml_data_str = ET.tostring(xml_data, encoding='unicode')
# print(xml_data_str)
message_list.append(
platform_message.WeChatForwardLink(
xml_data=xml_data_str
)
)
except Exception as e:
print(f"解析链接消息失败: {str(e)}")
return platform_message.MessageChain(message_list)
async def _handler_compound_mini_program(
self,
message: dict,
xml_data: ET.Element
) -> platform_message.MessageChain:
"""处理小程序消息(如小程序卡片、服务通知)"""
xml_data_str = ET.tostring(xml_data, encoding='unicode')
return platform_message.MessageChain([
platform_message.WeChatForwardMiniPrograms(xml_data=xml_data_str)
])
async def _handler_default(
self,
message: Optional[dict],
content_no_preifx: str
) -> platform_message.MessageChain:
"""处理未知消息类型"""
if message:
msg_type = message["Data"]["MsgType"]
else:
content_list.append(platform_message.Unknown(text=f"[未知消息类型 msg_type:{msg_type}"))
return platform_message.MessageChain(content_list)
msg_type = ""
return platform_message.MessageChain([
platform_message.Unknown(text=f"[未知消息类型 msg_type:{msg_type}]")
])
def _handler_compound_unsupported(
self,
message: dict,
xml_data: str,
text: Optional[str] = None
) -> platform_message.MessageChain:
"""处理未支持复合消息类型(msg_type=49)子类型"""
if not text:
text = f"[xml_data={xml_data}]"
content_list = []
content_list.append(
platform_message.Unknown(text=f"[处理未支持复合消息类型[msg_type=49]|{text}"))
return platform_message.MessageChain(content_list)
# 返回是否被艾特
def __ats_bot(self, message: dict, bot_account_id:str) -> bool:
def _ats_bot(self, message: dict, bot_account_id:str) -> bool:
ats_bot = False
try:
to_user_name = message['Wxid'] # 接收方: 所属微信的wxid
raw_content = message["Data"]["Content"]["string"] # 原始消息内容
# step 1
ats_bot = ats_bot or (f"@{bot_account_id}" in raw_content)
# step 2
content_no_prefix, _ = self._extract_content_and_sender(raw_content)
# 直接艾特机器人
ats_bot = ats_bot or (f"@{bot_account_id}" in content_no_prefix)
# 文本类@bot
push_content = message.get('Data', {}).get('PushContent', '')
ats_bot = ats_bot or ('在群聊中@了你' in push_content)
# step 3
# 引用别人时@bot
msg_source = message.get('Data', {}).get('MsgSource', '') or ''
if len(msg_source) > 0:
msg_source_data = ET.fromstring(msg_source)
at_user_list = msg_source_data.findtext("atuserlist") or ""
ats_bot = ats_bot or (to_user_name in at_user_list)
# 引用bot
if message.get('Data', {}).get('MsgType', 0) == 49:
xml_data = ET.fromstring(content_no_prefix)
appmsg_data = xml_data.find('.//appmsg')
tousername = message['Wxid'] # 接收方: 所属微信的wxid
quote_id = appmsg_data.find('.//refermsg').findtext('.//chatusr') # 引用消息的原发送者
ats_bot = ats_bot or (quote_id == tousername)
except Exception as e:
print(f"__ats_bot got except: {e}")
print(f"_ats_bot got except: {e}")
finally:
return ats_bot
# 提取一下content前面的sender_id, 和去掉前缀的内容
def __extract_content_and_sender(self, raw_content: str) -> Tuple[str, Optional[str]]:
def _extract_content_and_sender(self, raw_content: str) -> Tuple[str, Optional[str]]:
try:
# 检查消息开头,如果有 wxid_sbitaz0mt65n22:\n 则删掉
# add: 有些用户的wxid不是上述格式。换成user_name:
@@ -289,12 +410,12 @@ class GewechatMessageConverter(adapter.MessageConverter):
sender_id = line_split[0].strip(":")
return raw_content, sender_id
except Exception as e:
print(f"__extract_content_and_sender got except: {e}")
print(f"_extract_content_and_sender got except: {e}")
finally:
return raw_content, None
# 是否是群消息
def __is_group_message(self, message: dict)->bool:
def _is_group_message(self, message: dict)->bool:
from_user_name = message['Data']['FromUserName']['string']
return from_user_name.endswith("@chatroom")
@@ -388,7 +509,6 @@ class GeWeChatAdapter(adapter.MessagePlatformAdapter):
def __init__(self, config: dict, ap: app.Application):
self.config = config
self.ap = ap
self.quart_app = quart.Quart(__name__)
self.message_converter = GewechatMessageConverter(config)
@@ -620,4 +740,4 @@ class GeWeChatAdapter(adapter.MessagePlatformAdapter):
)
async def kill(self) -> bool:
pass
pass