From 806a03cd538724b01da319bf774356fc8728f337 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Tue, 23 Dec 2025 14:00:41 +0800 Subject: [PATCH] fix: dingtalk adapter lifecycle mgm issues (#1844, #1853) --- src/langbot/libs/dingtalk_api/api.py | 75 +++++++++++++++++++- src/langbot/pkg/platform/sources/dingtalk.py | 3 +- 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/src/langbot/libs/dingtalk_api/api.py b/src/langbot/libs/dingtalk_api/api.py index abd68a40..e39e12bf 100644 --- a/src/langbot/libs/dingtalk_api/api.py +++ b/src/langbot/libs/dingtalk_api/api.py @@ -1,8 +1,11 @@ +import asyncio import base64 import json import time +import urllib.parse from typing import Callable import dingtalk_stream # type: ignore +import websockets from .EchoHandler import EchoTextHandler from .dingtalkevent import DingTalkEvent import httpx @@ -36,6 +39,7 @@ class DingTalkClient: self.access_token_expiry_time = '' self.markdown_card = markdown_card self.logger = logger + self._stopped = False # Flag to control the event loop async def get_access_token(self): url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken' @@ -170,6 +174,9 @@ class DingTalkClient: """ 处理消息事件。 """ + # Skip message handling if stopped + if self._stopped: + return msg_type = event.conversation if msg_type in self._message_handlers: for handler in self._message_handlers[msg_type]: @@ -378,4 +385,70 @@ class DingTalkClient: async def start(self): """启动 WebSocket 连接,监听消息""" - await self.client.start() + self._stopped = False + self.client.pre_start() + + while not self._stopped: + try: + connection = self.client.open_connection() + + if not connection: + if self.logger: + await self.logger.error('DingTalk: open connection failed') + await asyncio.sleep(10) + continue + + uri = '%s?ticket=%s' % (connection['endpoint'], urllib.parse.quote_plus(connection['ticket'])) + async with websockets.connect(uri) as websocket: + self.client.websocket = websocket + keepalive_task = asyncio.create_task(self._keepalive(websocket)) + try: + async for raw_message in websocket: + if self._stopped: + break + json_message = json.loads(raw_message) + asyncio.create_task(self.client.background_task(json_message)) + finally: + keepalive_task.cancel() + try: + await keepalive_task + except asyncio.CancelledError: + pass + except asyncio.CancelledError: + # Properly exit when task is cancelled + break + except websockets.exceptions.ConnectionClosedError as e: + if self._stopped: + break + if self.logger: + await self.logger.error(f'DingTalk: connection closed, reconnecting... error={e}') + await asyncio.sleep(5) + continue + except Exception as e: + if self._stopped: + break + if self.logger: + await self.logger.error(f'DingTalk: unknown exception, reconnecting... error={e}') + await asyncio.sleep(3) + continue + + async def _keepalive(self, ws, ping_interval=60): + """Keep WebSocket connection alive""" + while not self._stopped: + await asyncio.sleep(ping_interval) + try: + await ws.ping() + except websockets.exceptions.ConnectionClosed: + break + + async def stop(self): + """停止 WebSocket 连接""" + self._stopped = True + # Close WebSocket connection if exists + if self.client.websocket: + try: + await self.client.websocket.close() + except Exception: + pass + # Clear message handlers to prevent stale callbacks + self._message_handlers = {'example': []} diff --git a/src/langbot/pkg/platform/sources/dingtalk.py b/src/langbot/pkg/platform/sources/dingtalk.py index a47a5238..6c4988c4 100644 --- a/src/langbot/pkg/platform/sources/dingtalk.py +++ b/src/langbot/pkg/platform/sources/dingtalk.py @@ -260,7 +260,8 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): await self.bot.start() async def kill(self) -> bool: - return False + await self.bot.stop() + return True async def is_muted(self) -> bool: return False