fix: dingtalk adapter lifecycle mgm issues (#1844, #1853)

This commit is contained in:
Junyan Qin
2025-12-23 14:00:41 +08:00
parent 24bd90fcf6
commit 806a03cd53
2 changed files with 76 additions and 2 deletions

View File

@@ -1,8 +1,11 @@
import asyncio
import base64
import json
import time
import urllib.parse
from typing import Callable
import dingtalk_stream # type: ignore
import websockets
from .EchoHandler import EchoTextHandler
from .dingtalkevent import DingTalkEvent
import httpx
@@ -36,6 +39,7 @@ class DingTalkClient:
self.access_token_expiry_time = ''
self.markdown_card = markdown_card
self.logger = logger
self._stopped = False # Flag to control the event loop
async def get_access_token(self):
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
@@ -170,6 +174,9 @@ class DingTalkClient:
"""
处理消息事件。
"""
# Skip message handling if stopped
if self._stopped:
return
msg_type = event.conversation
if msg_type in self._message_handlers:
for handler in self._message_handlers[msg_type]:
@@ -378,4 +385,70 @@ class DingTalkClient:
async def start(self):
"""启动 WebSocket 连接,监听消息"""
await self.client.start()
self._stopped = False
self.client.pre_start()
while not self._stopped:
try:
connection = self.client.open_connection()
if not connection:
if self.logger:
await self.logger.error('DingTalk: open connection failed')
await asyncio.sleep(10)
continue
uri = '%s?ticket=%s' % (connection['endpoint'], urllib.parse.quote_plus(connection['ticket']))
async with websockets.connect(uri) as websocket:
self.client.websocket = websocket
keepalive_task = asyncio.create_task(self._keepalive(websocket))
try:
async for raw_message in websocket:
if self._stopped:
break
json_message = json.loads(raw_message)
asyncio.create_task(self.client.background_task(json_message))
finally:
keepalive_task.cancel()
try:
await keepalive_task
except asyncio.CancelledError:
pass
except asyncio.CancelledError:
# Properly exit when task is cancelled
break
except websockets.exceptions.ConnectionClosedError as e:
if self._stopped:
break
if self.logger:
await self.logger.error(f'DingTalk: connection closed, reconnecting... error={e}')
await asyncio.sleep(5)
continue
except Exception as e:
if self._stopped:
break
if self.logger:
await self.logger.error(f'DingTalk: unknown exception, reconnecting... error={e}')
await asyncio.sleep(3)
continue
async def _keepalive(self, ws, ping_interval=60):
"""Keep WebSocket connection alive"""
while not self._stopped:
await asyncio.sleep(ping_interval)
try:
await ws.ping()
except websockets.exceptions.ConnectionClosed:
break
async def stop(self):
"""停止 WebSocket 连接"""
self._stopped = True
# Close WebSocket connection if exists
if self.client.websocket:
try:
await self.client.websocket.close()
except Exception:
pass
# Clear message handlers to prevent stale callbacks
self._message_handlers = {'example': []}

View File

@@ -260,7 +260,8 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
await self.bot.start()
async def kill(self) -> bool:
return False
await self.bot.stop()
return True
async def is_muted(self) -> bool:
return False