From d9378c3a88215272728974db80a24208329ec913 Mon Sep 17 00:00:00 2001 From: fdc310 <82008029+fdc310@users.noreply.github.com> Date: Fri, 1 May 2026 02:33:44 +0800 Subject: [PATCH] feat: Support WebSocket mode and enhance message processing capabilities (#2156) * feat: Support WebSocket mode and enhance message processing capabilities * feat: add steam * feat: enhance QQOfficialClient and QQOfficialAdapter with improved logging and stream context management --- src/langbot/libs/qq_official_api/api.py | 605 +++++++++++++++++- src/langbot/pkg/platform/botmgr.py | 2 +- .../pkg/platform/sources/qqofficial.py | 365 ++++++++++- .../pkg/platform/sources/qqofficial.yaml | 58 +- 4 files changed, 950 insertions(+), 80 deletions(-) diff --git a/src/langbot/libs/qq_official_api/api.py b/src/langbot/libs/qq_official_api/api.py index 51a56d53..db3194b6 100644 --- a/src/langbot/libs/qq_official_api/api.py +++ b/src/langbot/libs/qq_official_api/api.py @@ -1,8 +1,10 @@ +import re import time +import asyncio from quart import request import httpx from quart import Quart -from typing import Callable, Dict, Any +from typing import Callable, Dict, Any, Optional import langbot_plugin.api.entities.builtin.platform.events as platform_events from .qqofficialevent import QQOfficialEvent import json @@ -32,6 +34,8 @@ class QQOfficialClient: self.access_token = '' self.access_token_expiry_time = None self.logger = logger + self._msg_seq_counter = 0 + self._token_refresh_task: Optional[asyncio.Task] = None async def check_access_token(self): """检查access_token是否存在""" @@ -50,18 +54,18 @@ class QQOfficialClient: headers = { 'content-type': 'application/json', } - try: - response = await client.post(url, json=params, headers=headers) - if response.status_code == 200: - response_data = response.json() - access_token = response_data.get('access_token') - expires_in = int(response_data.get('expires_in', 7200)) - self.access_token_expiry_time = time.time() + expires_in - 60 - if access_token: - self.access_token = access_token - except Exception as e: - await self.logger.error(f'获取access_token失败: {response_data}') - raise Exception(f'获取access_token失败: {e}') + response = await client.post(url, json=params, headers=headers) + if response.status_code != 200: + raise Exception(f'Failed to get access_token: HTTP {response.status_code} {response.text}') + response_data = response.json() + access_token = response_data.get('access_token') + expires_in = int(response_data.get('expires_in', 7200)) + self.access_token_expiry_time = time.time() + expires_in - 60 + if access_token: + self.access_token = access_token + await self.logger.info(f'access_token obtained, expires_in={expires_in}s') + else: + raise Exception('Failed to get access_token: no access_token in response') async def handle_callback_request(self): """处理回调请求(独立端口模式,使用全局 request)""" @@ -87,10 +91,10 @@ class QQOfficialClient: try: body = await req.get_data() - print(f'[QQ Official] Received request, body length: {len(body)}') + await self.logger.info(f'Received request, body length: {len(body)}') if not body or len(body) == 0: - print('[QQ Official] Received empty body, might be health check or GET request') + await self.logger.info('Received empty body, might be health check or GET request') return {'code': 0, 'message': 'ok'}, 200 payload = json.loads(body) @@ -111,7 +115,6 @@ class QQOfficialClient: return {'code': 0, 'message': 'success'} except Exception as e: - print(f'[QQ Official] ERROR: {traceback.format_exc()}') await self.logger.error(f'Error in handle_callback_request: {traceback.format_exc()}') return {'error': str(e)}, 400 @@ -139,21 +142,24 @@ class QQOfficialClient: async def get_message(self, msg: dict) -> Dict[str, Any]: """获取消息""" + d = msg.get('d', {}) + if not isinstance(d, dict): + return {} message_data = { 't': msg.get('t', {}), - 'user_openid': msg.get('d', {}).get('author', {}).get('user_openid', {}), - 'timestamp': msg.get('d', {}).get('timestamp', {}), - 'd_author_id': msg.get('d', {}).get('author', {}).get('id', {}), - 'content': msg.get('d', {}).get('content', {}), - 'd_id': msg.get('d', {}).get('id', {}), + 'user_openid': d.get('author', {}).get('user_openid', {}), + 'timestamp': d.get('timestamp', {}), + 'd_author_id': d.get('author', {}).get('id', {}), + 'content': d.get('content', {}), + 'd_id': d.get('id', {}), 'id': msg.get('id', {}), - 'channel_id': msg.get('d', {}).get('channel_id', {}), - 'username': msg.get('d', {}).get('author', {}).get('username', {}), - 'guild_id': msg.get('d', {}).get('guild_id', {}), - 'member_openid': msg.get('d', {}).get('author', {}).get('openid', {}), - 'group_openid': msg.get('d', {}).get('group_openid', {}), + 'channel_id': d.get('channel_id', {}), + 'username': d.get('author', {}).get('username', {}), + 'guild_id': d.get('guild_id', {}), + 'member_openid': d.get('author', {}).get('openid', {}), + 'group_openid': d.get('group_openid', {}), } - attachments = msg.get('d', {}).get('attachments', []) + attachments = d.get('attachments', []) image_attachments = [attachment['url'] for attachment in attachments if await self.is_image(attachment)] image_attachments_type = [ attachment['content_type'] for attachment in attachments if await self.is_image(attachment) @@ -192,7 +198,7 @@ class QQOfficialClient: if response.status_code == 200: return else: - await self.logger.error(f'发送私聊消息失败: {response_data}') + await self.logger.error(f'Failed to send private message: {response_data}') raise ValueError(response) async def send_group_text_msg(self, group_openid: str, content: str, msg_id: str): @@ -215,7 +221,7 @@ class QQOfficialClient: if response.status_code == 200: return else: - await self.logger.error(f'发送群聊消息失败:{response.json()}') + await self.logger.error(f'Failed to send group message: {response.json()}') raise Exception(response.read().decode()) async def send_channle_group_text_msg(self, channel_id: str, content: str, msg_id: str): @@ -238,7 +244,7 @@ class QQOfficialClient: if response.status_code == 200: return True else: - await self.logger.error(f'发送频道群聊消息失败: {response.json()}') + await self.logger.error(f'Failed to send channel group message: {response.json()}') raise Exception(response) async def send_channle_private_text_msg(self, guild_id: str, content: str, msg_id: str): @@ -261,9 +267,224 @@ class QQOfficialClient: if response.status_code == 200: return True else: - await self.logger.error(f'发送频道私聊消息失败: {response.json()}') + await self.logger.error(f'Failed to send channel private message: {response.json()}') raise Exception(response) + # ---- 富媒体消息 ---- + + # 媒体文件类型 + MEDIA_TYPE_IMAGE = 1 + MEDIA_TYPE_VIDEO = 2 + MEDIA_TYPE_VOICE = 3 + MEDIA_TYPE_FILE = 4 + + async def upload_media( + self, + target_type: str, + target_id: str, + file_type: int, + file_url: str = None, + file_data: str = None, + file_name: str = None, + ) -> str: + """上传媒体文件,返回 file_info。 + + Args: + target_type: 'c2c' | 'group' + target_id: 用户 openid 或群 openid + file_type: 1=图片, 2=视频, 3=语音, 4=文件 + file_url: 在线 URL(与 file_data 二选一) + file_data: base64 编码的文件数据或 data URL(与 file_url 二选一) + file_name: 文件名(file_type=4 时必填) + """ + if not await self.check_access_token(): + await self.get_access_token() + + if target_type == 'c2c': + url = f'{self.base_url}/v2/users/{target_id}/files' + elif target_type == 'group': + url = f'{self.base_url}/v2/groups/{target_id}/files' + else: + raise ValueError(f'Unsupported target_type: {target_type}') + + body = { + 'file_type': file_type, + 'srv_send_msg': False, + } + if file_url: + body['url'] = file_url + elif file_data: + # 处理 data URL 格式: data:image/png;base64,xxxxx + if file_data.startswith('data:'): + match = re.match(r'^data:[^;]+;base64,(.+)$', file_data, re.DOTALL) + if match: + body['file_data'] = match.group(1) + else: + body['file_data'] = file_data + else: + body['file_data'] = file_data + else: + raise ValueError('file_url or file_data is required') + + if file_type == self.MEDIA_TYPE_FILE and file_name: + body['file_name'] = file_name + + async with httpx.AsyncClient(timeout=120) as client: + headers = { + 'Authorization': f'QQBot {self.access_token}', + 'Content-Type': 'application/json', + } + response = await client.post(url, headers=headers, json=body) + if response.status_code == 200: + data = response.json() + file_info = data.get('file_info', '') + preview = file_info[:80] + '...' if len(file_info) > 80 else file_info + await self.logger.info(f'Upload media success, file_info={preview}') + return file_info + else: + raise Exception(f'Failed to upload media: HTTP {response.status_code} {response.text}') + + async def _send_media_msg( + self, + target_type: str, + target_id: str, + file_info: str, + msg_id: str = None, + content: str = None, + ): + """发送富媒体消息(msg_type=7)""" + if not await self.check_access_token(): + await self.get_access_token() + + if target_type == 'c2c': + url = f'{self.base_url}/v2/users/{target_id}/messages' + elif target_type == 'group': + url = f'{self.base_url}/v2/groups/{target_id}/messages' + else: + raise ValueError(f'Unsupported target_type: {target_type}') + + self._msg_seq_counter += 1 + msg_seq = self._msg_seq_counter + body = { + 'msg_type': 7, + 'media': {'file_info': file_info}, + 'msg_seq': msg_seq, + } + if content: + body['content'] = content + if msg_id: + body['msg_id'] = msg_id + + async with httpx.AsyncClient(timeout=120) as client: + headers = { + 'Authorization': f'QQBot {self.access_token}', + 'Content-Type': 'application/json', + } + await self.logger.info(f'Sending rich media: {json.dumps(body, ensure_ascii=False)[:200]}') + response = await client.post(url, headers=headers, json=body) + if response.status_code != 200: + raise Exception(f'Failed to send rich media message: HTTP {response.status_code} {response.text}') + + async def send_image_msg( + self, + target_type: str, + target_id: str, + file_url: str = None, + file_data: str = None, + msg_id: str = None, + content: str = None, + ): + """发送图片消息""" + file_info = await self.upload_media( + target_type, + target_id, + self.MEDIA_TYPE_IMAGE, + file_url=file_url, + file_data=file_data, + ) + await self._send_media_msg(target_type, target_id, file_info, msg_id, content) + + async def send_voice_msg( + self, + target_type: str, + target_id: str, + file_url: str = None, + file_data: str = None, + msg_id: str = None, + ): + """发送语音消息""" + file_info = await self.upload_media( + target_type, + target_id, + self.MEDIA_TYPE_VOICE, + file_url=file_url, + file_data=file_data, + ) + await self._send_media_msg(target_type, target_id, file_info, msg_id) + + async def send_file_msg( + self, + target_type: str, + target_id: str, + file_url: str = None, + file_data: str = None, + file_name: str = None, + msg_id: str = None, + ): + """发送文件消息(含视频)""" + file_info = await self.upload_media( + target_type, + target_id, + self.MEDIA_TYPE_FILE, + file_url=file_url, + file_data=file_data, + file_name=file_name, + ) + await self._send_media_msg(target_type, target_id, file_info, msg_id) + + async def send_stream_msg( + self, + user_openid: str, + content: str, + event_id: str, + msg_id: str, + msg_seq: int = 1, + index: int = 0, + stream_msg_id: str = None, + input_state: int = 1, + ): + """发送流式消息(C2C 私聊)。 + + Args: + input_state: 1=生成中, 10=生成结束 + """ + if not await self.check_access_token(): + await self.get_access_token() + + url = f'{self.base_url}/v2/users/{user_openid}/stream_messages' + body = { + 'input_mode': 'replace', + 'input_state': input_state, + 'content_type': 'markdown', + 'content_raw': content, + 'event_id': event_id, + 'msg_id': msg_id, + 'msg_seq': msg_seq, + 'index': index, + } + if stream_msg_id: + body['stream_msg_id'] = stream_msg_id + + async with httpx.AsyncClient(timeout=120) as client: + headers = { + 'Authorization': f'QQBot {self.access_token}', + 'Content-Type': 'application/json', + } + response = await client.post(url, headers=headers, json=body) + if response.status_code != 200: + raise Exception(f'Failed to send stream message: HTTP {response.status_code} {response.text}') + return response.json() + async def is_token_expired(self): """检查token是否过期""" if self.access_token_expiry_time is None: @@ -292,3 +513,325 @@ class QQOfficialClient: 'signature': signature, } return response + + # ---- WebSocket Gateway ---- + # Reference: https://bot.q.qq.com/wiki/develop/api-v2/dev-prepare/interface-framework/event-emit.html + + INTENT_GUILDS = 1 << 0 + INTENT_GUILD_MEMBERS = 1 << 1 + INTENT_PUBLIC_GUILD_MESSAGES = 1 << 30 + INTENT_DIRECT_MESSAGE = 1 << 12 + INTENT_GROUP_AND_C2C = 1 << 25 + INTENT_INTERACTION = 1 << 26 + + FULL_INTENTS = ( + INTENT_GUILDS + | INTENT_GUILD_MEMBERS + | INTENT_PUBLIC_GUILD_MESSAGES + | INTENT_DIRECT_MESSAGE + | INTENT_GROUP_AND_C2C + | INTENT_INTERACTION + ) + + async def get_gateway_url(self) -> str: + """获取 WebSocket 网关地址""" + if not await self.check_access_token(): + await self.get_access_token() + + url = f'{self.base_url}/gateway' + async with httpx.AsyncClient() as client: + headers = { + 'Authorization': f'QQBot {self.access_token}', + } + response = await client.get(url, headers=headers) + if response.status_code == 200: + data = response.json() + ws_url = data.get('url', '') + if not ws_url: + raise Exception('Gateway URL is empty') + return ws_url + else: + raise Exception(f'Failed to get Gateway URL: HTTP {response.status_code} {response.text}') + + async def _background_token_refresh(self): + """在 token 到期前主动刷新""" + try: + while True: + if self.access_token_expiry_time: + remain = self.access_token_expiry_time - time.time() + if remain > 120: + await asyncio.sleep(remain - 60) + continue + self.access_token = '' + self.access_token_expiry_time = None + if await self.check_access_token(): + await asyncio.sleep(60) + else: + await self.get_access_token() + await asyncio.sleep(60) + except asyncio.CancelledError: + pass + + async def connect_gateway( + self, + on_event: Callable[[str, dict], Any], + on_ready: Optional[Callable[[], Any]] = None, + on_error: Optional[Callable[[Exception], Any]] = None, + ): + """WebSocket 网关连接,含重连逻辑。持续重连直到达到最大次数或被取消。 + + Args: + on_event: 收到 op=0 Dispatch 事件时的回调,参数为 (event_type, event_data) + on_ready: 连接就绪 (收到 READY) 时的回调 + on_error: 发生错误时的回调 + """ + import websockets + + session_id = '' + last_seq = 0 + reconnect_attempts = 0 + max_reconnect_attempts = 100 + backoff_delays = [1, 2, 5, 10, 30, 60] + rate_limit_delay = 60 + + # Cancel previous token refresh task if any + if self._token_refresh_task and not self._token_refresh_task.done(): + self._token_refresh_task.cancel() + try: + await self._token_refresh_task + except asyncio.CancelledError: + pass + self._token_refresh_task = None + + while reconnect_attempts <= max_reconnect_attempts: + heartbeat_interval = 45000 + should_refresh_token = False + ws = None + heartbeat_task = None + + # Refresh token if needed + if should_refresh_token: + self.access_token = '' + self.access_token_expiry_time = None + + try: + ws_url = await self.get_gateway_url() + await self.logger.info(f'Gateway URL obtained: {ws_url[:60]}...') + except Exception as e: + error_msg = str(e) + await self.logger.error(f'Failed to get gateway URL: {e}') + reconnect_attempts += 1 + if '100017' in error_msg or '频率' in error_msg or 'Too many' in error_msg: + delay = rate_limit_delay + else: + delay = backoff_delays[min(reconnect_attempts - 1, len(backoff_delays) - 1)] + await self.logger.info(f'Reconnecting in {delay}s (attempt {reconnect_attempts})') + await asyncio.sleep(delay) + continue + + try: + await self.logger.info('Connecting to WebSocket gateway...') + ws = await websockets.connect(ws_url) + await self.logger.info('WebSocket connected') + except Exception as e: + await self.logger.error(f'WebSocket connection failed: {e}') + reconnect_attempts += 1 + delay = backoff_delays[min(reconnect_attempts - 1, len(backoff_delays) - 1)] + await self.logger.info(f'Reconnecting in {delay}s (attempt {reconnect_attempts})') + await asyncio.sleep(delay) + continue + + try: + async for raw_msg in ws: + try: + payload = json.loads(raw_msg) + except json.JSONDecodeError: + await self.logger.error(f'Failed to parse message: {raw_msg}') + continue + + op = payload.get('op') + d = payload.get('d', {}) + s = payload.get('s') + t = payload.get('t') + + if not isinstance(d, dict): + d = {} + + if op == 10: # Hello + heartbeat_interval = d.get('heartbeat_interval', 45000) + await self.logger.info(f'Received Hello, heartbeat_interval={heartbeat_interval}ms') + + # Send Identify or Resume + if session_id and last_seq > 0: + resume_payload = { + 'op': 6, + 'd': { + 'token': f'QQBot {self.access_token}', + 'session_id': session_id, + 'seq': last_seq, + }, + } + await ws.send(json.dumps(resume_payload)) + await self.logger.info(f'Sent Resume, session_id={session_id}, seq={last_seq}') + else: + identify_payload = { + 'op': 2, + 'd': { + 'token': f'QQBot {self.access_token}', + 'intents': self.FULL_INTENTS, + 'shard': [0, 1], + }, + } + await ws.send(json.dumps(identify_payload)) + await self.logger.info(f'Sent Identify, intents={self.FULL_INTENTS}') + + # Start heartbeat + async def _heartbeat_loop(conn, interval_ms): + interval_sec = interval_ms / 1000.0 + try: + while True: + await asyncio.sleep(interval_sec) + try: + hb_payload = {'op': 1, 'd': last_seq} + await conn.send(json.dumps(hb_payload)) + except Exception: + break + except asyncio.CancelledError: + pass + + heartbeat_task = asyncio.create_task(_heartbeat_loop(ws, heartbeat_interval)) + + elif op == 0: # Dispatch + if s is not None: + last_seq = s + + if t == 'READY': + session_id = d.get('session_id', '') + reconnect_attempts = 0 + await self.logger.info(f'READY, session_id={session_id}') + if on_ready: + try: + result = on_ready() + if asyncio.iscoroutine(result): + await result + except Exception: + pass + # Track token refresh task to avoid leaks + if self._token_refresh_task and not self._token_refresh_task.done(): + self._token_refresh_task.cancel() + try: + await self._token_refresh_task + except asyncio.CancelledError: + pass + self._token_refresh_task = asyncio.create_task(self._background_token_refresh()) + + elif t == 'RESUMED': + reconnect_attempts = 0 + await self.logger.info('RESUMED') + + else: + await self.logger.debug(f'Received event: {t}, seq={s}') + if on_event: + try: + result = on_event(t, d) + if asyncio.iscoroutine(result): + await result + except Exception: + await self.logger.error(f'Error handling event {t}: {traceback.format_exc()}') + + elif op == 11: # Heartbeat ACK + pass + + elif op == 7: # Reconnect + await self.logger.info('Received Reconnect directive') + break + + elif op == 9: # Invalid Session + can_resume = d.get('can_resume', False) + await self.logger.warning(f'Invalid Session, can_resume={can_resume}') + if not can_resume: + session_id = '' + last_seq = 0 + should_refresh_token = True + break + + # Connection closed normally (end of async for) + try: + close_code = ws.close_code + close_reason = ws.close_reason or '' + except Exception: + close_code = None + close_reason = '' + await self.logger.info(f'Connection closed, code={close_code}, reason={close_reason}') + + if close_code == 4004: + should_refresh_token = True + elif close_code in (4006, 4007, 4009): + session_id = '' + last_seq = 0 + should_refresh_token = True + elif close_code == 4008: + reconnect_attempts += 1 + delay = rate_limit_delay + await self.logger.info( + f'Rate limited, waiting {delay}s before reconnect (attempt {reconnect_attempts})' + ) + await asyncio.sleep(delay) + continue + elif close_code in (4914, 4915): + err = Exception(f'Bot disconnected/banned (close_code={close_code})') + if on_error: + await self._safe_callback(on_error, err) + return + elif close_code in (4900, 4901, 4902, 4903, 4904, 4905, 4906, 4907, 4908, 4909, 4910, 4911, 4912, 4913): + session_id = '' + last_seq = 0 + + if close_code == 1000: + return + + except asyncio.CancelledError: + raise + except Exception: + await self.logger.error(f'Unexpected error in WebSocket loop: {traceback.format_exc()}') + finally: + if heartbeat_task: + heartbeat_task.cancel() + try: + await heartbeat_task + except asyncio.CancelledError: + pass + if ws: + try: + await ws.close() + except Exception: + pass + + # If we reach here, we need to reconnect + reconnect_attempts += 1 + if reconnect_attempts > max_reconnect_attempts: + await self.logger.error(f'Max reconnect attempts ({max_reconnect_attempts}) reached, stopping') + if on_error: + await self._safe_callback(on_error, Exception('Max reconnect attempts reached')) + return + delay = backoff_delays[min(reconnect_attempts - 1, len(backoff_delays) - 1)] + await self.logger.info(f'Reconnecting in {delay}s (attempt {reconnect_attempts})') + await asyncio.sleep(delay) + + async def _safe_callback(self, callback, *args): + """Safely invoke a callback, handling both sync and async functions.""" + try: + result = callback(*args) + if asyncio.iscoroutine(result): + await result + except Exception: + pass + + async def connect_gateway_loop( + self, + on_event: Callable[[str, dict], Any], + on_ready: Optional[Callable[[], Any]] = None, + on_error: Optional[Callable[[Exception], Any]] = None, + ): + """持续重连的网关循环。""" + await self.connect_gateway(on_event, on_ready, on_error) diff --git a/src/langbot/pkg/platform/botmgr.py b/src/langbot/pkg/platform/botmgr.py index 6ae0f4c2..8e99618c 100644 --- a/src/langbot/pkg/platform/botmgr.py +++ b/src/langbot/pkg/platform/botmgr.py @@ -523,7 +523,7 @@ class PlatformManager: return None async def remove_bot(self, bot_uuid: str): - for bot in self.bots: + for bot in self.bots[:]: if bot.bot_entity.uuid == bot_uuid: if bot.enable: await bot.shutdown() diff --git a/src/langbot/pkg/platform/sources/qqofficial.py b/src/langbot/pkg/platform/sources/qqofficial.py index 354afc41..8af40697 100644 --- a/src/langbot/pkg/platform/sources/qqofficial.py +++ b/src/langbot/pkg/platform/sources/qqofficial.py @@ -1,9 +1,11 @@ from __future__ import annotations import typing +import re import asyncio import traceback import datetime +import time import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter import langbot_plugin.api.entities.builtin.platform.message as platform_message @@ -15,11 +17,25 @@ from ...utils import image from ..logger import EventLogger +def _is_base64_data(value: str) -> bool: + """Check if a string contains base64-encoded data rather than a URL.""" + if not value: + return False + # data: URI scheme (e.g. data:image/png;base64,xxx) + if value.startswith('data:'): + return True + # Only treat as base64 if it doesn't look like a URL/path and has valid base64 chars + if value.startswith(('http://', 'https://', '/', './', '../')): + return False + # Check if it looks like base64 (only valid chars, reasonable length) + return bool(re.fullmatch(r'[A-Za-z0-9+/=\s]{20,}', value)) + + class QQOfficialMessageConverter(abstract_platform_adapter.AbstractMessageConverter): @staticmethod async def yiri2target(message_chain: platform_message.MessageChain): + """将 LangBot 消息链转换为 QQ Official 消息格式列表。""" content_list = [] - # 只实现了发文字 for msg in message_chain: if type(msg) is platform_message.Plain: content_list.append( @@ -28,6 +44,49 @@ class QQOfficialMessageConverter(abstract_platform_adapter.AbstractMessageConver 'content': msg.text, } ) + elif type(msg) is platform_message.Image: + url = msg.url if hasattr(msg, 'url') and msg.url else None + b64 = msg.base64 if hasattr(msg, 'base64') and msg.base64 else None + # Some plugins (e.g. MimoTTS) store base64 data in the url field + if url and not b64 and _is_base64_data(url): + b64 = url + url = None + content_list.append( + { + 'type': 'image', + 'url': url, + 'base64': b64, + } + ) + elif type(msg) is platform_message.Voice: + url = msg.url if hasattr(msg, 'url') and msg.url else None + b64 = msg.base64 if hasattr(msg, 'base64') and msg.base64 else None + # Some plugins (e.g. MimoTTS) store base64 data in the url field + if url and not b64 and _is_base64_data(url): + b64 = url + url = None + content_list.append( + { + 'type': 'voice', + 'url': url, + 'base64': b64, + } + ) + elif type(msg) is platform_message.File: + url = msg.url if hasattr(msg, 'url') and msg.url else None + b64 = msg.base64 if hasattr(msg, 'base64') and msg.base64 else None + # Some plugins store base64 data in the url field + if url and not b64 and _is_base64_data(url): + b64 = url + url = None + content_list.append( + { + 'type': 'file', + 'url': url, + 'base64': b64, + 'name': msg.name if hasattr(msg, 'name') else 'file', + } + ) return content_list @@ -129,12 +188,19 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter config: dict bot_account_id: str bot_uuid: str = None + enable_webhook: bool = False message_converter: QQOfficialMessageConverter = QQOfficialMessageConverter() event_converter: QQOfficialEventConverter = QQOfficialEventConverter() def __init__(self, config: dict, logger: EventLogger): + enable_webhook = config.get('enable-webhook', False) + bot = QQOfficialClient( - app_id=config['appid'], secret=config['secret'], token=config['token'], logger=logger, unified_mode=True + app_id=config['appid'], + secret=config['secret'], + token=config['token'], + logger=logger, + unified_mode=enable_webhook, ) super().__init__( @@ -144,6 +210,13 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter bot_account_id=config['appid'], ) + self.enable_webhook = enable_webhook + self._ws_task: asyncio.Task = None + self._stream_ctx: dict = {} + self._stream_ctx_ts: dict[str, float] = {} + self._fallback_text: dict[str, str] = {} + self._fallback_text_ts: dict[str, float] = {} + async def reply_message( self, message_source: platform_events.MessageEvent, @@ -156,28 +229,18 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter content_list = await QQOfficialMessageConverter.yiri2target(message) - # 私聊消息 + # 确定 target_type 和 target_id + target_type = None + target_id = None + if qq_official_event.t == 'C2C_MESSAGE_CREATE': - for content in content_list: - if content['type'] == 'text': - await self.bot.send_private_text_msg( - qq_official_event.user_openid, - content['content'], - qq_official_event.d_id, - ) - - # 群聊消息 - if qq_official_event.t == 'GROUP_AT_MESSAGE_CREATE': - for content in content_list: - if content['type'] == 'text': - await self.bot.send_group_text_msg( - qq_official_event.group_openid, - content['content'], - qq_official_event.d_id, - ) - - # 频道群聊 - if qq_official_event.t == 'AT_MESSAGE_CREATE': + target_type = 'c2c' + target_id = qq_official_event.user_openid + elif qq_official_event.t == 'GROUP_AT_MESSAGE_CREATE': + target_type = 'group' + target_id = qq_official_event.group_openid + elif qq_official_event.t == 'AT_MESSAGE_CREATE': + # 频道群聊使用频道 API,暂不支持富媒体 for content in content_list: if content['type'] == 'text': await self.bot.send_channle_group_text_msg( @@ -185,9 +248,9 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter content['content'], qq_official_event.d_id, ) - - # 频道私聊 - if qq_official_event.t == 'DIRECT_MESSAGE_CREATE': + return + elif qq_official_event.t == 'DIRECT_MESSAGE_CREATE': + # 频道私聊使用频道 API,暂不支持富媒体 for content in content_list: if content['type'] == 'text': await self.bot.send_channle_private_text_msg( @@ -195,6 +258,63 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter content['content'], qq_official_event.d_id, ) + return + + # C2C 和群聊:支持文字 + 富媒体 + for content in content_list: + content_type = content.get('type', 'text') + + if content_type == 'text': + if target_type == 'c2c': + await self.bot.send_private_text_msg( + target_id, + content['content'], + qq_official_event.d_id, + ) + elif target_type == 'group': + await self.bot.send_group_text_msg( + target_id, + content['content'], + qq_official_event.d_id, + ) + + elif content_type == 'image': + file_url = content.get('url') + file_data = content.get('base64') + if file_url or file_data: + await self.bot.send_image_msg( + target_type, + target_id, + file_url=file_url, + file_data=file_data, + msg_id=qq_official_event.d_id, + ) + + elif content_type == 'voice': + file_url = content.get('url') + file_data = content.get('base64') + if file_url or file_data: + await self.bot.send_voice_msg( + target_type, + target_id, + file_url=file_url, + file_data=file_data, + msg_id=qq_official_event.d_id, + ) + + elif content_type == 'file': + file_url = content.get('url') + file_data = content.get('base64') + file_name = content.get('name', 'file') + if file_url or file_data: + await self.bot.send_file_msg( + target_type, + target_id, + file_url=file_url, + file_data=file_data, + file_name=file_name, + msg_id=qq_official_event.d_id, + ) async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain): pass @@ -238,17 +358,196 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter return await self.bot.handle_unified_webhook(request) async def run_async(self): - # 统一 webhook 模式下,不启动独立的 Quart 应用 - # 保持运行但不启动独立端口 + if not self.enable_webhook: + await self._run_websocket() + else: + # 统一 webhook 模式下,不启动独立的 Quart 应用 + async def keep_alive(): + while True: + await asyncio.sleep(1) - async def keep_alive(): - while True: - await asyncio.sleep(1) + await keep_alive() - await keep_alive() + async def _run_websocket(self): + """以 WebSocket 模式运行网关连接""" + await self.logger.info('QQ Official adapter starting in WebSocket mode') + + async def on_ready(): + await self.logger.info('QQ Official WebSocket connected and ready') + + async def on_event(event_type: str, event_data: dict): + # 只处理消息事件,忽略 READY/RESUMED 等系统事件 + message_event_types = { + 'C2C_MESSAGE_CREATE', + 'DIRECT_MESSAGE_CREATE', + 'GROUP_AT_MESSAGE_CREATE', + 'AT_MESSAGE_CREATE', + } + if event_type not in message_event_types: + return + if not isinstance(event_data, dict): + await self.logger.warning(f'Event data is not dict, skipping: {event_type} -> {type(event_data)}') + return + await self.logger.info(f'Processing message event: {event_type}') + # 构造与 webhook 模式相同的 payload 结构 + payload = {'t': event_type, 'd': event_data} + message_data = await self.bot.get_message(payload) + if message_data: + event = QQOfficialEvent.from_payload(message_data) + await self.bot._handle_message(event) + + async def on_error(error: Exception): + await self.logger.error(f'WebSocket error: {error}') + await self.logger.error(f'QQ Official WebSocket error: {error}') + + self._ws_task = asyncio.create_task(self.bot.connect_gateway_loop(on_event, on_ready, on_error)) + try: + await self._ws_task + except asyncio.CancelledError: + pass async def kill(self) -> bool: - return False + if self._ws_task: + self._ws_task.cancel() + try: + await self._ws_task + except asyncio.CancelledError: + pass + self._ws_task = None + return True + + # --------------- 流式输出 --------------- + + _STREAM_CTX_TTL = 300 # seconds + + async def _cleanup_stale_streams(self): + """Remove stream contexts that have not been updated for more than _STREAM_CTX_TTL seconds.""" + now = time.time() + stale_ids = [mid for mid, ts in self._stream_ctx_ts.items() if now - ts > self._STREAM_CTX_TTL] + for mid in stale_ids: + self._stream_ctx.pop(mid, None) + self._stream_ctx_ts.pop(mid, None) + stale_fb = [mid for mid, ts in self._fallback_text_ts.items() if now - ts > self._STREAM_CTX_TTL] + for mid in stale_fb: + self._fallback_text.pop(mid, None) + self._fallback_text_ts.pop(mid, None) + if stale_ids or stale_fb: + await self.logger.debug(f'Cleaned up {len(stale_ids)} stream contexts, {len(stale_fb)} fallback texts') + + async def is_stream_output_supported(self) -> bool: + return self.config.get('enable-stream-reply', False) + + async def create_message_card(self, message_id: str, event: platform_events.MessageEvent) -> bool: + source = event.source_platform_object + # Streaming API only supports C2C private chat + if source.t != 'C2C_MESSAGE_CREATE': + return False + + ctx = { + 'user_openid': source.user_openid, + 'msg_id': source.d_id, + 'stream_msg_id': None, + 'msg_seq': 1, + 'index': 0, + 'last_update_ts': 0, + 'accumulated_text': '', + 'sent_length': 0, + 'session_started': False, + } + + self._stream_ctx[message_id] = ctx + self._stream_ctx_ts[message_id] = time.time() + return True + + async def reply_message_chunk( + self, + message_source: platform_events.MessageEvent, + bot_message: dict, + message: platform_message.MessageChain, + quote_origin: bool = False, + is_final: bool = False, + ): + # Periodically clean up stale stream contexts + await self._cleanup_stale_streams() + # 提取纯文本内容(当前 chunk 的文本) + text_parts = [] + for msg in message: + if type(msg) is platform_message.Plain: + text_parts.append(msg.text) + chunk_text = '\n\n'.join(text_parts) + + message_id = ( + bot_message.get('resp_message_id') + if isinstance(bot_message, dict) + else getattr(bot_message, 'resp_message_id', None) + ) + if not message_id or message_id not in self._stream_ctx: + # 非流式场景(如群聊不支持流式),累积文本后一次性回复 + if chunk_text: + self._fallback_text[message_id] = self._fallback_text.get(message_id, '') + chunk_text + self._fallback_text_ts[message_id] = time.time() + if is_final: + full_text = self._fallback_text.pop(message_id, '') + if full_text: + fallback_msg = platform_message.MessageChain([platform_message.Plain(text=full_text)]) + await self.reply_message(message_source, fallback_msg, quote_origin) + return + + ctx = self._stream_ctx[message_id] + + # 累积文本 + if chunk_text: + ctx['accumulated_text'] += chunk_text + + # 未启动会话时,等第一个有内容的 chunk 来建立会话 + if not ctx['session_started']: + if not ctx['accumulated_text']: + return + # 用第一个 chunk 的文本建立会话(不发 "..." 避免污染前缀) + ctx['session_started'] = True + + # 发送内容 = 全量累积文本 + # QQ API 的 replace 模式不允许修改已下发前缀,所以: + # - 首次:发送全部文本,建立会话 + # - 后续:只能发送新增部分(append 行为) + content_to_send = ctx['accumulated_text'][ctx['sent_length'] :] + if not content_to_send and not is_final: + return + + input_state = 10 if is_final else 1 + + # Rate limiting: skip non-final updates if last update was <0.5s ago + now = time.time() + if not is_final and (now - ctx['last_update_ts']) < 0.5: + return + ctx['last_update_ts'] = now + + try: + resp = await self.bot.send_stream_msg( + user_openid=ctx['user_openid'], + content=content_to_send, + event_id=ctx['msg_id'], + msg_id=ctx['msg_id'], + msg_seq=ctx['msg_seq'], + index=ctx['index'], + stream_msg_id=ctx['stream_msg_id'], + input_state=input_state, + ) + if resp and isinstance(resp, dict): + new_stream_id = resp.get('id') + if new_stream_id: + ctx['stream_msg_id'] = new_stream_id + ctx['sent_length'] = len(ctx['accumulated_text']) + ctx['index'] += 1 + await self.logger.debug( + f'[QQ Official] 流式 chunk 已发送, index={ctx["index"]}, ' + f'sent_len={ctx["sent_length"]}, is_final={is_final}' + ) + except Exception as e: + await self.logger.error(f'Failed to send stream message: {e}') + + if is_final: + self._stream_ctx.pop(message_id, None) def unregister_listener( self, diff --git a/src/langbot/pkg/platform/sources/qqofficial.yaml b/src/langbot/pkg/platform/sources/qqofficial.yaml index 48ad7ffd..f6afdabc 100644 --- a/src/langbot/pkg/platform/sources/qqofficial.yaml +++ b/src/langbot/pkg/platform/sources/qqofficial.yaml @@ -7,9 +7,9 @@ metadata: zh_Hans: QQ 官方 API zh_Hant: QQ 官方 API description: - en_US: QQ Official API (Webhook) - zh_Hans: QQ 官方 API (Webhook),需要公网地址以接收消息推送,请查看文档了解使用方式 - zh_Hant: QQ 官方 API (Webhook),需要公網地址以接收訊息推送,請查看文件了解使用方式 + en_US: QQ Official API (Webhook / WebSocket) + zh_Hans: QQ 官方 API,支持 Webhook 和 WebSocket 两种连接模式 + zh_Hant: QQ 官方 API,支援 Webhook 和 WebSocket 兩種連線模式 icon: qqofficial.svg spec: categories: @@ -19,18 +19,6 @@ spec: en: https://link.langbot.app/en/platforms/qqofficial ja: https://link.langbot.app/ja/platforms/qqofficial config: - - name: webhook_url - label: - en_US: Webhook Callback URL - zh_Hans: Webhook 回调地址 - zh_Hant: Webhook 回調地址 - description: - en_US: Copy this URL and paste it into your QQ Official API webhook configuration - zh_Hans: 复制此地址并粘贴到 QQ 官方 API 的 Webhook 配置中 - zh_Hant: 複製此地址並貼到 QQ 官方 API 的 Webhook 設定中 - type: webhook-url - required: false - default: "" - name: appid label: en_US: App ID @@ -55,6 +43,46 @@ spec: type: string required: true default: "" + - name: enable-webhook + label: + en_US: Enable Webhook Mode + zh_Hans: 启用Webhook模式 + zh_Hant: 啟用 Webhook 模式 + description: + en_US: If enabled, the bot will use webhook mode to receive messages. Otherwise, it will use WebSocket mode + zh_Hans: 如果启用,机器人将使用 Webhook 模式接收消息。否则,将使用 WebSocket 模式 + zh_Hant: 如果啟用,機器人將使用 Webhook 模式接收訊息。否則,將使用 WebSocket 模式 + type: boolean + required: true + default: false + - name: enable-stream-reply + label: + en_US: Enable Stream Reply Mode + zh_Hans: 启用流式回复模式 + zh_Hant: 啟用串流回覆模式 + description: + en_US: If enabled, the bot will use streaming mode to reply messages (C2C only) + zh_Hans: 如果启用,机器人将使用流式方式回复消息(仅私聊) + zh_Hant: 如果啟用,機器人將使用串流方式回覆訊息(僅私聊) + type: boolean + required: true + default: false + - name: webhook_url + label: + en_US: Webhook Callback URL + zh_Hans: Webhook 回调地址 + zh_Hant: Webhook 回調地址 + description: + en_US: Copy this URL and paste it into your QQ Official API webhook configuration + zh_Hans: 复制此地址并粘贴到 QQ 官方 API 的 Webhook 配置中 + zh_Hant: 複製此地址並貼到 QQ 官方 API 的 Webhook 設定中 + type: webhook-url + required: false + default: "" + show_if: + field: enable-webhook + operator: eq + value: true execution: python: path: ./qqofficial.py