fix: format the code

This commit is contained in:
WangCham
2025-08-12 23:13:00 +08:00
parent 457cc3eecd
commit eab08dfbf3
20 changed files with 329 additions and 360 deletions

View File

@@ -21,9 +21,11 @@ from ..types import message as platform_message
from ..types import events as platform_events
from ..types import entities as platform_entities
# 语音功能相关异常定义
class VoiceConnectionError(Exception):
"""语音连接基础异常"""
def __init__(self, message: str, error_code: str = None, guild_id: int = None):
super().__init__(message)
self.error_code = error_code
@@ -33,8 +35,9 @@ class VoiceConnectionError(Exception):
class VoicePermissionError(VoiceConnectionError):
"""语音权限异常"""
def __init__(self, message: str, missing_permissions: list = None, user_id: int = None, channel_id: int = None):
super().__init__(message, "PERMISSION_ERROR")
super().__init__(message, 'PERMISSION_ERROR')
self.missing_permissions = missing_permissions or []
self.user_id = user_id
self.channel_id = channel_id
@@ -42,40 +45,42 @@ class VoicePermissionError(VoiceConnectionError):
class VoiceNetworkError(VoiceConnectionError):
"""语音网络异常"""
def __init__(self, message: str, retry_count: int = 0):
super().__init__(message, "NETWORK_ERROR")
super().__init__(message, 'NETWORK_ERROR')
self.retry_count = retry_count
self.last_attempt = datetime.datetime.now()
class VoiceConnectionStatus(Enum):
"""语音连接状态枚举"""
IDLE = "idle"
CONNECTING = "connecting"
CONNECTED = "connected"
PLAYING = "playing"
RECONNECTING = "reconnecting"
FAILED = "failed"
IDLE = 'idle'
CONNECTING = 'connecting'
CONNECTED = 'connected'
PLAYING = 'playing'
RECONNECTING = 'reconnecting'
FAILED = 'failed'
class VoiceConnectionInfo:
"""
语音连接信息类
用于存储和管理单个语音连接的详细信息,包括连接状态、时间戳、
频道信息等。提供连接信息的标准化数据结构。
@author: @ydzat
@version: 1.0
@since: 2025-07-04
"""
def __init__(self, guild_id: int, channel_id: int, channel_name: str = None):
"""
初始化语音连接信息
@author: @ydzat
Args:
guild_id (int): 服务器ID
channel_id (int): 语音频道ID
@@ -83,28 +88,28 @@ class VoiceConnectionInfo:
"""
self.guild_id = guild_id
self.channel_id = channel_id
self.channel_name = channel_name or f"Channel-{channel_id}"
self.channel_name = channel_name or f'Channel-{channel_id}'
self.connected = False
self.connection_time: datetime.datetime = None
self.last_activity = datetime.datetime.now()
self.status = VoiceConnectionStatus.IDLE
self.user_count = 0
self.latency = 0.0
self.connection_health = "unknown"
self.connection_health = 'unknown'
self.voice_client = None
def update_status(self, status: VoiceConnectionStatus):
"""
更新连接状态
@author: @ydzat
Args:
status (VoiceConnectionStatus): 新的连接状态
"""
self.status = status
self.last_activity = datetime.datetime.now()
if status == VoiceConnectionStatus.CONNECTED:
self.connected = True
if self.connection_time is None:
@@ -113,48 +118,48 @@ class VoiceConnectionInfo:
self.connected = False
self.connection_time = None
self.voice_client = None
def to_dict(self) -> dict:
"""
转换为字典格式
@author: @ydzat
Returns:
dict: 连接信息的字典表示
"""
return {
"guild_id": self.guild_id,
"channel_id": self.channel_id,
"channel_name": self.channel_name,
"connected": self.connected,
"connection_time": self.connection_time.isoformat() if self.connection_time else None,
"last_activity": self.last_activity.isoformat(),
"status": self.status.value,
"user_count": self.user_count,
"latency": self.latency,
"connection_health": self.connection_health
'guild_id': self.guild_id,
'channel_id': self.channel_id,
'channel_name': self.channel_name,
'connected': self.connected,
'connection_time': self.connection_time.isoformat() if self.connection_time else None,
'last_activity': self.last_activity.isoformat(),
'status': self.status.value,
'user_count': self.user_count,
'latency': self.latency,
'connection_health': self.connection_health,
}
class VoiceConnectionManager:
"""
语音连接管理器
负责管理多个服务器的语音连接,提供连接建立、断开、状态查询等功能。
采用单例模式确保全局只有一个连接管理器实例。
@author: @ydzat
@version: 1.0
@since: 2025-07-04
"""
def __init__(self, bot: discord.Client, logger: EventLogger):
"""
初始化语音连接管理器
@author: @ydzat
Args:
bot (discord.Client): Discord 客户端实例
logger (EventLogger): 事件日志记录器
@@ -165,25 +170,24 @@ class VoiceConnectionManager:
self._connection_lock = asyncio.Lock()
self._cleanup_task = None
self._monitoring_enabled = True
async def join_voice_channel(self, guild_id: int, channel_id: int,
user_id: int = None) -> discord.VoiceClient:
async def join_voice_channel(self, guild_id: int, channel_id: int, user_id: int = None) -> discord.VoiceClient:
"""
加入语音频道
验证用户权限和频道状态后,建立到指定语音频道的连接。
支持连接复用和自动重连机制。
@author: @ydzat
Args:
guild_id (int): 服务器ID
channel_id (int): 语音频道ID
user_id (int, optional): 请求用户ID用于权限验证
Returns:
discord.VoiceClient: 语音客户端实例
Raises:
VoicePermissionError: 权限不足时抛出
VoiceNetworkError: 网络连接失败时抛出
@@ -194,370 +198,353 @@ class VoiceConnectionManager:
# 获取服务器和频道对象
guild = self.bot.get_guild(guild_id)
if not guild:
raise VoiceConnectionError(
f"无法找到服务器 {guild_id}",
"GUILD_NOT_FOUND",
guild_id
)
raise VoiceConnectionError(f'无法找到服务器 {guild_id}', 'GUILD_NOT_FOUND', guild_id)
channel = guild.get_channel(channel_id)
if not channel or not isinstance(channel, discord.VoiceChannel):
raise VoiceConnectionError(
f"无法找到语音频道 {channel_id}",
"CHANNEL_NOT_FOUND",
guild_id
)
raise VoiceConnectionError(f'无法找到语音频道 {channel_id}', 'CHANNEL_NOT_FOUND', guild_id)
# 验证用户是否在语音频道中如果提供了用户ID
if user_id:
await self._validate_user_in_channel(guild, channel, user_id)
# 验证机器人权限
await self._validate_bot_permissions(channel)
# 检查是否已有连接
if guild_id in self.connections:
existing_conn = self.connections[guild_id]
if existing_conn.connected and existing_conn.voice_client:
if existing_conn.channel_id == channel_id:
# 已连接到相同频道,返回现有连接
await self.logger.info(f"复用现有语音连接: {guild.name} -> {channel.name}")
await self.logger.info(f'复用现有语音连接: {guild.name} -> {channel.name}')
return existing_conn.voice_client
else:
# 连接到不同频道,先断开旧连接
await self._disconnect_internal(guild_id)
# 建立新连接
voice_client = await channel.connect()
# 更新连接信息
conn_info = VoiceConnectionInfo(guild_id, channel_id, channel.name)
conn_info.voice_client = voice_client
conn_info.update_status(VoiceConnectionStatus.CONNECTED)
conn_info.user_count = len(channel.members)
self.connections[guild_id] = conn_info
await self.logger.info(f"成功连接到语音频道: {guild.name} -> {channel.name}")
await self.logger.info(f'成功连接到语音频道: {guild.name} -> {channel.name}')
return voice_client
except discord.ClientException as e:
raise VoiceNetworkError(f"Discord 客户端错误: {str(e)}")
raise VoiceNetworkError(f'Discord 客户端错误: {str(e)}')
except discord.opus.OpusNotLoaded as e:
raise VoiceConnectionError(f"Opus 编码器未加载: {str(e)}", "OPUS_NOT_LOADED", guild_id)
raise VoiceConnectionError(f'Opus 编码器未加载: {str(e)}', 'OPUS_NOT_LOADED', guild_id)
except Exception as e:
await self.logger.error(f"连接语音频道时发生未知错误: {str(e)}")
raise VoiceConnectionError(f"连接失败: {str(e)}", "UNKNOWN_ERROR", guild_id)
await self.logger.error(f'连接语音频道时发生未知错误: {str(e)}')
raise VoiceConnectionError(f'连接失败: {str(e)}', 'UNKNOWN_ERROR', guild_id)
async def leave_voice_channel(self, guild_id: int) -> bool:
"""
离开语音频道
断开指定服务器的语音连接,清理相关资源和状态信息。
确保音频播放停止后再断开连接。
@author: @ydzat
Args:
guild_id (int): 服务器ID
Returns:
bool: 断开是否成功
"""
async with self._connection_lock:
return await self._disconnect_internal(guild_id)
async def _disconnect_internal(self, guild_id: int) -> bool:
"""
内部断开连接方法
@author: @ydzat
Args:
guild_id (int): 服务器ID
Returns:
bool: 断开是否成功
"""
if guild_id not in self.connections:
return True
conn_info = self.connections[guild_id]
try:
if conn_info.voice_client and conn_info.voice_client.is_connected():
# 停止当前播放
if conn_info.voice_client.is_playing():
conn_info.voice_client.stop()
# 等待播放完全停止
await asyncio.sleep(0.1)
# 断开连接
await conn_info.voice_client.disconnect()
conn_info.update_status(VoiceConnectionStatus.IDLE)
del self.connections[guild_id]
await self.logger.info(f"已断开语音连接: Guild {guild_id}")
await self.logger.info(f'已断开语音连接: Guild {guild_id}')
return True
except Exception as e:
await self.logger.error(f"断开语音连接时发生错误: {str(e)}")
await self.logger.error(f'断开语音连接时发生错误: {str(e)}')
# 即使出错也要清理连接记录
conn_info.update_status(VoiceConnectionStatus.FAILED)
if guild_id in self.connections:
del self.connections[guild_id]
return False
async def get_voice_client(self, guild_id: int) -> typing.Optional[discord.VoiceClient]:
"""
获取语音客户端
返回指定服务器的语音客户端实例,如果未连接则返回 None。
会验证连接的有效性,自动清理无效连接。
@author: @ydzat
Args:
guild_id (int): 服务器ID
Returns:
Optional[discord.VoiceClient]: 语音客户端实例或 None
"""
if guild_id not in self.connections:
return None
conn_info = self.connections[guild_id]
# 验证连接是否仍然有效
if conn_info.voice_client and not conn_info.voice_client.is_connected():
# 连接已失效,清理状态
await self._disconnect_internal(guild_id)
return None
return conn_info.voice_client if conn_info.connected else None
async def is_connected_to_voice(self, guild_id: int) -> bool:
"""
检查是否连接到语音频道
@author: @ydzat
Args:
guild_id (int): 服务器ID
Returns:
bool: 是否已连接
"""
if guild_id not in self.connections:
return False
conn_info = self.connections[guild_id]
# 检查实际连接状态
if conn_info.voice_client and not conn_info.voice_client.is_connected():
# 连接已失效,清理状态
await self._disconnect_internal(guild_id)
return False
return conn_info.connected
async def get_connection_status(self, guild_id: int) -> typing.Optional[dict]:
"""
获取连接状态信息
@author: @ydzat
Args:
guild_id (int): 服务器ID
Returns:
Optional[dict]: 连接状态信息字典或 None
"""
if guild_id not in self.connections:
return None
conn_info = self.connections[guild_id]
# 更新实时信息
if conn_info.voice_client and conn_info.voice_client.is_connected():
conn_info.latency = conn_info.voice_client.latency * 1000 # 转换为毫秒
conn_info.connection_health = "good" if conn_info.latency < 100 else "poor"
conn_info.connection_health = 'good' if conn_info.latency < 100 else 'poor'
# 更新频道用户数
guild = self.bot.get_guild(guild_id)
if guild:
channel = guild.get_channel(conn_info.channel_id)
if channel and isinstance(channel, discord.VoiceChannel):
conn_info.user_count = len(channel.members)
return conn_info.to_dict()
async def list_active_connections(self) -> typing.List[dict]:
"""
列出所有活跃连接
@author: @ydzat
Returns:
List[dict]: 活跃连接列表
"""
active_connections = []
for guild_id, conn_info in self.connections.items():
if conn_info.connected:
status = await self.get_connection_status(guild_id)
if status:
active_connections.append(status)
return active_connections
async def get_voice_channel_info(self, guild_id: int, channel_id: int) -> typing.Optional[dict]:
"""
获取语音频道信息
@author: @ydzat
Args:
guild_id (int): 服务器ID
channel_id (int): 频道ID
Returns:
Optional[dict]: 频道信息字典或 None
"""
guild = self.bot.get_guild(guild_id)
if not guild:
return None
channel = guild.get_channel(channel_id)
if not channel or not isinstance(channel, discord.VoiceChannel):
return None
# 获取用户信息
users = []
for member in channel.members:
users.append({
"id": member.id,
"name": member.display_name,
"status": str(member.status),
"is_bot": member.bot
})
users.append(
{'id': member.id, 'name': member.display_name, 'status': str(member.status), 'is_bot': member.bot}
)
# 获取权限信息
bot_member = guild.me
permissions = channel.permissions_for(bot_member)
return {
"channel_id": channel_id,
"channel_name": channel.name,
"guild_id": guild_id,
"guild_name": guild.name,
"user_limit": channel.user_limit,
"current_users": users,
"user_count": len(users),
"bitrate": channel.bitrate,
"permissions": {
"connect": permissions.connect,
"speak": permissions.speak,
"use_voice_activation": permissions.use_voice_activation,
"priority_speaker": permissions.priority_speaker
}
'channel_id': channel_id,
'channel_name': channel.name,
'guild_id': guild_id,
'guild_name': guild.name,
'user_limit': channel.user_limit,
'current_users': users,
'user_count': len(users),
'bitrate': channel.bitrate,
'permissions': {
'connect': permissions.connect,
'speak': permissions.speak,
'use_voice_activation': permissions.use_voice_activation,
'priority_speaker': permissions.priority_speaker,
},
}
async def _validate_user_in_channel(self, guild: discord.Guild,
channel: discord.VoiceChannel, user_id: int):
async def _validate_user_in_channel(self, guild: discord.Guild, channel: discord.VoiceChannel, user_id: int):
"""
验证用户是否在语音频道中
@author: @ydzat
Args:
guild: Discord 服务器对象
channel: 语音频道对象
user_id: 用户ID
Raises:
VoicePermissionError: 用户不在频道中时抛出
"""
member = guild.get_member(user_id)
if not member:
raise VoicePermissionError(
f"无法找到用户 {user_id}",
["member_not_found"],
user_id,
channel.id
)
raise VoicePermissionError(f'无法找到用户 {user_id}', ['member_not_found'], user_id, channel.id)
if not member.voice or member.voice.channel != channel:
raise VoicePermissionError(
f"用户 {member.display_name} 不在语音频道 {channel.name}",
["user_not_in_channel"],
f'用户 {member.display_name} 不在语音频道 {channel.name}',
['user_not_in_channel'],
user_id,
channel.id
channel.id,
)
async def _validate_bot_permissions(self, channel: discord.VoiceChannel):
"""
验证机器人权限
@author: @ydzat
Args:
channel: 语音频道对象
Raises:
VoicePermissionError: 权限不足时抛出
"""
bot_member = channel.guild.me
permissions = channel.permissions_for(bot_member)
missing_permissions = []
if not permissions.connect:
missing_permissions.append("connect")
missing_permissions.append('connect')
if not permissions.speak:
missing_permissions.append("speak")
missing_permissions.append('speak')
if missing_permissions:
raise VoicePermissionError(
f"机器人在频道 {channel.name} 中缺少权限: {', '.join(missing_permissions)}",
f'机器人在频道 {channel.name} 中缺少权限: {", ".join(missing_permissions)}',
missing_permissions,
channel_id=channel.id
channel_id=channel.id,
)
async def cleanup_inactive_connections(self):
"""
清理无效连接
定期检查并清理已断开或无效的语音连接,释放资源。
@author: @ydzat
"""
cleanup_guilds = []
for guild_id, conn_info in self.connections.items():
if not conn_info.voice_client or not conn_info.voice_client.is_connected():
cleanup_guilds.append(guild_id)
for guild_id in cleanup_guilds:
await self._disconnect_internal(guild_id)
if cleanup_guilds:
await self.logger.info(f"清理了 {len(cleanup_guilds)} 个无效的语音连接")
await self.logger.info(f'清理了 {len(cleanup_guilds)} 个无效的语音连接')
async def start_monitoring(self):
"""
开始连接监控
@author: @ydzat
"""
if self._cleanup_task is None and self._monitoring_enabled:
self._cleanup_task = asyncio.create_task(self._monitoring_loop())
async def stop_monitoring(self):
"""
停止连接监控
@author: @ydzat
"""
self._monitoring_enabled = False
@@ -568,11 +555,11 @@ class VoiceConnectionManager:
except asyncio.CancelledError:
pass
self._cleanup_task = None
async def _monitoring_loop(self):
"""
监控循环
@author: @ydzat
"""
try:
@@ -581,18 +568,18 @@ class VoiceConnectionManager:
await self.cleanup_inactive_connections()
except asyncio.CancelledError:
pass
async def disconnect_all(self):
"""
断开所有连接
@author: @ydzat
"""
async with self._connection_lock:
guild_ids = list(self.connections.keys())
for guild_id in guild_ids:
await self._disconnect_internal(guild_id)
await self.stop_monitoring()
@@ -815,7 +802,7 @@ class DiscordAdapter(adapter.MessagePlatformAdapter):
self.logger = logger
self.bot_account_id = self.config['client_id']
# 初始化语音连接管理器
self.voice_manager: VoiceConnectionManager = None
@@ -838,163 +825,162 @@ class DiscordAdapter(adapter.MessagePlatformAdapter):
args['proxy'] = os.getenv('http_proxy')
self.bot = MyClient(intents=intents, **args)
# Voice functionality methods
async def join_voice_channel(self, guild_id: int, channel_id: int,
user_id: int = None) -> discord.VoiceClient:
async def join_voice_channel(self, guild_id: int, channel_id: int, user_id: int = None) -> discord.VoiceClient:
"""
加入语音频道
为指定服务器的语音频道建立连接,支持用户权限验证和连接复用。
@author: @ydzat
@version: 1.0
@since: 2025-07-04
Args:
guild_id (int): Discord 服务器ID
channel_id (int): 语音频道ID
user_id (int, optional): 请求用户ID用于权限验证
Returns:
discord.VoiceClient: 语音客户端实例
Raises:
VoicePermissionError: 权限不足
VoiceNetworkError: 网络连接失败
VoiceConnectionError: 其他连接错误
"""
if not self.voice_manager:
raise VoiceConnectionError("语音管理器未初始化", "MANAGER_NOT_READY")
raise VoiceConnectionError('语音管理器未初始化', 'MANAGER_NOT_READY')
return await self.voice_manager.join_voice_channel(guild_id, channel_id, user_id)
async def leave_voice_channel(self, guild_id: int) -> bool:
"""
离开语音频道
断开指定服务器的语音连接,清理相关资源。
@author: @ydzat
@version: 1.0
@since: 2025-07-04
Args:
guild_id (int): Discord 服务器ID
Returns:
bool: 是否成功断开连接
"""
if not self.voice_manager:
return False
return await self.voice_manager.leave_voice_channel(guild_id)
async def get_voice_client(self, guild_id: int) -> typing.Optional[discord.VoiceClient]:
"""
获取语音客户端
返回指定服务器的语音客户端实例,用于音频播放控制。
@author: @ydzat
@version: 1.0
@since: 2025-07-04
Args:
guild_id (int): Discord 服务器ID
Returns:
Optional[discord.VoiceClient]: 语音客户端实例或 None
"""
if not self.voice_manager:
return None
return await self.voice_manager.get_voice_client(guild_id)
async def is_connected_to_voice(self, guild_id: int) -> bool:
"""
检查语音连接状态
@author: @ydzat
@version: 1.0
@since: 2025-07-04
Args:
guild_id (int): Discord 服务器ID
Returns:
bool: 是否已连接到语音频道
"""
if not self.voice_manager:
return False
return await self.voice_manager.is_connected_to_voice(guild_id)
async def get_voice_connection_status(self, guild_id: int) -> typing.Optional[dict]:
"""
获取语音连接详细状态
返回包含连接时间、延迟、用户数等详细信息的状态字典。
@author: @ydzat
@version: 1.0
@since: 2025-07-04
Args:
guild_id (int): Discord 服务器ID
Returns:
Optional[dict]: 连接状态信息或 None
"""
if not self.voice_manager:
return None
return await self.voice_manager.get_connection_status(guild_id)
async def list_active_voice_connections(self) -> typing.List[dict]:
"""
列出所有活跃的语音连接
@author: @ydzat
@version: 1.0
@since: 2025-07-04
Returns:
List[dict]: 活跃语音连接列表
"""
if not self.voice_manager:
return []
return await self.voice_manager.list_active_connections()
async def get_voice_channel_info(self, guild_id: int, channel_id: int) -> typing.Optional[dict]:
"""
获取语音频道详细信息
包括频道名称、用户列表、权限信息等。
@author: @ydzat
@version: 1.0
@since: 2025-07-04
Args:
guild_id (int): Discord 服务器ID
channel_id (int): 语音频道ID
Returns:
Optional[dict]: 频道信息字典或 None
"""
if not self.voice_manager:
return None
return await self.voice_manager.get_voice_channel_info(guild_id, channel_id)
async def cleanup_voice_connections(self):
"""
清理无效的语音连接
手动触发语音连接清理,移除已断开或无效的连接。
@author: @ydzat
@version: 1.0
@since: 2025-07-04
@@ -1069,30 +1055,29 @@ class DiscordAdapter(adapter.MessagePlatformAdapter):
async def run_async(self):
"""
启动 Discord 适配器
初始化语音管理器并启动 Discord 客户端连接。
@author: @ydzat (修改)
"""
async with self.bot:
# 初始化语音管理器
self.voice_manager = VoiceConnectionManager(self.bot, self.logger)
await self.voice_manager.start_monitoring()
await self.logger.info("Discord 适配器语音功能已启用")
await self.logger.info('Discord 适配器语音功能已启用')
await self.bot.start(self.config['token'], reconnect=True)
async def kill(self) -> bool:
"""
关闭 Discord 适配器
清理语音连接并关闭 Discord 客户端。
@author: @ydzat (修改)
"""
if self.voice_manager:
await self.voice_manager.disconnect_all()
await self.bot.close()
return True

View File

@@ -501,7 +501,7 @@ class OfficialAdapter(adapter_model.MessagePlatformAdapter):
for event_handler in event_handler_mapping[event_type]:
setattr(self.bot, event_handler, wrapper)
except Exception as e:
self.logger.error(f"Error in qqbotpy callback: {traceback.format_exc()}")
self.logger.error(f'Error in qqbotpy callback: {traceback.format_exc()}')
raise e
def unregister_listener(

View File

@@ -29,10 +29,9 @@ import logging
class WeChatPadMessageConverter(adapter.MessageConverter):
def __init__(self, config: dict, logger: logging.Logger):
self.config = config
self.bot = WeChatPadClient(self.config["wechatpad_url"],self.config["token"])
self.bot = WeChatPadClient(self.config['wechatpad_url'], self.config['token'])
self.logger = logger
@staticmethod
@@ -41,9 +40,9 @@ class WeChatPadMessageConverter(adapter.MessageConverter):
for component in message_chain:
if isinstance(component, platform_message.AtAll):
content_list.append({"type": "at", "target": "all"})
content_list.append({'type': 'at', 'target': 'all'})
elif isinstance(component, platform_message.At):
content_list.append({"type": "at", "target": component.target})
content_list.append({'type': 'at', 'target': component.target})
elif isinstance(component, platform_message.Plain):
content_list.append({'type': 'text', 'content': component.text})
elif isinstance(component, platform_message.Image):
@@ -77,9 +76,9 @@ class WeChatPadMessageConverter(adapter.MessageConverter):
return content_list
async def target2yiri(
self,
message: dict,
bot_account_id: str,
self,
message: dict,
bot_account_id: str,
) -> platform_message.MessageChain:
"""外部消息转平台消息"""
# 数据预处理
@@ -92,18 +91,18 @@ class WeChatPadMessageConverter(adapter.MessageConverter):
if is_group_message:
ats_bot = self._ats_bot(message, bot_account_id)
self.logger.info(f"ats_bot: {ats_bot}; bot_account_id: {bot_account_id}; bot_wxid: {bot_wxid}")
if "@所有人" in content:
self.logger.info(f'ats_bot: {ats_bot}; bot_account_id: {bot_account_id}; bot_wxid: {bot_wxid}')
if '@所有人' in content:
message_list.append(platform_message.AtAll())
if ats_bot:
message_list.append(platform_message.At(target=bot_account_id))
# 解析@信息并生成At组件
at_targets = self._extract_at_targets(message)
for target_id in at_targets:
if target_id != bot_wxid: # 避免重复添加机器人的At
message_list.append(platform_message.At(target=target_id))
content_no_preifx, _ = self._extract_content_and_sender(content)
msg_type = message['msg_type']
@@ -421,14 +420,14 @@ class WeChatPadMessageConverter(adapter.MessageConverter):
msg_source = message.get('msg_source', '') or ''
if len(msg_source) > 0:
msg_source_data = ET.fromstring(msg_source)
at_user_list = msg_source_data.findtext("atuserlist") or ""
at_user_list = msg_source_data.findtext('atuserlist') or ''
if at_user_list:
# atuserlist格式通常是逗号分隔的用户ID列表
at_targets = [user_id.strip() for user_id in at_user_list.split(',') if user_id.strip()]
except Exception as e:
self.logger.error(f"_extract_at_targets got except: {e}")
self.logger.error(f'_extract_at_targets got except: {e}')
return at_targets
# 提取一下content前面的sender_id, 和去掉前缀的内容
def _extract_content_and_sender(self, raw_content: str) -> Tuple[str, Optional[str]]:
try:
@@ -452,22 +451,20 @@ class WeChatPadMessageConverter(adapter.MessageConverter):
class WeChatPadEventConverter(adapter.EventConverter):
def __init__(self, config: dict, logger: logging.Logger):
self.config = config
self.message_converter = WeChatPadMessageConverter(config, logger)
self.logger = logger
@staticmethod
async def yiri2target(event: platform_events.MessageEvent) -> dict:
pass
async def target2yiri(
self,
event: dict,
bot_account_id: str,
self,
event: dict,
bot_account_id: str,
) -> platform_events.MessageEvent:
# 排除公众号以及微信团队消息
if (
event['from_user_name']['str'].startswith('gh_')
@@ -579,26 +576,22 @@ class WeChatPadAdapter(adapter.MessagePlatformAdapter):
for msg in content_list:
# 文本消息处理@
if msg['type'] == 'text' and at_targets:
if "all" in at_targets:
if 'all' in at_targets:
msg['content'] = f'@所有人 {msg["content"]}'
else:
at_nick_name_list = []
for member in member_info:
if member["user_name"] in at_targets:
if member['user_name'] in at_targets:
at_nick_name_list.append(f'@{member["nick_name"]}')
msg['content'] = f'{" ".join(at_nick_name_list)} {msg["content"]}'
# 统一消息派发
handler_map = {
'text': lambda msg: self.bot.send_text_message(
to_wxid=target_id,
message=msg['content'],
ats= ["notify@all"] if "all" in at_targets else at_targets
to_wxid=target_id, message=msg['content'], ats=['notify@all'] if 'all' in at_targets else at_targets
),
'image': lambda msg: self.bot.send_image_message(
to_wxid=target_id,
img_url=msg["image"],
ats = ["notify@all"] if "all" in at_targets else at_targets
to_wxid=target_id, img_url=msg['image'], ats=['notify@all'] if 'all' in at_targets else at_targets
),
'WeChatEmoji': lambda msg: self.bot.send_emoji_message(
to_wxid=target_id, emoji_md5=msg['emoji_md5'], emoji_size=msg['emoji_size']

View File

@@ -812,12 +812,14 @@ class File(MessageComponent):
def __str__(self):
return f'[文件]{self.name}'
class Face(MessageComponent):
"""系统表情
此处将超级表情骰子/划拳一同归类于face
当face_type为rps(划拳)时 face_id 对应的是手势
当face_type为dice(骰子)时 face_id 对应的是点数
"""
type: str = 'Face'
"""表情类型"""
face_type: str = 'face'
@@ -834,15 +836,15 @@ class Face(MessageComponent):
elif self.face_type == 'rps':
return f'[表情]{self.face_name}({self.rps_data(self.face_id)})'
def rps_data(self,face_id):
rps_dict ={
1 : "",
2 : "剪刀",
3 : "石头",
def rps_data(self, face_id):
rps_dict = {
1: '',
2: '剪刀',
3: '石头',
}
return rps_dict[face_id]
# ================ 个人微信专用组件 ================
@@ -971,5 +973,6 @@ class WeChatFile(MessageComponent):
"""文件地址"""
file_base64: str = ''
"""base64"""
def __str__(self):
return f'[文件]{self.file_name}'
return f'[文件]{self.file_name}'