feat: Enhance Lark and Telegram adapters with new form handling for paused workflows

This commit is contained in:
fdc310
2026-06-01 23:48:59 +08:00
parent 60e5b873ee
commit f663d87a60
3 changed files with 302 additions and 78 deletions

View File

@@ -1567,6 +1567,93 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return True
async def _open_new_form_card(
self,
message_id: str,
message_source: platform_events.MessageEvent,
form_data: dict,
) -> str | None:
"""Spawn a fresh card to host a re-paused human-input prompt.
Creates a new card_id (rebinding ``self.card_id_dict[message_id]``),
replies it to the current incoming message so it appears as the next
step in the chat, registers the new reply_msg_id so subsequent button
callbacks resolve back to it, and renders the prompt + buttons on it.
Returns the new card_id, or ``None`` if creation failed (caller is
responsible for falling back to in-place update so the workflow
remains continuable).
"""
source_message_id = getattr(message_source.message_chain, 'message_id', None)
if not source_message_id:
await self.logger.error('Cannot open new form card: source message_id missing')
return None
try:
new_card_id = await self.create_card_id(message_id)
except Exception:
await self.logger.error(f'Failed to create new form card: {traceback.format_exc()}')
return None
tenant_key = (
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
)
app_access_token = self.get_app_access_token()
tenant_access_token = self.get_tenant_access_token(tenant_key)
req_opt: RequestOption = (
RequestOption.builder()
.app_ticket(self.app_ticket)
.tenant_key(tenant_key)
.app_access_token(app_access_token)
.tenant_access_token(tenant_access_token)
.build()
)
content = {
'type': 'card',
'data': {'card_id': new_card_id, 'template_variable': {'content': ''}},
}
request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(str(source_message_id))
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(content))
.msg_type('interactive')
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
try:
response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(request, req_opt)
except Exception:
await self.logger.error(f'Failed to send new form card: {traceback.format_exc()}')
return None
if not response.success():
await self.logger.error(
f'Failed to send new form card: code={response.code}, msg={response.msg}, '
f'log_id={response.get_log_id()}'
)
return None
reply_msg_id = getattr(response.data, 'message_id', None)
if reply_msg_id:
self._register_card_for_source(new_card_id, str(source_message_id), str(reply_msg_id))
sequence = self._next_card_sequence(new_card_id, 1)
await self._update_card_layout(
card_id=new_card_id,
message_source=message_source,
text_message='',
sequence=sequence,
form_data=form_data,
show_form_prompt=True,
)
return new_card_id
async def reply_message(
self,
message_source: platform_events.MessageEvent,
@@ -1702,8 +1789,15 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
form_data = getattr(bot_message, '_form_data', None)
resume_from = getattr(bot_message, '_resume_from_form', False)
action_title = getattr(bot_message, '_resume_action_title', '')
resume_node_title = getattr(bot_message, '_resume_node_title', '')
open_new_card = getattr(bot_message, '_open_new_card', False)
selected_notice = f'> **已选择**{action_title}' if action_title else ''
if action_title:
if resume_node_title:
selected_notice = f'**{resume_node_title}**\n已选择:{action_title}'
else:
selected_notice = f'**已选择**{action_title}'
else:
selected_notice = ''
# ── decide whether this chunk needs a card update ────────────────────
card_id = self.card_id_dict.get(message_id)
@@ -1809,25 +1903,48 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
pre_pause = self.card_pre_pause_text.get(card_id, text_message)
resume_cached = self.card_streaming_text.get(card_id, '')
if form_data:
# Keep the current card and render the human-input action area.
# Initial pause uses the latest stream text, while re-pause after
# a button click preserves the pre-pause text and keeps the resume
# stream content in the extra placeholder area.
final_text = pre_pause if open_new_card else text_message
final_resume_placeholder = resume_cached if open_new_card else ''
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=final_text,
sequence=final_seq,
form_data=form_data,
resume_placeholder_text=final_resume_placeholder,
show_form_prompt=not open_new_card,
)
if open_new_card:
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
# The old card has already been laid out into resume mode
# by the resume-transition block above (notice + resume
# placeholder). Finalise it as a frozen step snapshot and
# spawn a brand-new card to host the next human-input
# prompt — each step stays visible as its own card in the
# chat history.
new_card_id = await self._open_new_form_card(message_id, message_source, form_data)
if new_card_id is None:
# Fallback: keep the existing in-place behaviour so the
# workflow remains continuable even if creating the
# new card failed.
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause,
sequence=final_seq,
form_data=form_data,
resume_placeholder_text=resume_cached,
show_form_prompt=True,
)
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
else:
# The old card is now a frozen snapshot; let go of its
# streaming-side state but keep its source registrations
# intact (no _drop_card_state) so historical button
# callbacks aimed at it can still be matched if needed.
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
self.card_resume_transitioned.discard(card_id)
else:
# Initial pause path: render prompt + buttons in place on
# the current card.
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=text_message,
sequence=final_seq,
form_data=form_data,
show_form_prompt=True,
)
# The human-input prompt itself is rendered as buttons only
# on Lark, so do not keep the hidden fallback text around;
# otherwise it will resurface after the button click.

