feat(dingtalk): implement human input card support and card action handling

- Add a new module `card_callback.py` to handle card action button clicks from DingTalk.
- Introduce `DingTalkCardActionHandler` to process card action callbacks and extract parameters.
- Update `DingTalkAdapter` to manage card state and handle form input through a single card template.
- Add configuration for `human_input_card_template_id` in `dingtalk.yaml` to specify the template for human input.
- Create a new card template `dingtalk_human_input_card.json` for rendering human input prompts and buttons.
This commit is contained in:
fdc310
2026-06-15 15:51:52 +08:00
parent e08b5db625
commit 83b0d26e99
6 changed files with 1630 additions and 41 deletions
+297 -25
View File
@@ -1,17 +1,26 @@
import asyncio
import base64
import json
import logging
import time
import uuid
import urllib.parse
from typing import Callable
from typing import Awaitable, Callable, Optional
import dingtalk_stream # type: ignore
import websockets
from .EchoHandler import EchoTextHandler
from .card_callback import DingTalkCardActionHandler
from .dingtalkevent import DingTalkEvent
import httpx
import traceback
_stdout_logger = logging.getLogger('langbot.dingtalk_api')
DINGTALK_OPENAPI_BASE = 'https://api.dingtalk.com'
class DingTalkClient:
def __init__(
self,
@@ -21,6 +30,7 @@ class DingTalkClient:
robot_code: str,
markdown_card: bool,
logger: None,
card_action_callback: Optional[Callable[[dict], Awaitable[None]]] = None,
):
"""初始化 WebSocket 连接并自动启动"""
self.credential = dingtalk_stream.Credential(client_id, client_secret)
@@ -30,6 +40,14 @@ class DingTalkClient:
# 在 DingTalkClient 中传入自己作为参数,避免循环导入
self.EchoTextHandler = EchoTextHandler(self)
self.client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self.EchoTextHandler)
# STREAM-mode card action button click handler. Forwards parsed payload
# to the adapter so it can resume paused Dify workflows.
self.card_action_callback = card_action_callback
self.card_action_handler = DingTalkCardActionHandler(self.client, self._on_card_action)
self.client.register_callback_handler(
dingtalk_stream.handlers.CallbackHandler.TOPIC_CARD_CALLBACK,
self.card_action_handler,
)
self._message_handlers = {
'example': [],
}
@@ -41,6 +59,16 @@ class DingTalkClient:
self.logger = logger
self._stopped = False # Flag to control the event loop
async def _on_card_action(self, payload: dict) -> None:
"""Dispatch a parsed card-action payload to the adapter callback."""
if self.card_action_callback is None:
return
try:
await self.card_action_callback(payload)
except Exception:
if self.logger:
await self.logger.error(f'DingTalk card action callback error: {traceback.format_exc()}')
async def get_access_token(self):
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
headers = {'Content-Type': 'application/json'}
@@ -429,18 +457,35 @@ class DingTalkClient:
'Content-Type': 'application/json',
}
# For enterprise-internal robots, robotCode == AppKey (client_id).
# The dedicated robot_code field is only required for scenario-group
# robots or third-party robots; fall back to client_id when empty so
# the common single-bot setup keeps working without manual config.
robot_code = self.robot_code or self.key
data = {
'robotCode': self.robot_code,
'robotCode': robot_code,
'userIds': [target_id],
'msgKey': 'sampleText',
'msgParam': json.dumps({'content': content}),
}
_stdout_logger.info(
'DingTalk send_proactive_message_to_one request: robotCode=%s target_id=%s content_len=%d',
robot_code,
target_id,
len(content),
)
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=data)
_stdout_logger.info(
'DingTalk send_proactive_message_to_one response: status=%d body=%s',
response.status_code,
response.text[:500],
)
if response.status_code == 200:
return
except Exception:
_stdout_logger.exception('DingTalk send_proactive_message_to_one error')
await self.logger.error(f'failed to send proactive massage to person: {traceback.format_exc()}')
raise Exception(f'failed to send proactive massage to person: {traceback.format_exc()}')
@@ -456,7 +501,7 @@ class DingTalkClient:
}
data = {
'robotCode': self.robot_code,
'robotCode': self.robot_code or self.key,
'openConversationId': target_id,
'msgKey': 'sampleText',
'msgParam': json.dumps({'content': content}),
@@ -477,47 +522,274 @@ class DingTalkClient:
quote_origin: bool = False,
card_auto_layout: bool = False,
):
card_data = {}
card_data['config'] = json.dumps({'autoLayout': card_auto_layout})
card_data['content'] = ''
"""Create + deliver the streaming chat card for a chatbot reply.
# 将用户的消息内容作为卡片的查询参数,方便后续处理
if incoming_message.message_type == 'text':
card_data['query'] = incoming_message.get_text_list()[0]
Replaces the old `dingtalk_stream.AICardReplier`-based path. Returns
`(None, out_track_id)` to keep call sites compatible with the
previous `(card_instance, card_instance_id)` shape — the first slot
is unused now that everything is driven by out_track_id.
"""
out_track_id = uuid.uuid4().hex
is_group = str(incoming_message.conversation_type) == '2'
if is_group:
open_space_id = f'dtv1.card//IM_GROUP.{incoming_message.conversation_id}'
else:
card_data['query'] = '...'
open_space_id = f'dtv1.card//IM_ROBOT.{incoming_message.sender_staff_id}'
card_instance = dingtalk_stream.AICardReplier(self.client, incoming_message)
# print(card_instance)
# 先投放卡片: https://open.dingtalk.com/document/orgapp/create-and-deliver-cards
card_instance_id = await card_instance.async_create_and_deliver_card(
temp_card_id,
card_data,
card_param_map = {'content': ''}
if incoming_message.message_type == 'text':
card_param_map['query'] = incoming_message.get_text_list()[0]
else:
card_param_map['query'] = '...'
await self.create_and_deliver_card(
card_template_id=temp_card_id,
out_track_id=out_track_id,
open_space_id=open_space_id,
is_group=is_group,
card_param_map=card_param_map,
card_data_config={'autoLayout': card_auto_layout},
)
return card_instance, card_instance_id
return None, out_track_id
async def send_card_message(self, card_instance, card_instance_id: str, content: str, is_final: bool):
content_key = 'content'
"""Stream a single chunk into an existing card's `content` field."""
try:
await card_instance.async_streaming(
card_instance_id,
content_key=content_key,
await self.streaming_update_card(
out_track_id=card_instance_id,
content_key='content',
content_value=content,
append=False,
finished=is_final,
failed=False,
)
except Exception as e:
self.logger.exception(e)
await card_instance.async_streaming(
card_instance_id,
content_key=content_key,
if self.logger:
self.logger.exception(e)
await self.streaming_update_card(
out_track_id=card_instance_id,
content_key='content',
content_value='',
append=False,
finished=is_final,
failed=True,
)
async def create_and_deliver_card(
self,
*,
card_template_id: str,
out_track_id: str,
open_space_id: str,
is_group: bool,
card_param_map: Optional[dict] = None,
callback_type: str = 'STREAM',
callback_route_key: Optional[str] = None,
support_forward: bool = True,
dynamic_data_source_configs: Optional[list] = None,
card_data_config: Optional[dict] = None,
at_user_ids: Optional[dict] = None,
recipients: Optional[list] = None,
) -> bool:
"""POST /v1.0/card/instances/createAndDeliver.
Mirrors the SDK's `async_create_and_deliver_card` shape but exposes
the dynamic-data-source config slot so we can register a pull URL
for variable-length button lists.
"""
if not await self.check_access_token():
await self.get_access_token()
cardData: dict = {'cardParamMap': card_param_map or {}}
if card_data_config is not None:
cardData['config'] = json.dumps(card_data_config)
body: dict = {
'cardTemplateId': card_template_id,
'outTrackId': out_track_id,
'cardData': cardData,
'callbackType': callback_type,
'openSpaceId': open_space_id,
'imGroupOpenSpaceModel': {'supportForward': support_forward},
'imRobotOpenSpaceModel': {'supportForward': support_forward},
}
if callback_type == 'HTTP' and callback_route_key:
body['callbackRouteKey'] = callback_route_key
if is_group:
deliver: dict = {'robotCode': self.robot_code or self.key}
if at_user_ids:
deliver['atUserIds'] = at_user_ids
if recipients is not None:
deliver['recipients'] = recipients
body['imGroupOpenDeliverModel'] = deliver
else:
body['imRobotOpenDeliverModel'] = {'spaceType': 'IM_ROBOT'}
if dynamic_data_source_configs:
body['openDynamicDataConfig'] = {'dynamicDataSourceConfigs': dynamic_data_source_configs}
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances/createAndDeliver'
headers = {
'x-acs-dingtalk-access-token': self.access_token,
'Content-Type': 'application/json',
}
try:
_stdout_logger.info(
'DingTalk createAndDeliver request body: %s',
json.dumps(body, ensure_ascii=False)[:1500],
)
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=body, timeout=30.0)
if response.status_code == 200:
_stdout_logger.info(
'DingTalk createAndDeliver response: %s',
response.text[:500],
)
return True
_stdout_logger.error(
'DingTalk createAndDeliver failed: status=%s body=%s',
response.status_code,
response.text,
)
if self.logger:
await self.logger.error(
f'DingTalk createAndDeliver failed: status={response.status_code} body={response.text}'
)
return False
except Exception:
_stdout_logger.exception('DingTalk createAndDeliver error')
if self.logger:
await self.logger.error(f'DingTalk createAndDeliver error: {traceback.format_exc()}')
return False
async def streaming_update_card(
self,
*,
out_track_id: str,
content_key: str,
content_value: str,
append: bool,
finished: bool,
failed: bool = False,
) -> bool:
"""PUT /v1.0/card/streaming.
Replaces `dingtalk_stream.AICardReplier.async_streaming` — same body
shape (outTrackId / guid / key / content / isFull / isFinalize /
isError) per the SDK source.
"""
if not await self.check_access_token():
await self.get_access_token()
body = {
'outTrackId': out_track_id,
'guid': uuid.uuid4().hex,
'key': content_key,
'content': content_value,
'isFull': not append,
'isFinalize': finished,
'isError': failed,
}
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/streaming'
headers = {
'x-acs-dingtalk-access-token': self.access_token,
'Content-Type': 'application/json',
}
try:
async with httpx.AsyncClient() as client:
response = await client.put(url, headers=headers, json=body, timeout=30.0)
if response.status_code == 200:
return True
if self.logger:
await self.logger.error(
f'DingTalk card streaming failed: status={response.status_code} body={response.text}'
)
return False
except Exception:
if self.logger:
await self.logger.error(f'DingTalk card streaming error: {traceback.format_exc()}')
return False
async def update_card_data(
self,
*,
out_track_id: str,
card_param_map: Optional[dict] = None,
private_data: Optional[dict] = None,
) -> bool:
"""PUT /v1.0/card/instances — non-streaming card content update."""
if not await self.check_access_token():
await self.get_access_token()
body: dict = {
'outTrackId': out_track_id,
'cardData': {'cardParamMap': card_param_map or {}},
}
if private_data:
body['privateData'] = private_data
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances'
headers = {
'x-acs-dingtalk-access-token': self.access_token,
'Content-Type': 'application/json',
}
try:
_stdout_logger.info(
'DingTalk update_card_data request: out_track_id=%s body=%s',
out_track_id,
json.dumps(body, ensure_ascii=False)[:500],
)
async with httpx.AsyncClient() as client:
response = await client.put(url, headers=headers, json=body, timeout=30.0)
_stdout_logger.info(
'DingTalk update_card_data response: status=%d body=%s',
response.status_code,
response.text[:300],
)
if response.status_code == 200:
return True
if self.logger:
await self.logger.error(
f'DingTalk update card failed: status={response.status_code} body={response.text}'
)
return False
except Exception:
_stdout_logger.exception('DingTalk update_card_data error')
if self.logger:
await self.logger.error(f'DingTalk update card error: {traceback.format_exc()}')
return False
async def delete_card(self, *, out_track_id: str) -> bool:
"""POST /v1.0/card/instances/delete — recall a delivered card.
Used to retroactively remove the initial streaming chat card when
the workflow turns out to be paused for human input — the prompt
and buttons then live entirely on the dedicated form card.
"""
if not await self.check_access_token():
await self.get_access_token()
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances/delete'
headers = {
'x-acs-dingtalk-access-token': self.access_token,
'Content-Type': 'application/json',
}
body = {'outTrackId': out_track_id, 'userIdType': 1}
try:
_stdout_logger.info('DingTalk delete_card request: out_track_id=%s', out_track_id)
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=body, timeout=30.0)
_stdout_logger.info(
'DingTalk delete_card response: status=%d body=%s',
response.status_code,
response.text[:300],
)
return response.status_code == 200
except Exception:
_stdout_logger.exception('DingTalk delete_card error')
return False
async def start(self):
"""启动 WebSocket 连接,监听消息"""
self._stopped = False