From d451b059fd4cf67df93d7444ecf40c1bdf49f4b1 Mon Sep 17 00:00:00 2001 From: fdc310 <82008029+fdc310@users.noreply.github.com> Date: Thu, 12 Mar 2026 22:31:14 +0800 Subject: [PATCH] feat: Implement WebSocket long connection client for WeChat Work AI Bot (#2054) * feat: Implement WebSocket long connection client for WeChat Work AI Bot - Added WecomBotWsClient to handle WebSocket connections for receiving messages and sending replies. - Introduced a new migration (dbm022) to add 'enable-webhook' field to existing wecombot adapter configs, ensuring backward compatibility. - Updated WecomBotAdapter to support both WebSocket and webhook modes based on the new configuration. - Enhanced YAML configuration for WecomBot to include 'enable-webhook' and 'Secret' fields, adjusting requirements accordingly. - Incremented database version to 22 to reflect schema changes. * fix:db enable-webhook is false * fix:add logic * fix:Removed an unnecessary configuration check * fix: migration * fix: update migration * fix:migration --- src/langbot/libs/wecom_ai_bot_api/api.py | 472 +++++++------- .../libs/wecom_ai_bot_api/ws_client.py | 596 ++++++++++++++++++ .../dbm024_wecombot_websocket_mode.py | 49 ++ src/langbot/pkg/platform/sources/wecombot.py | 122 ++-- .../pkg/platform/sources/wecombot.yaml | 49 +- src/langbot/pkg/utils/constants.py | 2 +- 6 files changed, 1003 insertions(+), 287 deletions(-) create mode 100644 src/langbot/libs/wecom_ai_bot_api/ws_client.py create mode 100644 src/langbot/pkg/persistence/migrations/dbm024_wecombot_websocket_mode.py diff --git a/src/langbot/libs/wecom_ai_bot_api/api.py b/src/langbot/libs/wecom_ai_bot_api/api.py index c5f5d84d..3e2ca0f5 100644 --- a/src/langbot/libs/wecom_ai_bot_api/api.py +++ b/src/langbot/libs/wecom_ai_bot_api/api.py @@ -199,6 +199,253 @@ class StreamSessionManager: self._msg_index.pop(msg_id, None) +async def download_encrypted_file(download_url: str, encoding_aes_key: str, logger: EventLogger) -> Optional[str]: + """Download an AES-encrypted file from WeChat Work and return as data URI. + + Args: + download_url: The encrypted file download URL. + encoding_aes_key: The AES key used for decryption (base64-encoded, without trailing '='). + logger: Logger instance. + + Returns: + A data URI string (e.g. 'data:image/jpeg;base64,...') or None on failure. + """ + if not download_url: + return None + async with httpx.AsyncClient() as client: + response = await client.get(download_url) + if response.status_code != 200: + await logger.error(f'failed to get file: {response.text}') + return None + encrypted_bytes = response.content + + aes_key = base64.b64decode(encoding_aes_key + '=') + iv = aes_key[:16] + + cipher = AES.new(aes_key, AES.MODE_CBC, iv) + decrypted = cipher.decrypt(encrypted_bytes) + + pad_len = decrypted[-1] + decrypted = decrypted[:-pad_len] + + if decrypted.startswith(b'\xff\xd8'): + mime_type = 'image/jpeg' + elif decrypted.startswith(b'\x89PNG'): + mime_type = 'image/png' + elif decrypted.startswith((b'GIF87a', b'GIF89a')): + mime_type = 'image/gif' + elif decrypted.startswith(b'BM'): + mime_type = 'image/bmp' + elif decrypted.startswith(b'II*\x00') or decrypted.startswith(b'MM\x00*'): + mime_type = 'image/tiff' + else: + mime_type = 'application/octet-stream' + + base64_str = base64.b64encode(decrypted).decode('utf-8') + return f'data:{mime_type};base64,{base64_str}' + + +async def parse_wecom_bot_message( + msg_json: dict[str, Any], encoding_aes_key: str, logger: EventLogger +) -> dict[str, Any]: + """Parse a decrypted WeChat Work AI Bot message JSON into a unified message dict. + + This is the shared message parsing logic used by both webhook and WebSocket modes. + + Args: + msg_json: The decrypted message JSON from WeChat Work. + encoding_aes_key: AES key for file decryption. + logger: Logger instance. + + Returns: + A dict suitable for constructing a WecomBotEvent. + """ + message_data: dict[str, Any] = {} + + msg_type = msg_json.get('msgtype', '') + if msg_type: + message_data['msgtype'] = msg_type + + if msg_json.get('chattype', '') == 'single': + message_data['type'] = 'single' + elif msg_json.get('chattype', '') == 'group': + message_data['type'] = 'group' + + max_inline_file_size = 5 * 1024 * 1024 + + async def _safe_download(url: str): + if not url: + return None + return await download_encrypted_file(url, encoding_aes_key, logger) + + if msg_type == 'text': + message_data['content'] = msg_json.get('text', {}).get('content') + elif msg_type == 'markdown': + message_data['content'] = msg_json.get('markdown', {}).get('content') or msg_json.get('text', {}).get( + 'content', '' + ) + elif msg_type == 'image': + picurl = msg_json.get('image', {}).get('url', '') + base64_data = await _safe_download(picurl) + if base64_data: + message_data['picurl'] = base64_data + message_data['images'] = [base64_data] + elif msg_type == 'voice': + voice_info = msg_json.get('voice', {}) or {} + download_url = voice_info.get('url') + message_data['voice'] = { + 'url': download_url, + 'md5sum': voice_info.get('md5sum') or voice_info.get('md5'), + 'filesize': voice_info.get('filesize') or voice_info.get('size'), + 'sdkfileid': voice_info.get('sdkfileid') or voice_info.get('fileid'), + } + if voice_info.get('content'): + message_data['content'] = voice_info.get('content') + if (message_data['voice'].get('filesize') or 0) <= max_inline_file_size: + voice_base64 = await _safe_download(download_url) + if voice_base64: + message_data['voice']['base64'] = voice_base64 + elif msg_type == 'video': + video_info = msg_json.get('video', {}) or {} + download_url = video_info.get('url') + video_data = { + 'url': download_url, + 'filesize': video_info.get('filesize') or video_info.get('size'), + 'sdkfileid': video_info.get('sdkfileid') or video_info.get('fileid'), + 'md5sum': video_info.get('md5sum') or video_info.get('md5'), + 'filename': video_info.get('filename') or video_info.get('name'), + } + if (video_data.get('filesize') or 0) <= max_inline_file_size: + video_base64 = await _safe_download(download_url) + if video_base64: + video_data['base64'] = video_base64 + message_data['video'] = video_data + elif msg_type == 'file': + file_info = msg_json.get('file', {}) or {} + download_url = file_info.get('url') or file_info.get('fileurl') + file_data = { + 'filename': file_info.get('filename') or file_info.get('name'), + 'filesize': file_info.get('filesize') or file_info.get('size'), + 'md5sum': file_info.get('md5sum') or file_info.get('md5'), + 'sdkfileid': file_info.get('sdkfileid') or file_info.get('fileid'), + 'download_url': download_url, + 'extra': file_info, + } + if (file_data.get('filesize') or 0) <= max_inline_file_size: + file_base64 = await _safe_download(download_url) + if file_base64: + file_data['base64'] = file_base64 + message_data['file'] = file_data + elif msg_type == 'link': + message_data['link'] = msg_json.get('link', {}) + if not message_data.get('content'): + title = message_data['link'].get('title', '') + desc = message_data['link'].get('description') or message_data['link'].get('digest', '') + message_data['content'] = '\n'.join(filter(None, [title, desc])) + elif msg_type == 'mixed': + items = msg_json.get('mixed', {}).get('msg_item', []) + texts = [] + images = [] + files = [] + voices = [] + videos = [] + links = [] + for item in items: + item_type = item.get('msgtype') + if item_type == 'text': + texts.append(item.get('text', {}).get('content', '')) + elif item_type == 'image': + img_url = item.get('image', {}).get('url') + base64_data = await _safe_download(img_url) + if base64_data: + images.append(base64_data) + elif item_type == 'file': + file_info = item.get('file', {}) or {} + download_url = file_info.get('url') or file_info.get('fileurl') + file_data = { + 'filename': file_info.get('filename') or file_info.get('name'), + 'filesize': file_info.get('filesize') or file_info.get('size'), + 'md5sum': file_info.get('md5sum') or file_info.get('md5'), + 'sdkfileid': file_info.get('sdkfileid') or file_info.get('fileid'), + 'download_url': download_url, + 'extra': file_info, + } + if (file_data.get('filesize') or 0) <= max_inline_file_size: + file_base64 = await _safe_download(download_url) + if file_base64: + file_data['base64'] = file_base64 + files.append(file_data) + elif item_type == 'voice': + voice_info = item.get('voice', {}) or {} + download_url = voice_info.get('url') + voice_data = { + 'url': download_url, + 'md5sum': voice_info.get('md5sum') or voice_info.get('md5'), + 'filesize': voice_info.get('filesize') or voice_info.get('size'), + 'sdkfileid': voice_info.get('sdkfileid') or voice_info.get('fileid'), + } + if voice_info.get('content'): + texts.append(voice_info.get('content')) + if (voice_data.get('filesize') or 0) <= max_inline_file_size: + voice_base64 = await _safe_download(download_url) + if voice_base64: + voice_data['base64'] = voice_base64 + voices.append(voice_data) + elif item_type == 'video': + video_info = item.get('video', {}) or {} + download_url = video_info.get('url') + video_data = { + 'url': download_url, + 'filesize': video_info.get('filesize') or video_info.get('size'), + 'sdkfileid': video_info.get('sdkfileid') or video_info.get('fileid'), + 'md5sum': video_info.get('md5sum') or video_info.get('md5'), + 'filename': video_info.get('filename') or video_info.get('name'), + } + if (video_data.get('filesize') or 0) <= max_inline_file_size: + video_base64 = await _safe_download(download_url) + if video_base64: + video_data['base64'] = video_base64 + videos.append(video_data) + elif item_type == 'link': + links.append(item.get('link', {})) + + if texts: + message_data['content'] = ' '.join(texts) + if images: + message_data['images'] = images + message_data['picurl'] = images[0] + if files: + message_data['files'] = files + message_data['file'] = files[0] + if voices: + message_data['voices'] = voices + message_data['voice'] = voices[0] + if videos: + message_data['videos'] = videos + message_data['video'] = videos[0] + if links: + message_data['link'] = links[0] + if items: + message_data['attachments'] = items + else: + message_data['raw_msg'] = msg_json + + from_info = msg_json.get('from', {}) + message_data['userid'] = from_info.get('userid', '') + message_data['username'] = from_info.get('alias', '') or from_info.get('name', '') or from_info.get('userid', '') + + if msg_json.get('chattype', '') == 'group': + message_data['chatid'] = msg_json.get('chatid', '') + message_data['chatname'] = msg_json.get('chatname', '') or msg_json.get('chatid', '') + + message_data['msgid'] = msg_json.get('msgid', '') + + if msg_json.get('aibotid'): + message_data['aibotid'] = msg_json.get('aibotid', '') + + return message_data + + class WecomBotClient: def __init__(self, Token: str, EnCodingAESKey: str, Corpid: str, logger: EventLogger, unified_mode: bool = False): """企业微信智能机器人客户端。 @@ -455,196 +702,7 @@ class WecomBotClient: return await self._handle_post_initial_response(msg_json, nonce) async def get_message(self, msg_json): - message_data = {} - - msg_type = msg_json.get('msgtype', '') - if msg_type: - message_data['msgtype'] = msg_type - - if msg_json.get('chattype', '') == 'single': - message_data['type'] = 'single' - elif msg_json.get('chattype', '') == 'group': - message_data['type'] = 'group' - - max_inline_file_size = 5 * 1024 * 1024 # avoid decoding very large payloads by default - - async def _safe_download(url: str): - if not url: - return None - return await self.download_url_to_base64(url, self.EnCodingAESKey) - - if msg_type == 'text': - message_data['content'] = msg_json.get('text', {}).get('content') - elif msg_type == 'markdown': - message_data['content'] = msg_json.get('markdown', {}).get('content') or msg_json.get('text', {}).get( - 'content', '' - ) - elif msg_type == 'image': - picurl = msg_json.get('image', {}).get('url', '') - base64_data = await _safe_download(picurl) - if base64_data: - message_data['picurl'] = base64_data - message_data['images'] = [base64_data] - elif msg_type == 'voice': - voice_info = msg_json.get('voice', {}) or {} - download_url = voice_info.get('url') - message_data['voice'] = { - 'url': download_url, - 'md5sum': voice_info.get('md5sum') or voice_info.get('md5'), - 'filesize': voice_info.get('filesize') or voice_info.get('size'), - 'sdkfileid': voice_info.get('sdkfileid') or voice_info.get('fileid'), - } - # 企业微信智能转写文本(如果已有)直接复用,避免重复转写 - if voice_info.get('content'): - message_data['content'] = voice_info.get('content') - if (message_data['voice'].get('filesize') or 0) <= max_inline_file_size: - voice_base64 = await _safe_download(download_url) - if voice_base64: - message_data['voice']['base64'] = voice_base64 - elif msg_type == 'video': - video_info = msg_json.get('video', {}) or {} - download_url = video_info.get('url') - video_data = { - 'url': download_url, - 'filesize': video_info.get('filesize') or video_info.get('size'), - 'sdkfileid': video_info.get('sdkfileid') or video_info.get('fileid'), - 'md5sum': video_info.get('md5sum') or video_info.get('md5'), - 'filename': video_info.get('filename') or video_info.get('name'), - } - if (video_data.get('filesize') or 0) <= max_inline_file_size: - video_base64 = await _safe_download(download_url) - if video_base64: - video_data['base64'] = video_base64 - message_data['video'] = video_data - elif msg_type == 'file': - file_info = msg_json.get('file', {}) or {} - download_url = file_info.get('url') or file_info.get('fileurl') - file_data = { - 'filename': file_info.get('filename') or file_info.get('name'), - 'filesize': file_info.get('filesize') or file_info.get('size'), - 'md5sum': file_info.get('md5sum') or file_info.get('md5'), - 'sdkfileid': file_info.get('sdkfileid') or file_info.get('fileid'), - 'download_url': download_url, - 'extra': file_info, - } - if (file_data.get('filesize') or 0) <= max_inline_file_size: - file_base64 = await _safe_download(download_url) - if file_base64: - file_data['base64'] = file_base64 - message_data['file'] = file_data - elif msg_type == 'link': - message_data['link'] = msg_json.get('link', {}) - if not message_data.get('content'): - title = message_data['link'].get('title', '') - desc = message_data['link'].get('description') or message_data['link'].get('digest', '') - message_data['content'] = '\n'.join(filter(None, [title, desc])) - elif msg_type == 'mixed': - items = msg_json.get('mixed', {}).get('msg_item', []) - texts = [] - images = [] - files = [] - voices = [] - videos = [] - links = [] - for item in items: - item_type = item.get('msgtype') - if item_type == 'text': - texts.append(item.get('text', {}).get('content', '')) - elif item_type == 'image': - img_url = item.get('image', {}).get('url') - base64_data = await _safe_download(img_url) - if base64_data: - images.append(base64_data) - elif item_type == 'file': - file_info = item.get('file', {}) or {} - download_url = file_info.get('url') or file_info.get('fileurl') - file_data = { - 'filename': file_info.get('filename') or file_info.get('name'), - 'filesize': file_info.get('filesize') or file_info.get('size'), - 'md5sum': file_info.get('md5sum') or file_info.get('md5'), - 'sdkfileid': file_info.get('sdkfileid') or file_info.get('fileid'), - 'download_url': download_url, - 'extra': file_info, - } - if (file_data.get('filesize') or 0) <= max_inline_file_size: - file_base64 = await _safe_download(download_url) - if file_base64: - file_data['base64'] = file_base64 - files.append(file_data) - elif item_type == 'voice': - voice_info = item.get('voice', {}) or {} - download_url = voice_info.get('url') - voice_data = { - 'url': download_url, - 'md5sum': voice_info.get('md5sum') or voice_info.get('md5'), - 'filesize': voice_info.get('filesize') or voice_info.get('size'), - 'sdkfileid': voice_info.get('sdkfileid') or voice_info.get('fileid'), - } - if voice_info.get('content'): - texts.append(voice_info.get('content')) - if (voice_data.get('filesize') or 0) <= max_inline_file_size: - voice_base64 = await _safe_download(download_url) - if voice_base64: - voice_data['base64'] = voice_base64 - voices.append(voice_data) - elif item_type == 'video': - video_info = item.get('video', {}) or {} - download_url = video_info.get('url') - video_data = { - 'url': download_url, - 'filesize': video_info.get('filesize') or video_info.get('size'), - 'sdkfileid': video_info.get('sdkfileid') or video_info.get('fileid'), - 'md5sum': video_info.get('md5sum') or video_info.get('md5'), - 'filename': video_info.get('filename') or video_info.get('name'), - } - if (video_data.get('filesize') or 0) <= max_inline_file_size: - video_base64 = await _safe_download(download_url) - if video_base64: - video_data['base64'] = video_base64 - videos.append(video_data) - elif item_type == 'link': - links.append(item.get('link', {})) - - if texts: - message_data['content'] = ' '.join(texts) # 拼接所有 text - if images: - message_data['images'] = images - message_data['picurl'] = images[0] # 只保留第一个 image - if files: - message_data['files'] = files - message_data['file'] = files[0] - if voices: - message_data['voices'] = voices - message_data['voice'] = voices[0] - if videos: - message_data['videos'] = videos - message_data['video'] = videos[0] - if links: - message_data['link'] = links[0] - if items: - message_data['attachments'] = items - else: - message_data['raw_msg'] = msg_json - - # Extract user information - from_info = msg_json.get('from', {}) - message_data['userid'] = from_info.get('userid', '') - message_data['username'] = ( - from_info.get('alias', '') or from_info.get('name', '') or from_info.get('userid', '') - ) - - # Extract chat/group information - if msg_json.get('chattype', '') == 'group': - message_data['chatid'] = msg_json.get('chatid', '') - # Try to get group name if available - message_data['chatname'] = msg_json.get('chatname', '') or msg_json.get('chatid', '') - - message_data['msgid'] = msg_json.get('msgid', '') - - if msg_json.get('aibotid'): - message_data['aibotid'] = msg_json.get('aibotid', '') - - return message_data + return await parse_wecom_bot_message(msg_json, self.EnCodingAESKey, self.logger) async def _handle_message(self, event: wecombotevent.WecomBotEvent): """ @@ -712,39 +770,7 @@ class WecomBotClient: return decorator async def download_url_to_base64(self, download_url, encoding_aes_key): - async with httpx.AsyncClient() as client: - response = await client.get(download_url) - if response.status_code != 200: - await self.logger.error(f'failed to get file: {response.text}') - return None - - encrypted_bytes = response.content - - aes_key = base64.b64decode(encoding_aes_key + '=') # base64 补齐 - iv = aes_key[:16] - - cipher = AES.new(aes_key, AES.MODE_CBC, iv) - decrypted = cipher.decrypt(encrypted_bytes) - - pad_len = decrypted[-1] - decrypted = decrypted[:-pad_len] - - if decrypted.startswith(b'\xff\xd8'): # JPEG - mime_type = 'image/jpeg' - elif decrypted.startswith(b'\x89PNG'): # PNG - mime_type = 'image/png' - elif decrypted.startswith((b'GIF87a', b'GIF89a')): # GIF - mime_type = 'image/gif' - elif decrypted.startswith(b'BM'): # BMP - mime_type = 'image/bmp' - elif decrypted.startswith(b'II*\x00') or decrypted.startswith(b'MM\x00*'): # TIFF - mime_type = 'image/tiff' - else: - mime_type = 'application/octet-stream' - - # 转 base64 - base64_str = base64.b64encode(decrypted).decode('utf-8') - return f'data:{mime_type};base64,{base64_str}' + return await download_encrypted_file(download_url, encoding_aes_key, self.logger) async def run_task(self, host: str, port: int, *args, **kwargs): """ diff --git a/src/langbot/libs/wecom_ai_bot_api/ws_client.py b/src/langbot/libs/wecom_ai_bot_api/ws_client.py new file mode 100644 index 00000000..332b4eb9 --- /dev/null +++ b/src/langbot/libs/wecom_ai_bot_api/ws_client.py @@ -0,0 +1,596 @@ +"""WeChat Work AI Bot WebSocket long connection client. + +Implements the WebSocket protocol for receiving messages and sending replies +via a persistent connection to wss://openws.work.weixin.qq.com, as an +alternative to the HTTP callback (webhook) mode. + +Protocol reference: https://developer.work.weixin.qq.com/document/path/101463 +Official Node.js SDK: https://github.com/WecomTeam/aibot-node-sdk +""" + +from __future__ import annotations + +import asyncio +import json +import secrets +import time +import traceback +from typing import Any, Callable, Optional + +import aiohttp + +from langbot.libs.wecom_ai_bot_api import wecombotevent +from langbot.libs.wecom_ai_bot_api.api import parse_wecom_bot_message +from langbot.pkg.platform.logger import EventLogger + +DEFAULT_WS_URL = 'wss://openws.work.weixin.qq.com' + +# WebSocket frame command constants +CMD_SUBSCRIBE = 'aibot_subscribe' +CMD_HEARTBEAT = 'ping' +CMD_MSG_CALLBACK = 'aibot_msg_callback' +CMD_EVENT_CALLBACK = 'aibot_event_callback' +CMD_RESPOND_MSG = 'aibot_respond_msg' +CMD_RESPOND_WELCOME = 'aibot_respond_welcome_msg' +CMD_RESPOND_UPDATE = 'aibot_respond_update_msg' +CMD_SEND_MSG = 'aibot_send_msg' + + +def _generate_req_id(prefix: str) -> str: + """Generate a unique request ID in the format: {prefix}_{timestamp}_{random}.""" + ts = int(time.time() * 1000) + rand = secrets.token_hex(4) + return f'{prefix}_{ts}_{rand}' + + +class WecomBotWsClient: + """WeChat Work AI Bot WebSocket long connection client. + + Provides message receiving, streaming reply, proactive message sending, + and event callback handling over a persistent WebSocket connection. + """ + + def __init__( + self, + bot_id: str, + secret: str, + logger: EventLogger, + encoding_aes_key: str = '', + ws_url: str = DEFAULT_WS_URL, + heartbeat_interval: float = 30.0, + max_reconnect_attempts: int = -1, + reconnect_base_delay: float = 1.0, + reconnect_max_delay: float = 30.0, + ): + self.bot_id = bot_id + self.secret = secret + self.logger = logger + self.encoding_aes_key = encoding_aes_key + self.ws_url = ws_url + self.heartbeat_interval = heartbeat_interval + self.max_reconnect_attempts = max_reconnect_attempts + self.reconnect_base_delay = reconnect_base_delay + self.reconnect_max_delay = reconnect_max_delay + + self._ws: Optional[aiohttp.ClientWebSocketResponse] = None + self._session: Optional[aiohttp.ClientSession] = None + self._running = False + self._heartbeat_task: Optional[asyncio.Task] = None + self._missed_pong_count = 0 + self._max_missed_pong = 2 + self._reconnect_attempts = 0 + + # Message handler registry (same pattern as WecomBotClient) + self._message_handlers: dict[str, list[Callable]] = {} + # Message deduplication + self._msg_id_map: dict[str, int] = {} + + # Pending ACK futures: req_id -> Future[dict] + self._pending_acks: dict[str, asyncio.Future] = {} + # Per-req_id serial reply queues + self._reply_queues: dict[str, asyncio.Queue] = {} + self._reply_workers: dict[str, asyncio.Task] = {} + self._reply_ack_timeout = 5.0 + + # Stream ID tracking for WebSocket mode + self._stream_ids: dict[str, str] = {} # msg_id -> req_id|stream_id + # Dedup: skip sending when content hasn't changed + self._stream_last_content: dict[str, str] = {} # msg_id -> last content sent + + # ── Public API ────────────────────────────────────────────────── + + async def connect(self): + """Connect to WebSocket server with automatic reconnection. + + This method blocks until disconnect() is called or max reconnect + attempts are exhausted. + """ + self._running = True + self._reconnect_attempts = 0 + + while self._running: + try: + await self._connect_once() + except Exception: + if not self._running: + break + await self.logger.error(f'WebSocket connection error: {traceback.format_exc()}') + + if not self._running: + break + + # Reconnect with exponential backoff + if self.max_reconnect_attempts != -1 and self._reconnect_attempts >= self.max_reconnect_attempts: + await self.logger.error(f'Max reconnect attempts reached ({self.max_reconnect_attempts}), giving up') + break + + self._reconnect_attempts += 1 + delay = min( + self.reconnect_base_delay * (2 ** (self._reconnect_attempts - 1)), + self.reconnect_max_delay, + ) + await self.logger.info(f'Reconnecting in {delay:.1f}s (attempt {self._reconnect_attempts})...') + await asyncio.sleep(delay) + + async def disconnect(self): + """Gracefully disconnect from the WebSocket server.""" + self._running = False + if self._heartbeat_task and not self._heartbeat_task.done(): + self._heartbeat_task.cancel() + for task in self._reply_workers.values(): + if not task.done(): + task.cancel() + if self._ws and not self._ws.closed: + await self._ws.close() + self._ws = None + if self._session and not self._session.closed: + await self._session.close() + self._session = None + + def on_message(self, msg_type: str) -> Callable: + """Decorator to register a message handler. + + Same interface as WecomBotClient.on_message for compatibility. + + Args: + msg_type: 'single', 'group', or specific message type. + """ + + def decorator(func: Callable[[wecombotevent.WecomBotEvent], Any]): + if msg_type not in self._message_handlers: + self._message_handlers[msg_type] = [] + self._message_handlers[msg_type].append(func) + return func + + return decorator + + async def reply_stream( + self, + req_id: str, + stream_id: str, + content: str, + finish: bool = False, + ) -> Optional[dict]: + """Send a streaming reply frame. + + Args: + req_id: The req_id from the original message frame (must be passed through). + stream_id: The stream ID for this streaming session. + content: The content to send (supports Markdown). + finish: Whether this is the final chunk. + + Returns: + The ACK frame dict, or None on failure. + """ + body = { + 'msgtype': 'stream', + 'stream': { + 'id': stream_id, + 'finish': finish, + 'content': content, + }, + } + return await self._send_reply(req_id, body) + + async def reply_text(self, req_id: str, content: str) -> Optional[dict]: + """Send a non-streaming text reply. + + Args: + req_id: The req_id from the original message frame. + content: The text content to reply. + + Returns: + The ACK frame dict, or None on failure. + """ + body = { + 'msgtype': 'markdown', + 'markdown': { + 'content': content, + }, + } + return await self._send_reply(req_id, body) + + async def send_message(self, chat_id: str, content: str, msgtype: str = 'markdown') -> Optional[dict]: + """Proactively send a message to a specified chat. + + Args: + chat_id: The chat ID (userid for single chat, chatid for group chat). + content: The message content. + msgtype: Message type, 'markdown' by default. + + Returns: + The ACK frame dict, or None on failure. + """ + req_id = _generate_req_id(CMD_SEND_MSG) + body: dict[str, Any] = { + 'chatid': chat_id, + 'msgtype': msgtype, + } + if msgtype == 'markdown': + body['markdown'] = {'content': content} + elif msgtype == 'text': + body['text'] = {'content': content} + return await self._send_reply(req_id, body, cmd=CMD_SEND_MSG) + + async def push_stream_chunk(self, msg_id: str, content: str, is_final: bool = False) -> bool: + """Push a streaming chunk for a given message ID. + + Compatible interface with WecomBotClient.push_stream_chunk. + + Args: + msg_id: The original message ID. + content: The cumulative content from the pipeline. + is_final: Whether this is the final chunk. + + Returns: + True if the stream session exists and chunk was sent. + """ + key = self._stream_ids.get(msg_id) + if not key: + return False + req_id, stream_id = key.split('|', 1) + try: + # Skip sending if content hasn't changed (e.g. during tool call argument streaming) + if not is_final and content == self._stream_last_content.get(msg_id): + return True + await self.reply_stream(req_id, stream_id, content, finish=is_final) + self._stream_last_content[msg_id] = content + if is_final: + self._stream_ids.pop(msg_id, None) + self._stream_last_content.pop(msg_id, None) + return True + except Exception: + await self.logger.error(f'Failed to push stream chunk: {traceback.format_exc()}') + return False + + async def set_message(self, msg_id: str, content: str): + """Fallback: send content as a final stream chunk or direct reply. + + Compatible interface with WecomBotClient.set_message. + """ + handled = await self.push_stream_chunk(msg_id, content, is_final=True) + if not handled: + await self.logger.warning(f'No active stream for msg_id={msg_id}, message dropped') + + # ── Connection lifecycle ──────────────────────────────────────── + + async def _connect_once(self): + """Establish a single WebSocket connection, authenticate, and listen.""" + await self.logger.info(f'Connecting to {self.ws_url}...') + + self._session = aiohttp.ClientSession() + try: + self._ws = await self._session.ws_connect(self.ws_url) + self._missed_pong_count = 0 + self._reconnect_attempts = 0 + await self.logger.info('WebSocket connected, sending auth...') + + await self._send_auth() + + # Wait for auth response + auth_ok = await self._wait_for_auth() + if not auth_ok: + await self.logger.error('Authentication failed') + return + + await self.logger.info('Authenticated successfully') + + # Start heartbeat + self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + + try: + await self._listen_loop() + finally: + if self._heartbeat_task and not self._heartbeat_task.done(): + self._heartbeat_task.cancel() + self._clear_pending_acks('Connection closed') + finally: + if self._ws and not self._ws.closed: + await self._ws.close() + self._ws = None + if self._session and not self._session.closed: + await self._session.close() + self._session = None + + async def _send_auth(self): + """Send the authentication frame.""" + frame = { + 'cmd': CMD_SUBSCRIBE, + 'headers': {'req_id': _generate_req_id(CMD_SUBSCRIBE)}, + 'body': { + 'bot_id': self.bot_id, + 'secret': self.secret, + }, + } + await self._send_frame(frame) + + async def _wait_for_auth(self) -> bool: + """Wait for and validate the authentication response.""" + try: + msg = await asyncio.wait_for(self._ws.receive(), timeout=10.0) + if msg.type in (aiohttp.WSMsgType.TEXT,): + frame = json.loads(msg.data) + req_id = frame.get('headers', {}).get('req_id', '') + if req_id.startswith(CMD_SUBSCRIBE) and frame.get('errcode') == 0: + return True + await self.logger.error(f'Auth response: errcode={frame.get("errcode")}, errmsg={frame.get("errmsg")}') + return False + elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSING): + await self.logger.error(f'WebSocket closed during auth: {msg.type}') + return False + await self.logger.error(f'Unexpected message type during auth: {msg.type}') + return False + except asyncio.TimeoutError: + await self.logger.error('Auth response timeout') + return False + + async def _heartbeat_loop(self): + """Periodically send heartbeat pings.""" + try: + while self._running and self._ws and not self._ws.closed: + await asyncio.sleep(self.heartbeat_interval) + if not self._running or not self._ws or self._ws.closed: + break + + if self._missed_pong_count >= self._max_missed_pong: + await self.logger.warning( + f'No heartbeat ack for {self._missed_pong_count} consecutive pings, connection considered dead' + ) + await self._ws.close() + break + + self._missed_pong_count += 1 + frame = { + 'cmd': CMD_HEARTBEAT, + 'headers': {'req_id': _generate_req_id(CMD_HEARTBEAT)}, + } + try: + await self._send_frame(frame) + except Exception: + break + except asyncio.CancelledError: + pass + + async def _listen_loop(self): + """Listen for incoming WebSocket frames and dispatch them.""" + async for msg in self._ws: + if not self._running: + break + if msg.type == aiohttp.WSMsgType.TEXT: + try: + frame = json.loads(msg.data) + await self._handle_frame(frame) + except json.JSONDecodeError: + await self.logger.error(f'Failed to parse WebSocket message: {str(msg.data)[:200]}') + except Exception: + await self.logger.error(f'Error handling frame: {traceback.format_exc()}') + elif msg.type == aiohttp.WSMsgType.BINARY: + try: + frame = json.loads(msg.data) + await self._handle_frame(frame) + except Exception: + await self.logger.error(f'Error handling binary frame: {traceback.format_exc()}') + elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSING): + await self.logger.warning(f'WebSocket connection closed: {msg.type}') + break + + # ── Frame handling ────────────────────────────────────────────── + + async def _handle_frame(self, frame: dict): + """Route an incoming frame to the appropriate handler.""" + cmd = frame.get('cmd', '') + + # Message push + if cmd == CMD_MSG_CALLBACK: + asyncio.create_task(self._handle_message_callback(frame)) + return + + # Event push + if cmd == CMD_EVENT_CALLBACK: + asyncio.create_task(self._handle_event_callback(frame)) + return + + # No cmd → response/ACK frame, dispatch by req_id prefix + req_id = frame.get('headers', {}).get('req_id', '') + + # Check pending ACKs first + if req_id in self._pending_acks: + future = self._pending_acks.pop(req_id) + if not future.done(): + future.set_result(frame) + return + + # Heartbeat response + if req_id.startswith(CMD_HEARTBEAT): + if frame.get('errcode') == 0: + self._missed_pong_count = 0 + return + + # Unknown frame + await self.logger.warning(f'Unknown frame: {json.dumps(frame, ensure_ascii=False)[:200]}') + + async def _handle_message_callback(self, frame: dict): + """Handle an incoming message callback frame.""" + try: + body = frame.get('body', {}) + req_id = frame.get('headers', {}).get('req_id', '') + + # Parse message using shared logic + message_data = await parse_wecom_bot_message(body, self.encoding_aes_key, self.logger) + if not message_data: + return + + # Generate stream_id for this message and store the mapping + stream_id = _generate_req_id('stream') + msg_id = message_data.get('msgid', '') + if msg_id: + self._stream_ids[msg_id] = f'{req_id}|{stream_id}' + message_data['stream_id'] = stream_id + message_data['req_id'] = req_id + + event = wecombotevent.WecomBotEvent(message_data) + await self._dispatch_event(event) + except Exception: + await self.logger.error(f'Error in message callback: {traceback.format_exc()}') + + async def _handle_event_callback(self, frame: dict): + """Handle an incoming event callback frame (enter_chat, template_card_event, etc.).""" + try: + body = frame.get('body', {}) + req_id = frame.get('headers', {}).get('req_id', '') + + event_info = body.get('event', {}) + event_type = event_info.get('eventtype', '') + + message_data = { + 'msgtype': 'event', + 'type': body.get('chattype', 'single'), + 'event': event_info, + 'eventtype': event_type, + 'msgid': body.get('msgid', ''), + 'aibotid': body.get('aibotid', ''), + 'req_id': req_id, + } + + from_info = body.get('from', {}) + message_data['userid'] = from_info.get('userid', '') + message_data['username'] = from_info.get('alias', '') or from_info.get('userid', '') + + if body.get('chatid'): + message_data['chatid'] = body.get('chatid', '') + + event = wecombotevent.WecomBotEvent(message_data) + + # Dispatch to event-specific handlers + if event_type in self._message_handlers: + for handler in self._message_handlers[event_type]: + await handler(event) + + # Also dispatch to generic 'event' handlers + if 'event' in self._message_handlers: + for handler in self._message_handlers['event']: + await handler(event) + + except Exception: + await self.logger.error(f'Error in event callback: {traceback.format_exc()}') + + async def _dispatch_event(self, event: wecombotevent.WecomBotEvent): + """Dispatch a message event to registered handlers with deduplication.""" + try: + message_id = event.message_id + if message_id in self._msg_id_map: + self._msg_id_map[message_id] += 1 + return + self._msg_id_map[message_id] = 1 + + msg_type = event.type + if msg_type in self._message_handlers: + for handler in self._message_handlers[msg_type]: + await handler(event) + except Exception: + await self.logger.error(f'Error dispatching event: {traceback.format_exc()}') + + # ── Reply sending with serial queue ───────────────────────────── + + async def _send_reply( + self, + req_id: str, + body: dict, + cmd: str = CMD_RESPOND_MSG, + ) -> Optional[dict]: + """Send a reply frame and wait for ACK. + + Replies with the same req_id are serialized to maintain ordering. + """ + if not self._ws or self._ws.closed: + return None + + frame = { + 'cmd': cmd, + 'headers': {'req_id': req_id}, + 'body': body, + } + + # Ensure serial delivery per req_id + if req_id not in self._reply_queues: + self._reply_queues[req_id] = asyncio.Queue() + self._reply_workers[req_id] = asyncio.create_task(self._reply_queue_worker(req_id)) + + future: asyncio.Future = asyncio.get_event_loop().create_future() + await self._reply_queues[req_id].put((frame, future)) + return await future + + async def _reply_queue_worker(self, req_id: str): + """Process reply queue items serially for a given req_id.""" + queue = self._reply_queues[req_id] + try: + while self._running: + try: + frame, future = await asyncio.wait_for(queue.get(), timeout=60.0) + except asyncio.TimeoutError: + # Queue idle, clean up worker + break + + try: + ack = await self._send_and_wait_ack(frame) + if not future.done(): + future.set_result(ack) + except Exception as e: + if not future.done(): + future.set_exception(e) + except asyncio.CancelledError: + pass + finally: + self._reply_queues.pop(req_id, None) + self._reply_workers.pop(req_id, None) + + async def _send_and_wait_ack(self, frame: dict) -> Optional[dict]: + """Send a frame and wait for the corresponding ACK.""" + req_id = frame['headers']['req_id'] + ack_future: asyncio.Future = asyncio.get_event_loop().create_future() + self._pending_acks[req_id] = ack_future + + try: + await self._send_frame(frame) + result = await asyncio.wait_for(ack_future, timeout=self._reply_ack_timeout) + if result.get('errcode', 0) != 0: + await self.logger.warning( + f'Reply ACK error: errcode={result.get("errcode")}, errmsg={result.get("errmsg")}' + ) + return result + except asyncio.TimeoutError: + self._pending_acks.pop(req_id, None) + await self.logger.warning(f'Reply ACK timeout ({self._reply_ack_timeout}s) for req_id={req_id}') + return None + + async def _send_frame(self, frame: dict): + """Send a JSON frame over the WebSocket connection.""" + if self._ws and not self._ws.closed: + await self._ws.send_str(json.dumps(frame, ensure_ascii=False)) + + def _clear_pending_acks(self, reason: str): + """Reject all pending ACK futures on disconnection.""" + for req_id, future in self._pending_acks.items(): + if not future.done(): + future.set_exception(ConnectionError(reason)) + self._pending_acks.clear() diff --git a/src/langbot/pkg/persistence/migrations/dbm024_wecombot_websocket_mode.py b/src/langbot/pkg/persistence/migrations/dbm024_wecombot_websocket_mode.py new file mode 100644 index 00000000..a5b833bc --- /dev/null +++ b/src/langbot/pkg/persistence/migrations/dbm024_wecombot_websocket_mode.py @@ -0,0 +1,49 @@ +from .. import migration + +import sqlalchemy +import json + + +@migration.migration_class(24) +class DBMigrateWecomBotWebSocketMode(migration.DBMigration): + """Add enable-webhook field to existing wecombot adapter configs. + + Existing wecombot bots were all using webhook mode, so we set + enable-webhook=true to preserve their behavior after the new + WebSocket long connection mode is introduced as default. + """ + + async def upgrade(self): + """Upgrade""" + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.text("SELECT uuid, adapter_config FROM bots WHERE adapter = 'wecombot'") + ) + bots = result.fetchall() + + for bot_row in bots: + bot_uuid = bot_row[0] + adapter_config = json.loads(bot_row[1]) if isinstance(bot_row[1], str) else bot_row[1] + + if 'enable-webhook' in adapter_config: + continue + + # Determine mode based on existing config: if webhook fields are present, keep webhook mode + has_webhook_config = bool( + adapter_config.get('Token') and adapter_config.get('EncodingAESKey') and adapter_config.get('Corpid') + ) + adapter_config['enable-webhook'] = has_webhook_config + + if self.ap.persistence_mgr.db.name == 'postgresql': + await self.ap.persistence_mgr.execute_async( + sqlalchemy.text('UPDATE bots SET adapter_config = :config::jsonb WHERE uuid = :uuid'), + {'config': json.dumps(adapter_config), 'uuid': bot_uuid}, + ) + else: + await self.ap.persistence_mgr.execute_async( + sqlalchemy.text('UPDATE bots SET adapter_config = :config WHERE uuid = :uuid'), + {'config': json.dumps(adapter_config), 'uuid': bot_uuid}, + ) + + async def downgrade(self): + """Downgrade""" + pass diff --git a/src/langbot/pkg/platform/sources/wecombot.py b/src/langbot/pkg/platform/sources/wecombot.py index 724a32b2..1849bdf8 100644 --- a/src/langbot/pkg/platform/sources/wecombot.py +++ b/src/langbot/pkg/platform/sources/wecombot.py @@ -11,6 +11,7 @@ import langbot_plugin.api.entities.builtin.platform.entities as platform_entitie from ..logger import EventLogger from langbot.libs.wecom_ai_bot_api.wecombotevent import WecomBotEvent from langbot.libs.wecom_ai_bot_api.api import WecomBotClient +from langbot.libs.wecom_ai_bot_api.ws_client import WecomBotWsClient class WecomBotMessageConverter(abstract_platform_adapter.AbstractMessageConverter): @@ -176,27 +177,42 @@ class WecomBotEventConverter(abstract_platform_adapter.AbstractEventConverter): class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): - bot: WecomBotClient + bot: typing.Union[WecomBotClient, WecomBotWsClient] bot_account_id: str message_converter: WecomBotMessageConverter = WecomBotMessageConverter() event_converter: WecomBotEventConverter = WecomBotEventConverter() config: dict bot_uuid: str = None + _ws_mode: bool = False def __init__(self, config: dict, logger: EventLogger): - required_keys = ['Token', 'EncodingAESKey', 'Corpid', 'BotId'] - missing_keys = [key for key in required_keys if key not in config] - if missing_keys: - raise Exception(f'WecomBot 缺少配置项: {missing_keys}') + enable_webhook = config.get('enable-webhook', False) - bot = WecomBotClient( - Token=config['Token'], - EnCodingAESKey=config['EncodingAESKey'], - Corpid=config['Corpid'], - logger=logger, - unified_mode=True, - ) - bot_account_id = config['BotId'] + if not enable_webhook: + bot = WecomBotWsClient( + bot_id=config['BotId'], + secret=config['Secret'], + logger=logger, + encoding_aes_key=config.get('EncodingAESKey', ''), + ) + ws_mode = True + else: + # Webhook callback mode + required_keys = ['Token', 'EncodingAESKey', 'Corpid'] + missing_keys = [key for key in required_keys if key not in config or not config[key]] + if missing_keys: + raise Exception(f'WecomBot webhook mode missing config: {missing_keys}') + + bot = WecomBotClient( + Token=config['Token'], + EnCodingAESKey=config['EncodingAESKey'], + Corpid=config['Corpid'], + logger=logger, + unified_mode=True, + ) + ws_mode = False + + bot_account_id = config.get('BotId', '') super().__init__( config=config, @@ -204,6 +220,7 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): bot=bot, bot_account_id=bot_account_id, ) + self._ws_mode = ws_mode async def reply_message( self, @@ -212,7 +229,15 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): quote_origin: bool = False, ): content = await self.message_converter.yiri2target(message) - await self.bot.set_message(message_source.source_platform_object.message_id, content) + if self._ws_mode: + event = message_source.source_platform_object + req_id = event.get('req_id', '') + if req_id: + await self.bot.reply_text(req_id, content) + else: + await self.bot.set_message(event.message_id, content) + else: + await self.bot.set_message(message_source.source_platform_object.message_id, content) async def reply_message_chunk( self, @@ -222,31 +247,22 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): quote_origin: bool = False, is_final: bool = False, ): - """将流水线增量输出写入企业微信 stream 会话。 - - Args: - message_source: 流水线提供的原始消息事件。 - bot_message: 当前片段对应的模型元信息(未使用)。 - message: 需要回复的消息链。 - quote_origin: 是否引用原消息(企业微信暂不支持)。 - is_final: 标记当前片段是否为最终回复。 - - Returns: - dict: 包含 `stream` 键,标识写入是否成功。 - - Example: - 在流水线 `reply_message_chunk` 调用中自动触发,无需手动调用。 - """ - # 转换为纯文本(智能机器人当前协议仅支持文本流) content = await self.message_converter.yiri2target(message) msg_id = message_source.source_platform_object.message_id - # 将片段推送到 WecomBotClient 中的队列,返回值用于判断是否走降级逻辑 - success = await self.bot.push_stream_chunk(msg_id, content, is_final=is_final) - if not success and is_final: - # 未命中流式队列时使用旧有 set_message 兜底 - await self.bot.set_message(msg_id, content) - return {'stream': success} + if self._ws_mode: + success = await self.bot.push_stream_chunk(msg_id, content, is_final=is_final) + if not success and is_final: + event = message_source.source_platform_object + req_id = event.get('req_id', '') + if req_id: + await self.bot.reply_text(req_id, content) + return {'stream': success} + else: + success = await self.bot.push_stream_chunk(msg_id, content, is_final=is_final) + if not success and is_final: + await self.bot.set_message(msg_id, content) + return {'stream': success} async def is_stream_output_supported(self) -> bool: """智能机器人侧默认开启流式能力。 @@ -259,7 +275,11 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): return True async def send_message(self, target_type, target_id, message): - pass + if self._ws_mode: + content = await self.message_converter.yiri2target(message) + await self.bot.send_message(target_id, content) + else: + pass def register_listener( self, @@ -288,29 +308,25 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): self.bot_uuid = bot_uuid async def handle_unified_webhook(self, bot_uuid: str, path: str, request): - """处理统一 webhook 请求。 - - Args: - bot_uuid: Bot 的 UUID - path: 子路径(如果有的话) - request: Quart Request 对象 - - Returns: - 响应数据 - """ + if self._ws_mode: + return None return await self.bot.handle_unified_webhook(request) async def run_async(self): - # 统一 webhook 模式下,不启动独立的 Quart 应用 - # 保持运行但不启动独立端口 + if self._ws_mode: + await self.bot.connect() + else: - async def keep_alive(): - while True: - await asyncio.sleep(1) + async def keep_alive(): + while True: + await asyncio.sleep(1) - await keep_alive() + await keep_alive() async def kill(self) -> bool: + if self._ws_mode: + await self.bot.disconnect() + return True return False async def unregister_listener( diff --git a/src/langbot/pkg/platform/sources/wecombot.yaml b/src/langbot/pkg/platform/sources/wecombot.yaml index 31a13bc6..acf2bd8b 100644 --- a/src/langbot/pkg/platform/sources/wecombot.yaml +++ b/src/langbot/pkg/platform/sources/wecombot.yaml @@ -11,35 +11,64 @@ metadata: icon: wecombot.png spec: config: + - name: BotId + label: + en_US: BotId + zh_Hans: 机器人ID (BotId) + type: string + required: true + default: "" + - name: enable-webhook + label: + en_US: Enable Webhook Mode + zh_Hans: 启用Webhook模式 + description: + en_US: If enabled, the bot will use webhook mode to receive messages. Otherwise, it will use WS long connection mode + zh_Hans: 如果启用,机器人将使用 Webhook 模式接收消息。否则,将使用 WS 长连接模式 + type: boolean + required: true + default: false + - name: Secret + label: + en_US: Secret + zh_Hans: 机器人密钥 (Secret) + description: + en_US: Required for WebSocket long connection mode + zh_Hans: 使用 WS 长连接模式时必填 + type: string + required: false + default: "" - name: Corpid label: en_US: Corpid zh_Hans: 企业ID + description: + en_US: Required for Webhook mode + zh_Hans: 使用 Webhook 模式时必填 type: string - required: true + required: false default: "" - name: Token label: en_US: Token zh_Hans: 令牌 (Token) + description: + en_US: Required for Webhook mode + zh_Hans: 使用 Webhook 模式时必填 type: string - required: true + required: false default: "" - name: EncodingAESKey label: en_US: EncodingAESKey zh_Hans: 消息加解密密钥 (EncodingAESKey) - type: string - required: true - default: "" - - name: BotId - label: - en_US: BotId - zh_Hans: 机器人ID + description: + en_US: Required for Webhook mode. Optional for WebSocket mode (used for file decryption) + zh_Hans: 使用 Webhook 模式时必填。WebSocket 模式下可选(用于文件解密) type: string required: false default: "" execution: python: path: ./wecombot.py - attr: WecomBotAdapter \ No newline at end of file + attr: WecomBotAdapter diff --git a/src/langbot/pkg/utils/constants.py b/src/langbot/pkg/utils/constants.py index c1b854f2..ea0a682f 100644 --- a/src/langbot/pkg/utils/constants.py +++ b/src/langbot/pkg/utils/constants.py @@ -2,7 +2,7 @@ import langbot semantic_version = f'v{langbot.__version__}' -required_database_version = 23 +required_database_version = 24 """Tag the version of the database schema, used to check if the database needs to be migrated""" debug_mode = False