feat(gewechat): 优化了代码结构+fix群聊艾特逻辑,新增消息类型 (#1336)

* feat(gewechat): 优化了代码结构+fix群聊艾特逻辑,新增消息类型

* feat(gewechat): 移除不合理的message定义,优化GewechatMessageConverter

* bugfix(gewechat): fix typo

* feat(gewechat): 去掉多余日志+公众号消息和文件消息转发+msg_source取空异常fix

* bugfix(message):删除image中的xml定义

* bugfix(message): fix typo
This commit is contained in:
shinelin
2025-04-27 20:48:55 +08:00
committed by GitHub
parent efed9f3348
commit ac500266f3
2 changed files with 293 additions and 181 deletions

View File

@@ -26,7 +26,7 @@ from ..types import events as platform_events
from ..types import entities as platform_entities
from ...utils import image
import xml.etree.ElementTree as ET
from typing import Optional, List, Tuple
class GewechatMessageConverter(adapter.MessageConverter):
@@ -60,13 +60,18 @@ class GewechatMessageConverter(adapter.MessageConverter):
'link_thumb_url': component.link_thumb_url, 'link_url': component.link_url})
elif isinstance(component, platform_message.WeChatForwardLink):
content_list.append({'type': 'WeChatForwardLink', 'xml_data': component.xml_data})
elif isinstance(component, platform_message.Voice):
content_list.append({"type": "voice", "url": component.url, "length": component.length})
content_list.append({"type": "voice", "url": component.url, "length": component.length})
elif isinstance(component, platform_message.WeChatForwardImage):
content_list.append({'type': 'WeChatForwardImage', 'xml_data': component.xml_data})
elif isinstance(component, platform_message.WeChatForwardFile):
content_list.append({'type': 'WeChatForwardFile', 'xml_data': component.xml_data})
elif isinstance(component, platform_message.WeChatAppMsg):
content_list.append({'type': 'WeChatAppMsg', 'app_msg': component.app_msg})
elif isinstance(component, platform_message.Forward):
for node in component.node_list:
content_list.extend(await GewechatMessageConverter.yiri2target(node.message_chain))
if node.message_chain:
content_list.extend(await GewechatMessageConverter.yiri2target(node.message_chain))
return content_list
@@ -76,49 +81,38 @@ class GewechatMessageConverter(adapter.MessageConverter):
bot_account_id: str
) -> platform_message.MessageChain:
if message["Data"]["MsgType"] == 1:
# 检查消息开头,如果有 wxid_sbitaz0mt65n22:\n 则删掉
regex = re.compile(r"^wxid_.*:")
# print(message)
line_split = message["Data"]["Content"]["string"].split("\n")
if len(line_split) > 0 and regex.match(line_split[0]):
message["Data"]["Content"]["string"] = "\n".join(line_split[1:])
# 正则表达式模式,匹配'@'后跟任意数量的非空白字符
pattern = r'@\S+'
at_string = f"@{bot_account_id}"
content_list = []
if at_string in message["Data"]["Content"]["string"]:
# 预处理
content_list = []
ats_bot = False
raw_content = message["Data"]["Content"]["string"]
is_group_message = self.__is_group_message(message)
if is_group_message:
ats_bot = self.__ats_bot(message, bot_account_id)
# 优先处理艾特全体成员,
if "@所有人" in raw_content: ## at全员时候传入atll不当作at自己
content_list.append(platform_message.AtAll())
elif ats_bot:
content_list.append(platform_message.At(target=bot_account_id))
content_list.append(platform_message.Plain(message["Data"]["Content"]["string"].replace(at_string, '', 1)))
# 更优雅的替换改名后@机器人仅仅限于单独AT的情况
elif "PushContent" in message['Data'] and '在群聊中@了你' in message["Data"]["PushContent"]:
if '@所有人' in message["Data"]["Content"]["string"]: # at全员时候传入atll不当作at自己
content_list.append(platform_message.AtAll())
else:
content_list.append(platform_message.At(target=bot_account_id))
content_list.append(platform_message.Plain(re.sub(pattern, '', message["Data"]["Content"]["string"])))
else:
content_list = [platform_message.Plain(message["Data"]["Content"]["string"])]
raw_content, sender_id = self.__extract_content_and_sender(raw_content)
# 消息类型
msg_type = message["Data"]["MsgType"]
# 文本消息
if msg_type == 1:
# 文本清洗,仅替换群文本中的@文本[空格],的文本
if is_group_message and ats_bot:
pattern = r'@\S+'
raw_content = re.sub(pattern, '',raw_content)
content_list.append(platform_message.Plain(raw_content))
return platform_message.MessageChain(content_list)
elif message["Data"]["MsgType"] == 3:
image_xml = message["Data"]["Content"]["string"]
if image_xml.startswith('wxid'): # 此处处理群聊发送图片会有微信id开头
xml_list = image_xml.split('\n')[2:]
image_xml = '\n'.join(xml_list)
# 图像
elif msg_type == 3:
image_xml = raw_content # 已经去除群聊消息前缀
if not image_xml:
return platform_message.MessageChain([
platform_message.Plain(text="[图片内容为空]")
])
content_list.append(platform_message.Plain(text="[图片内容为空]"))
return platform_message.MessageChain(content_list)
try:
base64_str, image_format = await image.get_gewechat_image_base64(
gewechat_url=self.config["gewechat_url"],
@@ -129,17 +123,20 @@ class GewechatMessageConverter(adapter.MessageConverter):
image_type=2,
)
return platform_message.MessageChain([
platform_message.Image(
base64=f"data:image/{image_format};base64,{base64_str}"
)
])
content_list.append(platform_message.Image(
base64=f"data:image/{image_format};base64,{base64_str}"
))
# 消息链中加一个WeChatForwardImage的xml用于转发
content_list.append(platform_message.WeChatForwardImage(
xml_data = image_xml
))
return platform_message.MessageChain(content_list)
except Exception as e:
print(f"处理图片消息失败: {str(e)}")
return platform_message.MessageChain([
platform_message.Plain(text=f"[图片处理失败]")
])
elif message["Data"]["MsgType"] == 34:
content_list.append(platform_message.Plain(text=f"[图片处理失败]"))
return platform_message.MessageChain(content_list)
# 语音消息
elif msg_type == 34:
try:
audio_base64 = message["Data"]["ImgBuf"]["buffer"]
return platform_message.MessageChain(
@@ -149,23 +146,20 @@ class GewechatMessageConverter(adapter.MessageConverter):
return platform_message.MessageChain(
[platform_message.Plain(text="[无法解析群聊语音的消息]")] # 小测了一下免费版拿不到群聊语音消息的base64或者用什么办法解析xml里的url?
)
elif message["Data"]["MsgType"] == 49:
finally:
return platform_message.MessageChain(content_list)
elif msg_type == 49:
# 支持微信聊天记录的消息类型,将 XML 内容转换为 MessageChain 传递
content = message["Data"]["Content"]["string"]
try:
# content = message["Data"]["Content"]["string"]
# 有三种可能的消息结构weid开头私聊直接<?xml>和直接<msg>
if content.startswith('wxid'):
xml_list = content.split('\n')[2:]
try:
# 下方是移除<?xml,
xml_data = raw_content
if raw_content.startswith('<?xml'):
xml_list = raw_content.split('\n')[1:]
xml_data = '\n'.join(xml_list)
elif content.startswith('<?xml'):
xml_list = content.split('\n')[1:]
xml_data = '\n'.join(xml_list)
else:
xml_data = content
content_data = ET.fromstring(xml_data)
# print(xml_data)
content_data = ET.fromstring(xml_data)
# raw_content已经不会是wxid开头了
#print(xml_data)
# 拿到细分消息类型按照gewe接口中描述
'''
小程序33/36
@@ -173,54 +167,70 @@ class GewechatMessageConverter(adapter.MessageConverter):
转账消息2000
红包消息2001
视频号消息51
文件发送完成: 6
'''
appmsg_data = content_data.find('.//appmsg')
data_type = appmsg_data.find('.//type').text
data_type = appmsg_data.findtext('.//type')
if data_type == '57':
user_data = appmsg_data.find('.//title').text # 拿到用户消息
quote_data = appmsg_data.find('.//refermsg').find('.//content').text # 引用原文
sender_id = appmsg_data.find('.//refermsg').find('.//chatusr').text # 引用用户id
from_name = message['Data']['FromUserName']['string']
message_list =[]
if message['Wxid'] == sender_id and from_name.endswith('@chatroom'): # 因为引用机制暂时无法响应用户所以当引用用户是机器人是构建一个at激活机器人
message_list.append(platform_message.At(target=bot_account_id))
message_list.append(platform_message.Quote(
user_data = appmsg_data.findtext('.//title') or "" # 用户消息
quote_data = appmsg_data.find('.//refermsg').findtext('.//content') # 引用原文
sender_id = content_data.findtext('.//fromusername') # 发送方:单聊用户/群member
tousername = message['Wxid'] # 接收方: 所属微信的wxid
quote_id = appmsg_data.find('.//refermsg').findtext('.//chatusr') # 引用消息的原发送者
# 群特殊处理引用消息的原发送者是bot or @bot
# 引用判断:quote_id == tousername
if is_group_message and (quote_id == tousername):
if not platform_message.MessageChain(content_list).has(platform_message.At):
content_list.append(platform_message.At(target=bot_account_id))
content_list.append(platform_message.Quote(
sender_id=sender_id,
origin=platform_message.MessageChain(
# 这里是文本或者xml, 历史原因用了plain
# TODO: 后面需要重构一下,根据type解析具体的消息类型
[platform_message.Plain(quote_data)]
)))
message_list.append(platform_message.Plain(user_data))
return platform_message.MessageChain(message_list)
content_list.append(platform_message.Plain(user_data)) # FIXME: 这里还有wxid
return platform_message.MessageChain(content_list)
elif data_type == '51':
return platform_message.MessageChain(
[ # platform_message.Plain(text=f'[视频号消息]'),
platform_message.Unknown(text=content)]
platform_message.Unknown(text=raw_content)]
)
# print(content_data)
elif data_type == '2000':
return platform_message.MessageChain(
[ # platform_message.Plain(text=f'[转账消息]'),
platform_message.Unknown(text=content)]
platform_message.Unknown(text=raw_content)]
)
elif data_type == '2001':
return platform_message.MessageChain(
[ # platform_message.Plain(text=f'[红包消息]'),
platform_message.Unknown(text=content)]
platform_message.Unknown(text=raw_content)]
)
elif data_type == '5':
return platform_message.MessageChain(
[ # platform_message.Plain(text=f'[公众号消息]'),
platform_message.Unknown(text=content)]
content_list.append(
# platform_message.Plain(text=f'[公众号消息]'),
platform_message.WeChatForwardLink(xml_data=raw_content)
)
return platform_message.MessageChain(content_list)
elif data_type == '33' or data_type == '36':
return platform_message.MessageChain(
[ # platform_message.Plain(text=f'[小程序消息]'),
platform_message.Unknown(text=content)]
platform_message.Unknown(text=raw_content)]
)
elif data_type == "6":
# 文件消息
content_list.append(
# platform_message.Plain(text=f'[文件消息]'),
platform_message.WeChatForwardFile(xml_data=raw_content)
)
return platform_message.MessageChain(content_list)
# print(data_type.text)
else:
return platform_message.MessageChain(
[platform_message.Unknown(text=content)]
[platform_message.Unknown(text=raw_content)]
)
# try:
@@ -237,8 +247,56 @@ class GewechatMessageConverter(adapter.MessageConverter):
print(f"Error processing type 49 message: {str(e)}")
return platform_message.MessageChain(
[ # platform_message.Plain(text="[无法解析的消息]"),
platform_message.Unknown(text=content)]
platform_message.Unknown(text=raw_content)]
)
finally:
return platform_message.MessageChain(content_list)
else:
content_list.append(platform_message.Unknown(text=f"[未知消息类型 msg_type:{msg_type}"))
return platform_message.MessageChain(content_list)
# 返回是否被艾特
def __ats_bot(self, message: dict, bot_account_id:str) -> bool:
ats_bot = False
try:
to_user_name = message['Wxid'] # 接收方: 所属微信的wxid
raw_content = message["Data"]["Content"]["string"] # 原始消息内容
# step 1
ats_bot = ats_bot or (f"@{bot_account_id}" in raw_content)
# step 2
push_content = message.get('Data', {}).get('PushContent', '')
ats_bot = ats_bot or ('在群聊中@了你' in push_content)
# step 3
msg_source = message.get('Data', {}).get('MsgSource', '') or ''
if len(msg_source) > 0:
msg_source_data = ET.fromstring(msg_source)
at_user_list = msg_source_data.findtext("atuserlist") or ""
ats_bot = ats_bot or (to_user_name in at_user_list)
except Exception as e:
print(f"__ats_bot got except: {e}")
finally:
return ats_bot
# 提取一下content前面的sender_id, 和去掉前缀的内容
def __extract_content_and_sender(self, raw_content: str) -> Tuple[str, Optional[str]]:
try:
# 检查消息开头,如果有 wxid_sbitaz0mt65n22:\n 则删掉
# add: 有些用户的wxid不是上述格式。换成user_name:
regex = re.compile(r"^[a-zA-Z0-9_\-]{5,20}:")
line_split = raw_content.split("\n")
if len(line_split) > 0 and regex.match(line_split[0]):
raw_content = "\n".join(line_split[1:])
sender_id = line_split[0].strip(":")
return raw_content, sender_id
except Exception as e:
print(f"__extract_content_and_sender got except: {e}")
finally:
return raw_content, None
# 是否是群消息
def __is_group_message(self, message: dict)->bool:
from_user_name = message['Data']['FromUserName']['string']
return from_user_name.endswith("@chatroom")
class GewechatEventConverter(adapter.EventConverter):
@@ -365,55 +423,118 @@ class GeWeChatAdapter(adapter.MessagePlatformAdapter):
return 'ok'
async def _handle_message(
self,
message: platform_message.MessageChain,
target_id: str
):
"""统一消息处理核心逻辑"""
content_list = await self.message_converter.yiri2target(message)
at_targets = [item["target"] for item in content_list if item["type"] == "at"]
# 处理@逻辑
at_targets = at_targets or []
member_info = []
if at_targets:
member_info = self.bot.get_chatroom_member_detail(
self.config["app_id"],
target_id,
at_targets[::-1]
)["data"]
# 处理消息组件
for msg in content_list:
# 文本消息处理@
if msg['type'] == 'text' and at_targets:
for member in member_info:
msg['content'] = f'@{member["nickName"]} {msg["content"]}'
# 统一消息派发
handler_map = {
'text': lambda msg: self.bot.post_text(
app_id=self.config['app_id'],
to_wxid=target_id,
content=msg['content'],
ats=",".join(at_targets)
),
'image': lambda msg: self.bot.post_image(
app_id=self.config['app_id'],
to_wxid=target_id,
img_url=msg["image"]
),
'WeChatForwardMiniPrograms': lambda msg: self.bot.forward_mini_app(
app_id=self.config['app_id'],
to_wxid=target_id,
xml=msg['xml_data'],
cover_img_url=msg.get('image_url')
),
'WeChatEmoji': lambda msg: self.bot.post_emoji(
app_id=self.config['app_id'],
to_wxid=target_id,
emoji_md5=msg['emoji_md5'],
emoji_size=msg['emoji_size']
),
'WeChatLink': lambda msg: self.bot.post_link(
app_id=self.config['app_id'],
to_wxid=target_id,
title=msg['link_title'],
desc=msg['link_desc'],
link_url=msg['link_url'],
thumb_url=msg['link_thumb_url'],
),
'WeChatMiniPrograms': lambda msg: self.bot.post_mini_app(
app_id=self.config['app_id'],
to_wxid=target_id,
mini_app_id=msg['mini_app_id'],
display_name=msg['display_name'],
page_path=msg['page_path'],
cover_img_url=msg['cover_img_url'],
title=msg['title'],
user_name=msg['user_name']
),
'WeChatForwardLink': lambda msg: self.bot.forward_url(
app_id=self.config['app_id'],
to_wxid=target_id,
xml=msg['xml_data']
),
'WeChatForwardImage': lambda msg: self.bot.forward_image(
app_id=self.config['app_id'],
to_wxid=target_id,
xml=msg['xml_data']
),
'WeChatForwardFile': lambda msg: self.bot.forward_file(
app_id=self.config['app_id'],
to_wxid=target_id,
xml=msg['xml_data']
),
'voice': lambda msg: self.bot.post_voice(
app_id=self.config['app_id'],
to_wxid=target_id,
voice_url=msg['url'],
voice_duration=msg['length']
),
'WeChatAppMsg': lambda msg: self.bot.post_app_msg(
app_id=self.config['app_id'],
to_wxid=target_id,
appmsg=msg['app_msg']
),
'at': lambda msg: None
}
if handler := handler_map.get(msg['type']):
handler(msg)
else:
self.ap.logger.warning(f"未处理的消息类型: {msg['type']}")
continue
async def send_message(
self,
target_type: str,
target_id: str,
message: platform_message.MessageChain
):
geweap_msg = await self.message_converter.yiri2target(message)
# 此处加上群消息at处理
ats = [item["target"] for item in geweap_msg if item["type"] == "at"]
for msg in geweap_msg:
# at主动发送消息
if msg['type'] == 'text':
if ats:
member_info = self.bot.get_chatroom_member_detail(
self.config["app_id"],
target_id,
ats[::-1]
)["data"]
for member in member_info:
msg['content'] = f'@{member["nickName"]} {msg["content"]}'
self.bot.post_text(app_id=self.config['app_id'], to_wxid=target_id, content=msg['content'],
ats=",".join(ats))
elif msg['type'] == 'image':
self.bot.post_image(app_id=self.config['app_id'], to_wxid=target_id, img_url=msg["image"])
elif msg['type'] == 'WeChatMiniPrograms':
self.bot.post_mini_app(app_id=self.config['app_id'], to_wxid=target_id, mini_app_id=msg['mini_app_id']
, display_name=msg['display_name'], page_path=msg['page_path']
, cover_img_url=msg['cover_img_url'], title=msg['title'], user_name=msg['user_name'])
elif msg['type'] == 'WeChatForwardMiniPrograms':
self.bot.forward_mini_app(app_id=self.config['app_id'], to_wxid=target_id, xml=msg['xml_data'], cover_img_url=msg['image_url'])
elif msg['type'] == 'WeChatEmoji':
self.bot.post_emoji(app_id=self.config['app_id'], to_wxid=target_id,
emoji_md5=msg['emoji_md5'], emoji_size=msg['emoji_size'])
elif msg['type'] == 'WeChatLink':
self.bot.post_link(app_id=self.config['app_id'], to_wxid=target_id
,title=msg['link_title'], desc=msg['link_desc']
, link_url=msg['link_url'], thumb_url=msg['link_thumb_url'])
elif msg['type'] == 'WeChatForwardLink':
self.bot.forward_url(app_id=self.config['app_id'], to_wxid=target_id, xml=msg['xml_data'])
elif msg['type'] == 'voice':
self.bot.post_voice(app_id=self.config['app_id'], to_wxid=target_id, voice_url=msg['url'],voice_duration=msg['length'])
"""主动发送消息"""
return await self._handle_message(message, target_id)
async def reply_message(
self,
@@ -421,51 +542,10 @@ class GeWeChatAdapter(adapter.MessagePlatformAdapter):
message: platform_message.MessageChain,
quote_origin: bool = False
):
content_list = await self.message_converter.yiri2target(message)
ats = [item["target"] for item in content_list if item["type"] == "at"]
target_id = message_source.source_platform_object["Data"]["FromUserName"]["string"]
for msg in content_list:
if msg["type"] == "text":
if ats:
member_info = self.bot.get_chatroom_member_detail(
self.config["app_id"],
message_source.source_platform_object["Data"]["FromUserName"]["string"],
ats[::-1]
)["data"]
for member in member_info:
msg['content'] = f'@{member["nickName"]} {msg["content"]}'
self.bot.post_text(
app_id=self.config["app_id"],
to_wxid=message_source.source_platform_object["Data"]["FromUserName"]["string"],
content=msg["content"],
ats=",".join(ats)
)
elif msg['type'] == 'image':
self.bot.post_image(app_id=self.config['app_id'], to_wxid=target_id, img_url=msg["image"])
elif msg['type'] == 'WeChatMiniPrograms':
self.bot.post_mini_app(app_id=self.config['app_id'], to_wxid=target_id, mini_app_id=msg['mini_app_id']
, display_name=msg['display_name'], page_path=msg['page_path']
, cover_img_url=msg['cover_img_url'], title=msg['title'], user_name=msg['user_name'])
elif msg['type'] == 'WeChatForwardMiniPrograms':
self.bot.forward_mini_app(app_id=self.config['app_id'], to_wxid=target_id, xml=msg['xml_data'], cover_img_url=msg['image_url'])
elif msg['type'] == 'WeChatEmoji':
self.bot.post_emoji(app_id=self.config['app_id'], to_wxid=target_id,
emoji_md5=msg['emoji_md5'], emoji_size=msg['emoji_size'])
elif msg['type'] == 'WeChatLink':
self.bot.post_link(app_id=self.config['app_id'], to_wxid=target_id
, title=msg['link_title'], desc=msg['link_desc']
, link_url=msg['link_url'], thumb_url=msg['link_thumb_url'])
elif msg['type'] == 'WeChatForwardLink':
self.bot.forward_url(app_id=self.config['app_id'], to_wxid=target_id, xml=msg['xml_data'])
elif msg['type'] == 'voice':
self.bot.post_voice(app_id=self.config['app_id'], to_wxid=target_id, voice_url=msg['url'],
voice_duration=msg['length'])
"""回复消息"""
if message_source.source_platform_object:
target_id = message_source.source_platform_object["Data"]["FromUserName"]["string"]
return await self._handle_message(message, target_id)
async def is_muted(self, group_id: int) -> bool:
pass
@@ -519,8 +599,13 @@ class GeWeChatAdapter(adapter.MessagePlatformAdapter):
time.sleep(2)
ret = self.bot.set_callback(self.config["token"], self.config["callback_url"])
print('设置 Gewechat 回调:', ret)
try:
# gewechat-server容器重启, token会变但是还会登录成功
# 换新token也会收不到回调要重新登陆下。
ret = self.bot.set_callback(self.config["token"], self.config["callback_url"])
except Exception as e:
raise Exception(f"设置 Gewechat 回调失败, token失效 {e}")
threading.Thread(target=gewechat_login_process).start()

