feat(dingtalk): enhance human input card functionality with streaming support and active turn management

- Updated the DingTalk card template to enable streaming mode and multi-update configuration.
- Removed the obsolete delete_card method from DingTalkClient to streamline card management.
- Enhanced DingTalkAdapter to manage active turn cards and accumulated streaming text, ensuring a seamless user experience during human input prompts.
- Modified the create_message_card method to utilize existing active cards for resumed workflows, preventing duplication.
- Improved the _paint_form_on_card method to update existing cards with human input prompts and buttons dynamically.
- Updated the dingtalk_human_input_card.json template to reflect the new streaming capabilities and configuration options.
This commit is contained in:
fdc310
2026-06-15 17:45:09 +08:00
parent 83b0d26e99
commit a32c4d152f
4 changed files with 197 additions and 94 deletions

View File

@@ -35,7 +35,7 @@ def markdown_block(node_id, variable='content'):
'valueType': 'fixed',
'condition': {'op': 'and', 'conditions': []},
},
'isStreaming': False,
'isStreaming': True,
'enableLinkStatPoint': False,
'linkStatPoint': {'type': 'dynamicString', 'content': '', 'i18n': False},
'linkStatPointParams': [],
@@ -552,6 +552,7 @@ def build_editor_data():
return {
'schemaVersion': '3.0.0',
'schema': {
'config': {'update_multi': True, 'streaming_mode': True},
'componentsMap': components_map,
'componentsTree': [root],
'i18n': {},

View File

@@ -760,36 +760,6 @@ class DingTalkClient:
await self.logger.error(f'DingTalk update card error: {traceback.format_exc()}')
return False
async def delete_card(self, *, out_track_id: str) -> bool:
    """Recall a delivered card via ``POST /v1.0/card/instances/delete``.

    Intended for retroactively removing the initial streaming chat card
    once a workflow turns out to be paused for human input, so that the
    prompt and buttons live only on the dedicated form card.

    Args:
        out_track_id: Tracking id of the card instance to recall.

    Returns:
        ``True`` when the endpoint answers with HTTP 200; ``False`` for any
        other status code or when the request raises.
    """
    # Refresh the token first so the header below carries the current value.
    token_is_valid = await self.check_access_token()
    if not token_is_valid:
        await self.get_access_token()

    endpoint = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances/delete'
    payload = {'outTrackId': out_track_id, 'userIdType': 1}
    request_headers = {
        'x-acs-dingtalk-access-token': self.access_token,
        'Content-Type': 'application/json',
    }

    try:
        _stdout_logger.info('DingTalk delete_card request: out_track_id=%s', out_track_id)
        async with httpx.AsyncClient() as http:
            reply = await http.post(endpoint, headers=request_headers, json=payload, timeout=30.0)
            # Only the first 300 characters of the body are logged.
            _stdout_logger.info(
                'DingTalk delete_card response: status=%d body=%s',
                reply.status_code,
                reply.text[:300],
            )
    except Exception:
        # Best-effort call: log the traceback and report failure to the caller.
        _stdout_logger.exception('DingTalk delete_card error')
        return False
    return reply.status_code == 200
async def start(self):
"""启动 WebSocket 连接,监听消息"""
self._stopped = False

View File

@@ -181,6 +181,15 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
# user_id_hint, current_text}. Lookup keys for the data-source pull endpoint and
# the STREAM card-action callback.
card_state: dict
# session_key → out_track_id of the currently-active card for the
# conversation turn. Lets resumed-workflow chunks (which arrive on a
# synthetic event with a fresh resp_message_id) keep updating the same
# card the user clicked instead of getting a new one.
active_turn_card: dict
# session_key → accumulated streaming text for the active turn. Read
# by _paint_form_on_card so the post-pause form keeps the streamed
# context above the new prompt.
active_turn_text: dict
ap: typing.Any = None
bot_uuid: str = ''
@@ -208,6 +217,8 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
logger=logger,
card_instance_id_dict={},
card_state={},
active_turn_card={},
active_turn_text={},
bot_account_id=bot_account_id,
bot=bot,
listeners={},
@@ -281,11 +292,14 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
card_instance, card_instance_id = chat_card_entry
if content:
if form_template_id:
# The form template's MarkdownBlock has `isStreaming: false`
# — the streaming endpoint (PUT /v1.0/card/streaming) does
# not propagate to non-streaming components. Use the full
# update_card_data PUT instead so the content actually
# appears in the card body.
# The card content has already been written via
# update_card_data (in _paint_form_on_card and the
# initial card creation). The streaming endpoint
# (PUT /v1.0/card/streaming) does not propagate
# updates on cards whose content was last set via
# update_card_data — they take different code paths
# on the DingTalk client. Stick with update_card_data
# for the whole turn for consistency.
try:
await self.bot.update_card_data(
out_track_id=card_instance_id,
@@ -329,30 +343,54 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return is_stream
async def create_message_card(self, message_id, event):
# When a form template is configured, every card in the conversation
# uses it (chat output, form prompts, post-click states). The chat
# template fallback only kicks in if no form template is configured.
form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
legacy_template_id = self.config.get('card_template_id', '')
# Synthetic events (e.g. card button clicks) have no inbound chatbot
# message — skip card creation. The lazy-create path in
# reply_message_chunk will spawn a fresh card when the first
# non-empty resume chunk arrives.
# Synthetic events (button clicks): look up the card already in
# active_turn_card so reply_message_chunk can stream to it.
if event is None or event.source_platform_object is None:
if form_template_id:
session_key = self._session_key_from_event(event) if event is not None else ''
carry = self.active_turn_card.get(session_key, '') if session_key else ''
if carry:
self.card_instance_id_dict[message_id] = (None, carry)
return True
return False
if form_template_id:
# Defer card creation to the first non-empty chunk. If the Dify
# workflow pauses immediately for human input without producing
# any LLM text first, no chat card is created at all — only the
# form card gets delivered. Lazy-create lives in
# reply_message_chunk → _lazy_create_resume_chat_card.
return False
# Create one card with the form template, empty buttons,
# pending state. Streaming writes content to it; form pause
# paints buttons onto it. One card per turn, no duplication.
incoming_message = event.source_platform_object.incoming_message
out_track_id = uuid.uuid4().hex
is_group = str(incoming_message.conversation_type) == '2'
if is_group:
open_space_id = f'dtv1.card//IM_GROUP.{incoming_message.conversation_id}'
else:
open_space_id = f'dtv1.card//IM_ROBOT.{incoming_message.sender_staff_id}'
try:
await self.bot.create_and_deliver_card(
card_template_id=form_template_id,
out_track_id=out_track_id,
open_space_id=open_space_id,
is_group=is_group,
card_param_map={'content': '', 'btns': '[]', 'flowStatus': '1'},
callback_type='STREAM',
)
except Exception:
if self.ap is not None:
self.ap.logger.exception('DingTalk: create form-template card failed')
return False
self.card_instance_id_dict[message_id] = (None, out_track_id)
session_key = self._session_key_from_event(event)
if session_key:
self.active_turn_card[session_key] = out_track_id
self.active_turn_text[session_key] = ''
return True
# Legacy chat-card path (no form template configured).
# Legacy chat-card path (no form template).
incoming_message = event.source_platform_object.incoming_message
card_auto_layout = self.config.get('card_ auto_layout', False)
card_auto_layout = self.config.get('card_auto_layout', False)
card_instance, card_instance_id = await self.bot.create_and_card(
legacy_template_id, incoming_message, card_auto_layout=card_auto_layout
)
@@ -460,12 +498,12 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
message: platform_message.MessageChain,
form_data: dict,
) -> None:
"""Finalize the current chat card and deliver a new form card.
"""Surface human-input prompt + buttons on the active card.
Multi-card flow: every Dify pause spawns its own card. The card the
chat was streaming into (if any) is closed out via streaming_update
with finished=True so its spinner stops; a fresh card is then
delivered carrying the prompt + buttons.
In single-card mode (form_template_id configured): update the
EXISTING card with form buttons so it transitions from streaming
output to prompt+buttons on the same card. In legacy mode:
finalize the chat card and deliver a separate form card.
"""
if self.ap is not None:
self.ap.logger.info(
@@ -475,9 +513,25 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
message_id = bot_message.resp_message_id
template_id = (self.config.get('human_input_card_template_id') or '').strip()
# Finalize the previous chat card so its spinner stops. Use the
# already-streamed text as the final content (or zero-width space
# when nothing streamed, to satisfy any non-empty-content guards).
if template_id:
# Single-card mode: paint prompt + buttons onto the existing card.
session_key = self._session_key_from_event(message_source)
entry = self.card_instance_id_dict.get(message_id)
out_track_id = entry[1] if entry else None
if not out_track_id and session_key:
out_track_id = self.active_turn_card.get(session_key, '')
if out_track_id:
await self._paint_form_on_card(message_source, out_track_id, form_data, session_key)
self.card_instance_id_dict.pop(message_id, None)
return
# No existing card (e.g. Dify paused immediately with no LLM
# output before the pause). Create a form card directly.
await self._send_form_card(message_source, form_data, template_id)
self.card_instance_id_dict.pop(message_id, None)
return
# Legacy mode: finalize the streaming card with text fallback.
chat_card_entry = self.card_instance_id_dict.pop(message_id, None)
if chat_card_entry is not None:
_, chat_out_track_id = chat_card_entry
@@ -489,24 +543,93 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
await self.bot.send_card_message(None, chat_out_track_id, text_content or '', True)
except Exception:
await self.logger.error(f'DingTalk: finalize chat card before form failed: {traceback.format_exc()}')
# When the chat card uses the form template, also flip flowStatus
# to 3 so it leaves the pending state visibly.
if template_id:
try:
await self.bot.update_card_data(
out_track_id=chat_out_track_id,
card_param_map={'flowStatus': '3'},
)
except Exception:
pass
if not template_id:
# No form template configured — fall back to plain text so users
# can still reply with the option number or title.
await self.send_message_text_form(message_source, form_data)
async def _paint_form_on_card(
    self,
    message_source: platform_events.MessageEvent,
    out_track_id: str,
    form_data: dict,
    session_key: str,
) -> None:
    """Update an existing card's content and buttons for a human-input prompt.

    The card identified by ``out_track_id`` is rewritten in place: any text
    accumulated for the turn stays on top, followed by a separator, the node
    title and the form content, and the buttons are replaced by one per
    action.

    Args:
        message_source: Event the prompt belongs to; used to derive the
            session descriptor and as the target of the text fallback.
        out_track_id: Tracking id of the card to repaint.
        form_data: Form description. Keys read here: ``actions``,
            ``node_title``, ``form_content``, ``form_token`` and
            ``workflow_run_id``.
        session_key: Key into ``active_turn_text``; may be empty, in which
            case no prior text is shown and none is stored.

    Returns:
        None. If the card update raises, the prompt is sent through
        ``send_message_text_form`` instead and ``active_turn_text`` is left
        untouched.
    """
    actions = list(form_data.get('actions') or [])
    node_title = form_data.get('node_title', '') or 'Human Input Required'
    form_content = form_data.get('form_content', '') or ''

    # Record form state for the click-handler, keyed by the card's id.
    launcher_type, launcher_id, sender_user_id = self._derive_session_descriptor(message_source)
    self.card_state[out_track_id] = {
        'session_key': session_key,
        'launcher_type': launcher_type.value,
        'launcher_id': launcher_id,
        'sender_user_id': sender_user_id,
        'form_token': form_data.get('form_token', ''),
        'workflow_run_id': form_data.get('workflow_run_id', ''),
        'actions': actions,
        'node_title': node_title,
        'form_content': form_content,
    }

    btns = self._build_btns(actions, out_track_id)

    # Keep whatever was already streamed for this turn above the prompt.
    parts: list[str] = []
    prior = self.active_turn_text.get(session_key, '') if session_key else ''
    if prior.strip():
        parts.append(prior.rstrip())
        parts.append('---')
    if node_title:
        parts.append(f'**{node_title}**')
    if form_content:
        parts.append(form_content)
    display_content = '\n\n'.join(parts) or '请选择一个操作以继续。'

    try:
        await self.bot.update_card_data(
            out_track_id=out_track_id,
            card_param_map={
                'content': display_content,
                'btns': json.dumps(btns, ensure_ascii=False),
                'flowStatus': '3',
            },
        )
    except Exception:
        if self.ap is not None:
            self.ap.logger.exception('DingTalk: paint form on card failed')
        # Painting failed: fall back to the plain-text form so the user can
        # still answer.
        await self.send_message_text_form(message_source, form_data)
        return

    # The prompt now lives on the existing card, so no separate form card is
    # delivered here. Remember what the card shows for later repaints on
    # this turn.
    if session_key:
        self.active_turn_text[session_key] = display_content
@staticmethod
def _build_btns(actions: list, out_track_id: str) -> list:
    """Translate form actions into the card's ``btns`` list.

    Args:
        actions: Action dicts. Keys read: ``id``, ``title`` and
            ``button_style``. ``None`` is treated as an empty list, and
            entries that are not dicts are skipped.
        out_track_id: Tracking id of the card the buttons are painted on;
            echoed in each button's callback params.

    Returns:
        One button dict per usable action, in input order. ``primary``
        maps to blue, ``danger`` to red and anything else to gray; an action
        with no style is blue only when it is first in ``actions``.
    """
    btns = []
    for idx, action in enumerate(actions or []):
        if not isinstance(action, dict):
            # A malformed entry must not take the whole prompt down. Its
            # index is still consumed, so later fallback titles keep their
            # original position numbers.
            continue
        action_id = str(action.get('id') or '')
        title = str(action.get('title') or action_id or f'选项 {idx + 1}')
        # Coerce before normalising: a non-string value would otherwise
        # raise, and stray whitespace would miss the mapping below.
        style = str(action.get('button_style') or '').strip().lower()
        if style == 'primary' or (style == '' and idx == 0):
            color = 'blue'
        elif style == 'danger':
            color = 'red'
        else:
            color = 'gray'
        btns.append(
            {
                'text': title,
                'color': color,
                'status': 'normal',
                'event': {
                    'type': 'sendCardRequest',
                    'params': {
                        'actionId': action_id,
                        'params': {'action_id': action_id, 'out_track_id': out_track_id},
                    },
                },
            }
        )
    return btns
async def _send_form_card(
self,
@@ -601,11 +724,9 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
) -> typing.Optional[tuple]:
"""Create a new card for resumed-workflow streaming output.
Used after a button click triggers a synthetic event — no inbound
chatbot message means no card was created upstream, so we spin one
up here when the first non-empty chunk arrives. Prefers the form
template (so empty `btns` keep the layout consistent across the
whole conversation); falls back to the legacy chat template.
Used after a button click triggers a synthetic event — the form
card stays put with the "已选择" notice, and a fresh card is
spawned here for the LLM reply to stream into.
"""
form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
legacy_template_id = (self.config.get('card_template_id') or '').strip()
@@ -614,12 +735,6 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return None
out_track_id = uuid.uuid4().hex
open_space_id, is_group = self._derive_open_space(message_source)
if self.ap is not None:
self.ap.logger.info(
f'DingTalk _lazy_create_resume_chat_card: out_track_id={out_track_id} '
f'open_space_id={open_space_id} is_group={is_group} '
f'using_form_template={bool(form_template_id)}'
)
if form_template_id:
card_param_map = {'content': '', 'btns': '[]', 'flowStatus': '1'}
card_data_config = None
@@ -644,6 +759,12 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return None
entry = (None, out_track_id)
self.card_instance_id_dict[message_id] = entry
# Register as the active card so any further chunks on this turn
# (and a subsequent re-pause) land on the same new card.
session_key = self._session_key_from_event(message_source)
if session_key:
self.active_turn_card[session_key] = out_track_id
self.active_turn_text[session_key] = ''
return entry
async def send_message_text_form(
@@ -813,8 +934,11 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
self.ap.logger.exception('DingTalk: enqueue form action query failed')
return
# Visual feedback: collapse the form card to a "已选择" notice so
# the user knows the click registered while the workflow resumes.
# Visual feedback on the form card itself: keep the prompt visible,
# add a "已选择" line, remove the buttons. The resumed-workflow
# output lives on a separate new card (lazy-created in
# reply_message_chunk on the synthetic event), so the form card
# stays put as a record of the user's selection.
asyncio.create_task(
self._mark_card_resolved(
out_track_id,
@@ -824,6 +948,16 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
)
)
# Crucial: do NOT leave the form card's out_track_id in
# active_turn_card — otherwise create_message_card for the
# synthetic event would reuse it for the resume output, painting
# the LLM reply on top of the "已选择" notice. Clear it so the
# resume goes through the lazy-create path and spawns a fresh card.
session_key = state.get('session_key', '')
if session_key and self.active_turn_card.get(session_key) == out_track_id:
self.active_turn_card.pop(session_key, None)
self.active_turn_text.pop(session_key, None)
# Once consumed, drop the state — the runner clears _PENDING_FORMS too.
self.card_state.pop(out_track_id, None)
@@ -837,12 +971,9 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
) -> None:
"""Update the form card to acknowledge the user's selection.
We rewrite the card content with the original prompt + a green tick
marker, and explicitly clear ``btns`` so the buttons are removed
once chosen. ``flowStatus`` is re-sent because some DingTalk clients
treat the PUT update as a partial *replace* of cardParamMap rather
than a merge — without it, the AICardContainer status containers
would all gate to ``gone`` and the whole card would blank out.
Keeps the original prompt visible, adds a "已选择: X" notice, and
clears the buttons. The card stays as a permanent record of the
choice; the resumed workflow's output goes to a separate new card.
"""
parts: list[str] = []
if node_title:
@@ -863,4 +994,5 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
},
)
except Exception:
await self.logger.error(f'DingTalk: update form card after click failed: {traceback.format_exc()}')
if self.ap is not None:
self.ap.logger.exception('DingTalk: mark card resolved failed')

File diff suppressed because one or more lines are too long