View File

@@ -235,7 +235,8 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
if data.get('form_action') or data.get('f'):
import langbot_plugin.api.entities.builtin.provider.session as provider_session
workflow_run_id = data.get('workflow_run_id') or data.get('w', '')
workflow_run_id = data.get('workflow_run_id', '')
w_suffix = data.get('w', '')
action_id = data.get('action_id') or data.get('a', '')
session_key = data.get('session_key') or data.get('s', '')
@@ -267,6 +268,7 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
form_action_data = {
'workflow_run_id': workflow_run_id,
'w_suffix': w_suffix,
'action_id': action_id,
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
@@ -496,6 +498,11 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
"""Send inline keyboard buttons for Dify human_input_required form actions."""
actions = form_data.get('actions', [])
node_title = form_data.get('node_title', '')
workflow_run_id = form_data.get('workflow_run_id', '')
# Telegram callback_data is capped at 64 bytes, so we identify the
# paused workflow by the last 8 chars of workflow_run_id (unique
# within a session with overwhelming probability).
w_suffix = workflow_run_id[-8:] if workflow_run_id else ''
if isinstance(message_source, platform_events.GroupMessage):
session_key = f'g:{message_source.group.id}'
@@ -506,10 +513,10 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
for action in actions:
action_id = action.get('id', '')
action_title = action.get('title', action_id)
callback_data = json.dumps(
{'f': 1, 'a': action_id, 's': session_key},
separators=(',', ':'),
)
callback_payload = {'f': 1, 'a': action_id, 's': session_key}
if w_suffix:
callback_payload['w'] = w_suffix
callback_data = json.dumps(callback_payload, separators=(',', ':'))
keyboard.append([InlineKeyboardButton(action_title, callback_data=callback_data)])
reply_markup = InlineKeyboardMarkup(keyboard)

View File

@@ -6,6 +6,7 @@ import time
import uuid
import base64
import mimetypes
from collections import OrderedDict
from langbot.pkg.provider import runner
@@ -18,9 +19,10 @@ import httpx
# Module-level store for paused-workflow form state, keyed by session key
# (launcher_type_value + "_" + launcher_id). Keeps state out of the SDK
# Conversation type, which may not accept arbitrary attribute assignment.
_PENDING_FORMS: dict[str, dict[str, typing.Any]] = {}
# (launcher_type_value + "_" + launcher_id). Each session holds an
# insertion-ordered dict of form_token -> form_data, allowing multiple
# Dify workflows to be paused simultaneously for the same session.
_PENDING_FORMS: dict[str, 'OrderedDict[str, dict[str, typing.Any]]'] = {}
_PENDING_FORM_DEFAULT_TTL = 30 * 60 # 30 minutes safety cap
@@ -31,9 +33,13 @@ def _session_key_from_query(query: pipeline_query.Query) -> str:
def _prune_pending_forms(now: float | None = None) -> None:
if now is None:
now = time.time()
expired = [key for key, data in _PENDING_FORMS.items() if data.get('_expires_at', 0) <= now]
for key in expired:
_PENDING_FORMS.pop(key, None)
for session_key in list(_PENDING_FORMS.keys()):
forms = _PENDING_FORMS[session_key]
expired_tokens = [token for token, data in forms.items() if data.get('_expires_at', 0) <= now]
for token in expired_tokens:
forms.pop(token, None)
if not forms:
_PENDING_FORMS.pop(session_key, None)
def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> None:
@@ -45,16 +51,67 @@ def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> Non
except (TypeError, ValueError):
expiration_ts = 0.0
stored['_expires_at'] = expiration_ts or (time.time() + _PENDING_FORM_DEFAULT_TTL)
_PENDING_FORMS[session_key] = stored
form_token = str(stored.get('form_token') or '')
forms = _PENDING_FORMS.setdefault(session_key, OrderedDict())
# Re-insert at the end so this becomes the "latest" entry
forms.pop(form_token, None)
forms[form_token] = stored
def _get_pending_form(session_key: str) -> dict[str, typing.Any] | None:
def _get_pending_form_by_token(session_key: str, form_token: str) -> dict[str, typing.Any] | None:
_prune_pending_forms()
return _PENDING_FORMS.get(session_key)
forms = _PENDING_FORMS.get(session_key)
if not forms or not form_token:
return None
return forms.get(form_token)
def _clear_pending_form(session_key: str) -> None:
_PENDING_FORMS.pop(session_key, None)
def _get_pending_form_by_w_suffix(session_key: str, w_suffix: str) -> dict[str, typing.Any] | None:
"""Look up a pending form whose workflow_run_id ends with the given suffix.
Used by adapters (e.g. Telegram) whose callback payload is too small to
carry the full form_token / workflow_run_id.
"""
_prune_pending_forms()
forms = _PENDING_FORMS.get(session_key)
if not forms or not w_suffix:
return None
for token in reversed(forms):
form = forms[token]
if str(form.get('workflow_run_id', '')).endswith(w_suffix):
return form
return None
def _get_latest_pending_form(session_key: str) -> dict[str, typing.Any] | None:
_prune_pending_forms()
forms = _PENDING_FORMS.get(session_key)
if not forms:
return None
return forms[next(reversed(forms))]
def _iter_pending_forms(session_key: str) -> typing.Iterator[dict[str, typing.Any]]:
"""Iterate pending forms for a session, newest-first."""
_prune_pending_forms()
forms = _PENDING_FORMS.get(session_key)
if not forms:
return
for token in reversed(list(forms.keys())):
yield forms[token]
def _clear_pending_form(session_key: str, form_token: str | None = None) -> None:
"""Clear one specific pending form (by token) or all forms for the session."""
forms = _PENDING_FORMS.get(session_key)
if not forms:
return
if form_token is None:
_PENDING_FORMS.pop(session_key, None)
return
forms.pop(form_token, None)
if not forms:
_PENDING_FORMS.pop(session_key, None)
@runner.runner_class('dify-service-api')
@@ -406,17 +463,49 @@ class DifyServiceAPIRunner(runner.RequestRunner):
content=content,
)
def _merge_pending_form_action(self, form_action: dict | None, pending_form: dict | None) -> dict | None:
"""Backfill resume fields from the pending form stored on the conversation."""
def _resolve_pending_form(self, session_key: str, form_action: dict) -> dict | None:
"""Locate the pending form this action targets.
Tries identifiers in order of specificity: form_token, full
workflow_run_id, workflow_run_id suffix (Telegram-style compact id),
then falls back to the newest pending form for the session.
"""
form_token = form_action.get('form_token')
if form_token:
form = _get_pending_form_by_token(session_key, form_token)
if form:
return form
workflow_run_id = form_action.get('workflow_run_id')
if workflow_run_id:
for form in _iter_pending_forms(session_key):
if form.get('workflow_run_id') == workflow_run_id:
return form
w_suffix = form_action.get('w_suffix')
if w_suffix:
form = _get_pending_form_by_w_suffix(session_key, w_suffix)
if form:
return form
return _get_latest_pending_form(session_key)
def _merge_pending_form_action(self, session_key: str, form_action: dict | None) -> dict | None:
"""Backfill resume fields from the matching pending form."""
if not form_action:
return None
merged_action = dict(form_action)
merged_action.pop('w_suffix', None)
pending_form = self._resolve_pending_form(session_key, form_action)
if pending_form:
merged_action.setdefault('form_token', pending_form.get('form_token', ''))
merged_action.setdefault('workflow_run_id', pending_form.get('workflow_run_id', ''))
merged_action['form_token'] = merged_action.get('form_token') or pending_form.get('form_token', '')
merged_action['workflow_run_id'] = merged_action.get('workflow_run_id') or pending_form.get(
'workflow_run_id', ''
)
merged_action.setdefault('inputs', pending_form.get('inputs', {}))
merged_action.setdefault('user', pending_form.get('user', ''))
merged_action.setdefault('node_title', pending_form.get('node_title', ''))
# Resolve clicked action's display title from the stored actions list
if 'action_title' not in merged_action:
@@ -428,29 +517,33 @@ class DifyServiceAPIRunner(runner.RequestRunner):
return merged_action
def _match_pending_form_action(self, user_text: str, pending_form: dict | None) -> dict | None:
"""Match plain text replies against pending Dify form actions."""
if not pending_form:
return None
def _match_pending_form_action(self, session_key: str, user_text: str) -> dict | None:
"""Match plain text replies against pending Dify form actions.
Iterates all pending forms newest-first; the first action whose
title/id matches the text wins. This means when multiple forms are
pending with the same button label, the most recent one resolves.
"""
normalized_text = user_text.strip().lower()
if not normalized_text:
return None
for action in pending_form.get('actions', []):
titles = {
str(action.get('title', '')).strip().lower(),
str(action.get('id', '')).strip().lower(),
}
if normalized_text in titles:
return {
'form_token': pending_form.get('form_token', ''),
'workflow_run_id': pending_form.get('workflow_run_id', ''),
'action_id': action.get('id', ''),
'action_title': action.get('title', action.get('id', '')),
'inputs': pending_form.get('inputs', {}),
'user': pending_form.get('user', ''),
for pending_form in _iter_pending_forms(session_key):
for action in pending_form.get('actions', []):
titles = {
str(action.get('title', '')).strip().lower(),
str(action.get('id', '')).strip().lower(),
}
if normalized_text in titles:
return {
'form_token': pending_form.get('form_token', ''),
'workflow_run_id': pending_form.get('workflow_run_id', ''),
'action_id': action.get('id', ''),
'action_title': action.get('title', action.get('id', '')),
'node_title': pending_form.get('node_title', ''),
'inputs': pending_form.get('inputs', {}),
'user': pending_form.get('user', ''),
}
return None
@@ -460,19 +553,16 @@ class DifyServiceAPIRunner(runner.RequestRunner):
"""调用工作流"""
# Check if this is a form action resume (button click or text match)
form_action = query.variables.get('_dify_form_action')
form_action_raw = query.variables.get('_dify_form_action')
session_key = _session_key_from_query(query)
pending_form = _get_pending_form(session_key)
if form_action:
form_action = self._merge_pending_form_action(form_action, pending_form)
_clear_pending_form(session_key)
elif pending_form:
form_action = self._match_pending_form_action(str(query.message_chain), pending_form)
if form_action:
_clear_pending_form(session_key)
if form_action_raw:
form_action = self._merge_pending_form_action(session_key, form_action_raw)
else:
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
if form_action:
_clear_pending_form(session_key, form_action.get('form_token') or None)
async for msg in self._submit_workflow_form_blocking(form_action):
yield msg
return
@@ -826,6 +916,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
user = form_action['user']
action_id = form_action.get('action_id', '')
action_title = form_action.get('action_title', '') or action_id
node_title = form_action.get('node_title', '')
inputs = form_action.get('inputs', {})
messsage_idx = 0
@@ -862,7 +953,10 @@ class DifyServiceAPIRunner(runner.RequestRunner):
continue
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
node_title = reason.get('node_title', '')
# Use a distinct name — `node_title` (the just-resolved step)
# must keep its value so the resume notice on the previous
# card still shows which step the user acted on.
paused_node_title = reason.get('node_title', '')
raw_inputs = reason.get('inputs', {})
_set_pending_form(
@@ -872,7 +966,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': node_title,
'node_title': paused_node_title,
'form_content': form_content,
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
'actions': actions,
@@ -884,10 +978,17 @@ class DifyServiceAPIRunner(runner.RequestRunner):
repause_form_data = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
'node_title': paused_node_title,
'workflow_run_id': new_run_id,
'form_token': reason.get('form_token', ''),
}
# Ensure the final chunk has non-empty content so
# ResponseWrapper (which skips empty-content chunks) lets it
# propagate to SendResponseBackStage. Use a zero-width space
# so neither Lark nor Telegram renders visible noise — the
# adapter substitutes its own card text from _form_data.
if not workflow_contents:
workflow_contents = ''
is_final = True
yield_this_iteration = True
break
@@ -922,6 +1023,8 @@ class DifyServiceAPIRunner(runner.RequestRunner):
msg._resume_from_form = True
if action_title:
msg._resume_action_title = action_title
if node_title:
msg._resume_node_title = node_title
if is_final and repause_form_data:
msg._form_data = repause_form_data
msg._open_new_card = True
@@ -935,19 +1038,16 @@ class DifyServiceAPIRunner(runner.RequestRunner):
"""调用工作流"""
# Check if this is a form action resume (button click or text match)
form_action = query.variables.get('_dify_form_action')
form_action_raw = query.variables.get('_dify_form_action')
session_key = _session_key_from_query(query)
pending_form = _get_pending_form(session_key)
if form_action:
form_action = self._merge_pending_form_action(form_action, pending_form)
_clear_pending_form(session_key)
elif pending_form:
form_action = self._match_pending_form_action(str(query.message_chain), pending_form)
if form_action:
_clear_pending_form(session_key)
if form_action_raw:
form_action = self._merge_pending_form_action(session_key, form_action_raw)
else:
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
if form_action:
_clear_pending_form(session_key, form_action.get('form_token') or None)
# Resume paused workflow via submit endpoint
async for msg in self._submit_workflow_form(form_action):
yield msg