View File

@@ -1,16 +1,15 @@
import itertools
import logging
import typing
from datetime import datetime
from enum import Enum
from pathlib import Path
import typing
import pydantic.v1 as pydantic
from . import entities as platform_entities
from .base import PlatformBaseModel, PlatformIndexedMetaclass, PlatformIndexedModel
logger = logging.getLogger(__name__)
@@ -642,7 +641,8 @@ class Unknown(MessageComponent):
"""消息组件类型。"""
text: str
"""文本。"""
def __str__(self):
return f'Unknown Message: {self.text}'
class Voice(MessageComponent):
"""语音。"""
@@ -837,6 +837,8 @@ class WeChatForwardMiniPrograms(MessageComponent):
xml_data: str
"""首页图片"""
image_url: typing.Optional[str] = None
def __str__(self):
return self.xml_data
class WeChatEmoji(MessageComponent):
@@ -866,4 +868,29 @@ class WeChatForwardLink(MessageComponent):
type: str = 'WeChatForwardLink'
"""xml数据"""
xml_data: str
def __str__(self):
return self.xml_data
class WeChatForwardImage(MessageComponent):
"""转发图片。个人微信专用组件。"""
type: str = 'WeChatForwardImage'
"""xml数据"""
xml_data: str
def __str__(self):
return self.xml_data
class WeChatForwardFile(MessageComponent):
"""转发文件。个人微信专用组件。"""
type: str = 'WeChatForwardFile'
"""xml数据"""
xml_data: str
def __str__(self):
return self.xml_data
class WeChatAppMsg(MessageComponent):
"""通用appmsg发送。个人微信专用组件。"""
type: str = 'WeChatAppMsg'
"""xml数据"""
app_msg: str
def __str__(self):
return self.app_msg