mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-02 03:55:55 +00:00
* add conversation expire config * add user query text to card * fix(pipeline): move session limit to AI config * test(pipeline): cover AI session limit config * refactor(pipeline): merge session expire-time into AI runner stage Move the session validity duration field out of the standalone session-limit stage into the runner stage so it actually renders in the AI tab (the tab only shows the runner stage and the stage matching the selected runner — any other stage is filtered out). Read path, default config, metadata description, and tests updated accordingly. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(pipeline): expire conversations from last update time * fix(n8n): sync generated conversation id into payload --------- Co-authored-by: RockChinQ <rockchinq@gmail.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
590 lines
25 KiB
Python
590 lines
25 KiB
Python
import asyncio
|
||
import base64
|
||
import json
|
||
import time
|
||
import urllib.parse
|
||
from typing import Callable
|
||
import dingtalk_stream # type: ignore
|
||
import websockets
|
||
from .EchoHandler import EchoTextHandler
|
||
from .dingtalkevent import DingTalkEvent
|
||
import httpx
|
||
import traceback
|
||
|
||
|
||
class DingTalkClient:
|
||
def __init__(
|
||
self,
|
||
client_id: str,
|
||
client_secret: str,
|
||
robot_name: str,
|
||
robot_code: str,
|
||
markdown_card: bool,
|
||
logger: None,
|
||
):
|
||
"""初始化 WebSocket 连接并自动启动"""
|
||
self.credential = dingtalk_stream.Credential(client_id, client_secret)
|
||
self.client = dingtalk_stream.DingTalkStreamClient(self.credential)
|
||
self.key = client_id
|
||
self.secret = client_secret
|
||
# 在 DingTalkClient 中传入自己作为参数,避免循环导入
|
||
self.EchoTextHandler = EchoTextHandler(self)
|
||
self.client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self.EchoTextHandler)
|
||
self._message_handlers = {
|
||
'example': [],
|
||
}
|
||
self.access_token = ''
|
||
self.robot_name = robot_name
|
||
self.robot_code = robot_code
|
||
self.access_token_expiry_time = ''
|
||
self.markdown_card = markdown_card
|
||
self.logger = logger
|
||
self._stopped = False # Flag to control the event loop
|
||
|
||
async def get_access_token(self):
|
||
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
|
||
headers = {'Content-Type': 'application/json'}
|
||
data = {'appKey': self.key, 'appSecret': self.secret}
|
||
async with httpx.AsyncClient() as client:
|
||
try:
|
||
response = await client.post(url, json=data, headers=headers)
|
||
if response.status_code == 200:
|
||
response_data = response.json()
|
||
self.access_token = response_data.get('accessToken')
|
||
expires_in = int(response_data.get('expireIn', 7200))
|
||
self.access_token_expiry_time = time.time() + expires_in - 60
|
||
except Exception:
|
||
await self.logger.error('failed to get access token in dingtalk')
|
||
|
||
async def is_token_expired(self):
|
||
"""检查token是否过期"""
|
||
if self.access_token_expiry_time is None:
|
||
return True
|
||
return time.time() > self.access_token_expiry_time
|
||
|
||
async def check_access_token(self):
|
||
if not self.access_token or await self.is_token_expired():
|
||
return False
|
||
return bool(self.access_token and self.access_token.strip())
|
||
|
||
async def download_image(self, download_code: str):
|
||
if not await self.check_access_token():
|
||
await self.get_access_token()
|
||
url = 'https://api.dingtalk.com/v1.0/robot/messageFiles/download'
|
||
params = {'downloadCode': download_code, 'robotCode': self.robot_code}
|
||
headers = {'x-acs-dingtalk-access-token': self.access_token}
|
||
async with httpx.AsyncClient() as client:
|
||
response = await client.post(url, headers=headers, json=params)
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
download_url = result.get('downloadUrl')
|
||
else:
|
||
await self.logger.error(f'failed to get download url: {response.json()}')
|
||
|
||
if download_url:
|
||
return await self.download_url_to_base64(download_url)
|
||
|
||
async def download_url_to_base64(self, download_url):
|
||
async with httpx.AsyncClient() as client:
|
||
response = await client.get(download_url)
|
||
|
||
if response.status_code == 200:
|
||
file_bytes = response.content
|
||
mime_type = response.headers.get('Content-Type', 'application/octet-stream')
|
||
base64_str = base64.b64encode(file_bytes).decode('utf-8')
|
||
return f'data:{mime_type};base64,{base64_str}'
|
||
else:
|
||
await self.logger.error(f'failed to get files: {response.json()}')
|
||
|
||
async def get_audio_url(self, download_code: str):
|
||
if not await self.check_access_token():
|
||
await self.get_access_token()
|
||
url = 'https://api.dingtalk.com/v1.0/robot/messageFiles/download'
|
||
params = {'downloadCode': download_code, 'robotCode': self.robot_code}
|
||
headers = {'x-acs-dingtalk-access-token': self.access_token}
|
||
async with httpx.AsyncClient() as client:
|
||
response = await client.post(url, headers=headers, json=params)
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
download_url = result.get('downloadUrl')
|
||
if download_url:
|
||
return await self.download_url_to_base64(download_url)
|
||
else:
|
||
await self.logger.error(f'failed to get audio: {response.json()}')
|
||
else:
|
||
raise Exception(f'Error: {response.status_code}, {response.text}')
|
||
|
||
async def get_file_url(self, download_code: str):
|
||
if not await self.check_access_token():
|
||
await self.get_access_token()
|
||
url = 'https://api.dingtalk.com/v1.0/robot/messageFiles/download'
|
||
params = {'downloadCode': download_code, 'robotCode': self.robot_code}
|
||
headers = {'x-acs-dingtalk-access-token': self.access_token}
|
||
async with httpx.AsyncClient() as client:
|
||
response = await client.post(url, headers=headers, json=params)
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
download_url = result.get('downloadUrl')
|
||
if download_url:
|
||
return download_url
|
||
else:
|
||
await self.logger.error(f'failed to get file: {response.json()}')
|
||
else:
|
||
raise Exception(f'Error: {response.status_code}, {response.text}')
|
||
|
||
async def update_incoming_message(self, message):
|
||
"""异步更新 DingTalkClient 中的 incoming_message"""
|
||
message_data = await self.get_message(message)
|
||
if message_data:
|
||
event = DingTalkEvent.from_payload(message_data)
|
||
if event:
|
||
await self._handle_message(event)
|
||
|
||
async def send_message(self, content: str, incoming_message, at: bool):
|
||
if self.markdown_card:
|
||
if at:
|
||
self.EchoTextHandler.reply_markdown(
|
||
title='@' + incoming_message.sender_nick + ' ' + content,
|
||
text='@' + incoming_message.sender_nick + ' ' + content,
|
||
incoming_message=incoming_message,
|
||
)
|
||
else:
|
||
self.EchoTextHandler.reply_markdown(
|
||
title=content,
|
||
text=content,
|
||
incoming_message=incoming_message,
|
||
)
|
||
else:
|
||
self.EchoTextHandler.reply_text(content, incoming_message)
|
||
|
||
async def get_incoming_message(self):
|
||
"""获取收到的消息"""
|
||
return await self.EchoTextHandler.get_incoming_message()
|
||
|
||
def on_message(self, msg_type: str):
|
||
def decorator(func: Callable[[DingTalkEvent], None]):
|
||
if msg_type not in self._message_handlers:
|
||
self._message_handlers[msg_type] = []
|
||
self._message_handlers[msg_type].append(func)
|
||
return func
|
||
|
||
return decorator
|
||
|
||
async def _handle_message(self, event: DingTalkEvent):
|
||
"""
|
||
处理消息事件。
|
||
"""
|
||
# Skip message handling if stopped
|
||
if self._stopped:
|
||
return
|
||
msg_type = event.conversation
|
||
if msg_type in self._message_handlers:
|
||
for handler in self._message_handlers[msg_type]:
|
||
await handler(event)
|
||
|
||
async def _parse_quoted_message(self, replied_msg: dict) -> dict:
|
||
"""Parse the quoted/replied message and extract its content.
|
||
|
||
Args:
|
||
replied_msg: The repliedMsg object from DingTalk message
|
||
|
||
Returns:
|
||
A dict containing the quoted message info with keys:
|
||
- message_id: The original message ID
|
||
- msg_type: The message type (text, file, picture, audio, etc.)
|
||
- content: The text content (if any)
|
||
- file_url: The file download URL (if file type)
|
||
- file_name: The file name (if file type)
|
||
- picture: The picture base64 (if picture type)
|
||
- audio: The audio base64 (if audio type)
|
||
"""
|
||
quote_info = {
|
||
'message_id': replied_msg.get('msgId', ''),
|
||
'msg_type': replied_msg.get('msgType', ''),
|
||
'sender_id': replied_msg.get('senderId', ''),
|
||
}
|
||
|
||
msg_type = replied_msg.get('msgType', '')
|
||
content = replied_msg.get('content', {})
|
||
|
||
# Handle content as string (JSON) or dict
|
||
if isinstance(content, str):
|
||
try:
|
||
content = json.loads(content)
|
||
except (json.JSONDecodeError, TypeError):
|
||
content = {}
|
||
|
||
if msg_type == 'text':
|
||
# Text message
|
||
if isinstance(content, dict):
|
||
quote_info['content'] = content.get('content', '')
|
||
else:
|
||
quote_info['content'] = str(content)
|
||
|
||
elif msg_type == 'file':
|
||
# File message
|
||
download_code = content.get('downloadCode')
|
||
file_name = content.get('fileName')
|
||
if download_code and file_name:
|
||
try:
|
||
quote_info['file_url'] = await self.get_file_url(download_code)
|
||
quote_info['file_name'] = file_name
|
||
except Exception as e:
|
||
if self.logger:
|
||
await self.logger.error(f'Failed to get quoted file URL: {e}')
|
||
|
||
elif msg_type == 'picture':
|
||
# Picture message
|
||
download_code = content.get('downloadCode')
|
||
if download_code:
|
||
try:
|
||
quote_info['picture'] = await self.download_image(download_code)
|
||
except Exception as e:
|
||
if self.logger:
|
||
await self.logger.error(f'Failed to download quoted image: {e}')
|
||
|
||
elif msg_type == 'audio':
|
||
# Audio message
|
||
download_code = content.get('downloadCode')
|
||
if download_code:
|
||
try:
|
||
quote_info['audio'] = await self.get_audio_url(download_code)
|
||
except Exception as e:
|
||
if self.logger:
|
||
await self.logger.error(f'Failed to get quoted audio: {e}')
|
||
|
||
elif msg_type == 'richText':
|
||
# Rich text message - extract text content
|
||
rich_text = content.get('richText', [])
|
||
texts = []
|
||
for item in rich_text:
|
||
if 'text' in item and item['text'] != '\n':
|
||
texts.append(item['text'])
|
||
quote_info['content'] = '\n'.join(texts)
|
||
|
||
return quote_info
|
||
|
||
async def get_message(self, incoming_message: dingtalk_stream.chatbot.ChatbotMessage):
|
||
try:
|
||
# print(json.dumps(incoming_message.to_dict(), indent=4, ensure_ascii=False))
|
||
message_data = {
|
||
'IncomingMessage': incoming_message,
|
||
}
|
||
if str(incoming_message.conversation_type) == '1':
|
||
message_data['conversation_type'] = 'FriendMessage'
|
||
elif str(incoming_message.conversation_type) == '2':
|
||
message_data['conversation_type'] = 'GroupMessage'
|
||
|
||
# Check for quoted/replied message
|
||
raw_data = incoming_message.to_dict()
|
||
text_data = raw_data.get('text', {})
|
||
if isinstance(text_data, dict) and text_data.get('isReplyMsg'):
|
||
replied_msg = text_data.get('repliedMsg', {})
|
||
if replied_msg:
|
||
quote_info = await self._parse_quoted_message(replied_msg)
|
||
message_data['QuotedMessage'] = quote_info
|
||
|
||
if incoming_message.message_type == 'richText':
|
||
data = incoming_message.rich_text_content.to_dict()
|
||
|
||
# 使用统一的结构化数据格式,保持顺序
|
||
rich_content = {
|
||
'Type': 'richText',
|
||
'Elements': [], # 按顺序存储所有元素
|
||
'SimpleContent': '', # 兼容字段:纯文本内容
|
||
'SimplePicture': '', # 兼容字段:第一张图片
|
||
}
|
||
|
||
# 先收集所有文本和图片占位符
|
||
text_elements = []
|
||
|
||
# 解析富文本内容,保持原始顺序
|
||
for item in data['richText']:
|
||
# 处理文本内容
|
||
if 'text' in item and item['text'] != '\n':
|
||
element = {'Type': 'text', 'Content': item['text']}
|
||
rich_content['Elements'].append(element)
|
||
text_elements.append(item['text'])
|
||
|
||
# 检查是否是图片元素 - 根据钉钉API的实际结构调整
|
||
# 钉钉富文本中的图片通常有特定标识,可能需要根据实际返回调整
|
||
elif item.get('type') == 'picture':
|
||
# 创建图片占位符
|
||
element = {
|
||
'Type': 'image_placeholder',
|
||
}
|
||
rich_content['Elements'].append(element)
|
||
|
||
# 获取并下载所有图片
|
||
image_list = incoming_message.get_image_list()
|
||
if image_list:
|
||
new_elements = []
|
||
image_index = 0
|
||
|
||
for element in rich_content['Elements']:
|
||
if element['Type'] == 'image_placeholder':
|
||
if image_index < len(image_list) and image_list[image_index]:
|
||
image_url = await self.download_image(image_list[image_index])
|
||
new_elements.append({'Type': 'image', 'Picture': image_url})
|
||
image_index += 1
|
||
else:
|
||
# 如果没有对应的图片,保留占位符或跳过
|
||
continue
|
||
else:
|
||
new_elements.append(element)
|
||
|
||
rich_content['Elements'] = new_elements
|
||
|
||
# 设置兼容字段
|
||
all_texts = [elem['Content'] for elem in rich_content['Elements'] if elem.get('Type') == 'text']
|
||
rich_content['SimpleContent'] = '\n'.join(all_texts) if all_texts else ''
|
||
|
||
all_images = [elem['Picture'] for elem in rich_content['Elements'] if elem.get('Type') == 'image']
|
||
if all_images:
|
||
rich_content['SimplePicture'] = all_images[0]
|
||
rich_content['AllImages'] = all_images # 所有图片的列表
|
||
|
||
# 设置原始的 content 和 picture 字段以保持兼容
|
||
message_data['Content'] = rich_content['SimpleContent']
|
||
message_data['Rich_Content'] = rich_content
|
||
if all_images:
|
||
message_data['Picture'] = all_images[0]
|
||
|
||
elif incoming_message.message_type == 'text':
|
||
message_data['Content'] = incoming_message.get_text_list()[0]
|
||
|
||
message_data['Type'] = 'text'
|
||
elif incoming_message.message_type == 'picture':
|
||
message_data['Picture'] = await self.download_image(incoming_message.get_image_list()[0])
|
||
|
||
message_data['Type'] = 'image'
|
||
elif incoming_message.message_type == 'audio':
|
||
raw_content = incoming_message.to_dict().get('content', {})
|
||
# 兼容处理:如果 content 仍为 JSON 字符串则进行解析
|
||
if isinstance(raw_content, str):
|
||
try:
|
||
raw_content = json.loads(raw_content)
|
||
except (json.JSONDecodeError, TypeError):
|
||
raw_content = {}
|
||
|
||
if self.logger:
|
||
await self.logger.info(f'DingTalk audio raw content: {json.dumps(raw_content, ensure_ascii=False)}')
|
||
|
||
# 提取钉钉自带的语音转写文字(Powered by Qwen)
|
||
recognition = raw_content.get('recognition', '')
|
||
if recognition:
|
||
message_data['Content'] = recognition
|
||
|
||
download_code = raw_content.get('downloadCode')
|
||
if download_code:
|
||
message_data['Audio'] = await self.get_audio_url(download_code)
|
||
|
||
message_data['Type'] = 'audio'
|
||
elif incoming_message.message_type == 'file':
|
||
# 获取原始数据字典并提取嵌套的文件信息
|
||
raw_data = incoming_message.to_dict()
|
||
file_info = raw_data.get('content', {})
|
||
|
||
# 兼容处理:如果 content 仍为 JSON 字符串则进行解析
|
||
if isinstance(file_info, str):
|
||
try:
|
||
file_info = json.loads(file_info)
|
||
except (json.JSONDecodeError, TypeError):
|
||
file_info = {}
|
||
|
||
download_code = file_info.get('downloadCode')
|
||
file_name = file_info.get('fileName')
|
||
|
||
if download_code and file_name:
|
||
# 转换 downloadCode 为可下载的真实 URL
|
||
message_data['File'] = await self.get_file_url(download_code)
|
||
message_data['Name'] = file_name
|
||
else:
|
||
if self.logger:
|
||
await self.logger.error(f'Failed to extract file info from message content: {file_info}')
|
||
message_data['File'] = None
|
||
message_data['Name'] = None
|
||
|
||
message_data['Type'] = 'file'
|
||
|
||
copy_message_data = message_data.copy()
|
||
del copy_message_data['IncomingMessage']
|
||
# print("message_data:", json.dumps(copy_message_data, indent=4, ensure_ascii=False))
|
||
except Exception:
|
||
if self.logger:
|
||
await self.logger.error(f'Error in get_message: {traceback.format_exc()}')
|
||
else:
|
||
traceback.print_exc()
|
||
|
||
return message_data
|
||
|
||
async def send_proactive_message_to_one(self, target_id: str, content: str):
|
||
if not await self.check_access_token():
|
||
await self.get_access_token()
|
||
|
||
url = 'https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend'
|
||
|
||
headers = {
|
||
'x-acs-dingtalk-access-token': self.access_token,
|
||
'Content-Type': 'application/json',
|
||
}
|
||
|
||
data = {
|
||
'robotCode': self.robot_code,
|
||
'userIds': [target_id],
|
||
'msgKey': 'sampleText',
|
||
'msgParam': json.dumps({'content': content}),
|
||
}
|
||
try:
|
||
async with httpx.AsyncClient() as client:
|
||
response = await client.post(url, headers=headers, json=data)
|
||
if response.status_code == 200:
|
||
return
|
||
except Exception:
|
||
await self.logger.error(f'failed to send proactive massage to person: {traceback.format_exc()}')
|
||
raise Exception(f'failed to send proactive massage to person: {traceback.format_exc()}')
|
||
|
||
async def send_proactive_message_to_group(self, target_id: str, content: str):
|
||
if not await self.check_access_token():
|
||
await self.get_access_token()
|
||
|
||
url = 'https://api.dingtalk.com/v1.0/robot/groupMessages/send'
|
||
|
||
headers = {
|
||
'x-acs-dingtalk-access-token': self.access_token,
|
||
'Content-Type': 'application/json',
|
||
}
|
||
|
||
data = {
|
||
'robotCode': self.robot_code,
|
||
'openConversationId': target_id,
|
||
'msgKey': 'sampleText',
|
||
'msgParam': json.dumps({'content': content}),
|
||
}
|
||
try:
|
||
async with httpx.AsyncClient() as client:
|
||
response = await client.post(url, headers=headers, json=data)
|
||
if response.status_code == 200:
|
||
return
|
||
except Exception:
|
||
await self.logger.error(f'failed to send proactive massage to group: {traceback.format_exc()}')
|
||
raise Exception(f'failed to send proactive massage to group: {traceback.format_exc()}')
|
||
|
||
async def create_and_card(
|
||
self,
|
||
temp_card_id: str,
|
||
incoming_message: dingtalk_stream.ChatbotMessage,
|
||
quote_origin: bool = False,
|
||
card_auto_layout: bool = False,
|
||
):
|
||
card_data = {}
|
||
card_data['config'] = json.dumps({'autoLayout': card_auto_layout})
|
||
card_data['content'] = ''
|
||
|
||
# 将用户的消息内容作为卡片的查询参数,方便后续处理
|
||
if incoming_message.message_type == 'text':
|
||
card_data['query'] = incoming_message.get_text_list()[0]
|
||
else:
|
||
card_data['query'] = '...'
|
||
|
||
card_instance = dingtalk_stream.AICardReplier(self.client, incoming_message)
|
||
# print(card_instance)
|
||
# 先投放卡片: https://open.dingtalk.com/document/orgapp/create-and-deliver-cards
|
||
card_instance_id = await card_instance.async_create_and_deliver_card(
|
||
temp_card_id,
|
||
card_data,
|
||
)
|
||
return card_instance, card_instance_id
|
||
|
||
async def send_card_message(self, card_instance, card_instance_id: str, content: str, is_final: bool):
|
||
content_key = 'content'
|
||
try:
|
||
await card_instance.async_streaming(
|
||
card_instance_id,
|
||
content_key=content_key,
|
||
content_value=content,
|
||
append=False,
|
||
finished=is_final,
|
||
failed=False,
|
||
)
|
||
except Exception as e:
|
||
self.logger.exception(e)
|
||
await card_instance.async_streaming(
|
||
card_instance_id,
|
||
content_key=content_key,
|
||
content_value='',
|
||
append=False,
|
||
finished=is_final,
|
||
failed=True,
|
||
)
|
||
|
||
async def start(self):
|
||
"""启动 WebSocket 连接,监听消息"""
|
||
self._stopped = False
|
||
self.client.pre_start()
|
||
|
||
while not self._stopped:
|
||
try:
|
||
connection = self.client.open_connection()
|
||
|
||
if not connection:
|
||
if self.logger:
|
||
await self.logger.error('DingTalk: open connection failed')
|
||
await asyncio.sleep(10)
|
||
continue
|
||
|
||
uri = '%s?ticket=%s' % (connection['endpoint'], urllib.parse.quote_plus(connection['ticket']))
|
||
async with websockets.connect(uri) as websocket:
|
||
self.client.websocket = websocket
|
||
keepalive_task = asyncio.create_task(self._keepalive(websocket))
|
||
try:
|
||
async for raw_message in websocket:
|
||
if self._stopped:
|
||
break
|
||
json_message = json.loads(raw_message)
|
||
asyncio.create_task(self.client.background_task(json_message))
|
||
finally:
|
||
keepalive_task.cancel()
|
||
try:
|
||
await keepalive_task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
except asyncio.CancelledError:
|
||
# Properly exit when task is cancelled
|
||
break
|
||
except websockets.exceptions.ConnectionClosedError as e:
|
||
if self._stopped:
|
||
break
|
||
if self.logger:
|
||
await self.logger.error(f'DingTalk: connection closed, reconnecting... error={e}')
|
||
await asyncio.sleep(5)
|
||
continue
|
||
except Exception as e:
|
||
if self._stopped:
|
||
break
|
||
if self.logger:
|
||
await self.logger.error(f'DingTalk: unknown exception, reconnecting... error={e}')
|
||
await asyncio.sleep(3)
|
||
continue
|
||
|
||
async def _keepalive(self, ws, ping_interval=60):
|
||
"""Keep WebSocket connection alive"""
|
||
while not self._stopped:
|
||
await asyncio.sleep(ping_interval)
|
||
try:
|
||
await ws.ping()
|
||
except websockets.exceptions.ConnectionClosed:
|
||
break
|
||
|
||
async def stop(self):
|
||
"""停止 WebSocket 连接"""
|
||
self._stopped = True
|
||
# Close WebSocket connection if exists
|
||
if self.client.websocket:
|
||
try:
|
||
await self.client.websocket.close()
|
||
except Exception:
|
||
pass
|
||
# Clear message handlers to prevent stale callbacks
|
||
self._message_handlers = {'example': []}
|