diff --git a/src/langbot/pkg/platform/sources/lark.py b/src/langbot/pkg/platform/sources/lark.py
index 955c583b..16e7fc1d 100644
--- a/src/langbot/pkg/platform/sources/lark.py
+++ b/src/langbot/pkg/platform/sources/lark.py
@@ -1567,6 +1567,115 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
 
         return True
 
+    async def _open_new_form_card(
+        self,
+        message_id: str,
+        message_source: platform_events.MessageEvent,
+        form_data: dict,
+    ) -> str | None:
+        """Spawn a fresh card to host a re-paused human-input prompt.
+
+        Creates a new card_id (rebinding ``self.card_id_dict[message_id]``),
+        replies it to the current incoming message so it appears as the next
+        step in the chat, registers the new reply_msg_id so subsequent button
+        callbacks resolve back to it, and renders the prompt + buttons on it.
+
+        Returns the new card_id, or ``None`` if creation failed (caller is
+        responsible for falling back to in-place update so the workflow
+        remains continuable). On failure the previous
+        ``self.card_id_dict[message_id]`` binding is restored so the fallback
+        keeps targeting the card that is already on screen.
+        """
+        source_message_id = getattr(message_source.message_chain, 'message_id', None)
+        if not source_message_id:
+            await self.logger.error('Cannot open new form card: source message_id missing')
+            return None
+
+        # Remember the binding that create_card_id() is about to replace: a card
+        # that never reaches the chat must not stay registered for this message.
+        previous_card_id = self.card_id_dict.get(message_id)
+
+        def restore_previous_binding() -> None:
+            if previous_card_id is None:
+                self.card_id_dict.pop(message_id, None)
+            else:
+                self.card_id_dict[message_id] = previous_card_id
+
+        try:
+            new_card_id = await self.create_card_id(message_id)
+        except Exception:
+            await self.logger.error(f'Failed to create new form card: {traceback.format_exc()}')
+            restore_previous_binding()
+            return None
+
+        tenant_key = (
+            message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
+        )
+        app_access_token = self.get_app_access_token()
+        tenant_access_token = self.get_tenant_access_token(tenant_key)
+        req_opt: RequestOption = (
+            RequestOption.builder()
+            .app_ticket(self.app_ticket)
+            .tenant_key(tenant_key)
+            .app_access_token(app_access_token)
+            .tenant_access_token(tenant_access_token)
+            .build()
+        )
+
+        content = {
+            'type': 'card',
+            'data': {'card_id': new_card_id, 'template_variable': {'content': ''}},
+        }
+        request: ReplyMessageRequest = (
+            ReplyMessageRequest.builder()
+            .message_id(str(source_message_id))
+            .request_body(
+                ReplyMessageRequestBody.builder()
+                .content(json.dumps(content))
+                .msg_type('interactive')
+                .uuid(str(uuid.uuid4()))
+                .build()
+            )
+            .build()
+        )
+
+        try:
+            response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(request, req_opt)
+        except Exception:
+            await self.logger.error(f'Failed to send new form card: {traceback.format_exc()}')
+            restore_previous_binding()
+            return None
+
+        if not response.success():
+            await self.logger.error(
+                f'Failed to send new form card: code={response.code}, msg={response.msg}, '
+                f'log_id={response.get_log_id()}'
+            )
+            restore_previous_binding()
+            return None
+
+        reply_msg_id = getattr(response.data, 'message_id', None)
+        if reply_msg_id:
+            self._register_card_for_source(new_card_id, str(source_message_id), str(reply_msg_id))
+        else:
+            # The card is visible, but without its message id button callbacks
+            # cannot be mapped back to it; surface that instead of staying silent.
+            await self.logger.error(
+                'New form card was sent but the reply carries no message_id; '
+                'its button callbacks cannot be mapped back'
+            )
+
+        sequence = self._next_card_sequence(new_card_id, 1)
+        await self._update_card_layout(
+            card_id=new_card_id,
+            message_source=message_source,
+            text_message='',
+            sequence=sequence,
+            form_data=form_data,
+            show_form_prompt=True,
+        )
+        return new_card_id
+
     async def reply_message(
         self,
         message_source: platform_events.MessageEvent,
@@ -1702,8 +1811,15 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
         form_data = getattr(bot_message, '_form_data', None)
         resume_from = getattr(bot_message, '_resume_from_form', False)
         action_title = getattr(bot_message, '_resume_action_title', '')
+        resume_node_title = getattr(bot_message, '_resume_node_title', '')
         open_new_card = getattr(bot_message, '_open_new_card', False)
-        selected_notice = f'> **已选择**:{action_title}' if action_title else ''
+        if action_title:
+            if resume_node_title:
+                selected_notice = f'**{resume_node_title}**\n已选择:{action_title}'
+            else:
+                selected_notice = f'**已选择**:{action_title}'
+        else:
+            selected_notice = ''
 
         # ── decide whether this chunk needs a card update ────────────────────
         card_id = self.card_id_dict.get(message_id)
@@ -1809,25 +1925,48 @@ class 
LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): pre_pause = self.card_pre_pause_text.get(card_id, text_message) resume_cached = self.card_streaming_text.get(card_id, '') if form_data: - # Keep the current card and render the human-input action area. - # Initial pause uses the latest stream text, while re-pause after - # a button click preserves the pre-pause text and keeps the resume - # stream content in the extra placeholder area. - final_text = pre_pause if open_new_card else text_message - final_resume_placeholder = resume_cached if open_new_card else '' - await self._update_card_layout( - card_id=card_id, - message_source=message_source, - text_message=final_text, - sequence=final_seq, - form_data=form_data, - resume_placeholder_text=final_resume_placeholder, - show_form_prompt=not open_new_card, - ) if open_new_card: - self.card_streaming_text.pop(card_id, None) - self.card_pre_pause_text.pop(card_id, None) + # The old card has already been laid out into resume mode + # by the resume-transition block above (notice + resume + # placeholder). Finalise it as a frozen step snapshot and + # spawn a brand-new card to host the next human-input + # prompt — each step stays visible as its own card in the + # chat history. + new_card_id = await self._open_new_form_card(message_id, message_source, form_data) + if new_card_id is None: + # Fallback: keep the existing in-place behaviour so the + # workflow remains continuable even if creating the + # new card failed. 
+ await self._update_card_layout( + card_id=card_id, + message_source=message_source, + text_message=pre_pause, + sequence=final_seq, + form_data=form_data, + resume_placeholder_text=resume_cached, + show_form_prompt=True, + ) + self.card_streaming_text.pop(card_id, None) + self.card_pre_pause_text.pop(card_id, None) + else: + # The old card is now a frozen snapshot; let go of its + # streaming-side state but keep its source registrations + # intact (no _drop_card_state) so historical button + # callbacks aimed at it can still be matched if needed. + self.card_streaming_text.pop(card_id, None) + self.card_pre_pause_text.pop(card_id, None) + self.card_resume_transitioned.discard(card_id) else: + # Initial pause path: render prompt + buttons in place on + # the current card. + await self._update_card_layout( + card_id=card_id, + message_source=message_source, + text_message=text_message, + sequence=final_seq, + form_data=form_data, + show_form_prompt=True, + ) # The human-input prompt itself is rendered as buttons only # on Lark, so do not keep the hidden fallback text around; # otherwise it will resurface after the button click. 
diff --git a/src/langbot/pkg/platform/sources/telegram.py b/src/langbot/pkg/platform/sources/telegram.py index 3c6c9447..d322db68 100644 --- a/src/langbot/pkg/platform/sources/telegram.py +++ b/src/langbot/pkg/platform/sources/telegram.py @@ -235,7 +235,8 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): if data.get('form_action') or data.get('f'): import langbot_plugin.api.entities.builtin.provider.session as provider_session - workflow_run_id = data.get('workflow_run_id') or data.get('w', '') + workflow_run_id = data.get('workflow_run_id', '') + w_suffix = data.get('w', '') action_id = data.get('action_id') or data.get('a', '') session_key = data.get('session_key') or data.get('s', '') @@ -267,6 +268,7 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): form_action_data = { 'workflow_run_id': workflow_run_id, + 'w_suffix': w_suffix, 'action_id': action_id, 'user': f'{launcher_type.value}_{launcher_id}', 'inputs': {}, @@ -496,6 +498,11 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): """Send inline keyboard buttons for Dify human_input_required form actions.""" actions = form_data.get('actions', []) node_title = form_data.get('node_title', '') + workflow_run_id = form_data.get('workflow_run_id', '') + # Telegram callback_data is capped at 64 bytes, so we identify the + # paused workflow by the last 8 chars of workflow_run_id (unique + # within a session with overwhelming probability). 
+ w_suffix = workflow_run_id[-8:] if workflow_run_id else '' if isinstance(message_source, platform_events.GroupMessage): session_key = f'g:{message_source.group.id}' @@ -506,10 +513,10 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): for action in actions: action_id = action.get('id', '') action_title = action.get('title', action_id) - callback_data = json.dumps( - {'f': 1, 'a': action_id, 's': session_key}, - separators=(',', ':'), - ) + callback_payload = {'f': 1, 'a': action_id, 's': session_key} + if w_suffix: + callback_payload['w'] = w_suffix + callback_data = json.dumps(callback_payload, separators=(',', ':')) keyboard.append([InlineKeyboardButton(action_title, callback_data=callback_data)]) reply_markup = InlineKeyboardMarkup(keyboard) diff --git a/src/langbot/pkg/provider/runners/difysvapi.py b/src/langbot/pkg/provider/runners/difysvapi.py index b871a324..14ffddec 100644 --- a/src/langbot/pkg/provider/runners/difysvapi.py +++ b/src/langbot/pkg/provider/runners/difysvapi.py @@ -6,6 +6,7 @@ import time import uuid import base64 import mimetypes +from collections import OrderedDict from langbot.pkg.provider import runner @@ -18,9 +19,10 @@ import httpx # Module-level store for paused-workflow form state, keyed by session key -# (launcher_type_value + "_" + launcher_id). Keeps state out of the SDK -# Conversation type, which may not accept arbitrary attribute assignment. -_PENDING_FORMS: dict[str, dict[str, typing.Any]] = {} +# (launcher_type_value + "_" + launcher_id). Each session holds an +# insertion-ordered dict of form_token -> form_data, allowing multiple +# Dify workflows to be paused simultaneously for the same session. 
+_PENDING_FORMS: dict[str, 'OrderedDict[str, dict[str, typing.Any]]'] = {}
 _PENDING_FORM_DEFAULT_TTL = 30 * 60  # 30 minutes safety cap
 
 
@@ -31,9 +33,13 @@ def _session_key_from_query(query: pipeline_query.Query) -> str:
 def _prune_pending_forms(now: float | None = None) -> None:
     if now is None:
         now = time.time()
-    expired = [key for key, data in _PENDING_FORMS.items() if data.get('_expires_at', 0) <= now]
-    for key in expired:
-        _PENDING_FORMS.pop(key, None)
+    for session_key in list(_PENDING_FORMS.keys()):
+        forms = _PENDING_FORMS[session_key]
+        expired_tokens = [token for token, data in forms.items() if data.get('_expires_at', 0) <= now]
+        for token in expired_tokens:
+            forms.pop(token, None)
+        if not forms:
+            _PENDING_FORMS.pop(session_key, None)
 
 
 def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> None:
@@ -45,16 +51,67 @@ def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> Non
     except (TypeError, ValueError):
         expiration_ts = 0.0
     stored['_expires_at'] = expiration_ts or (time.time() + _PENDING_FORM_DEFAULT_TTL)
-    _PENDING_FORMS[session_key] = stored
+    form_token = str(stored.get('form_token') or '')
+    forms = _PENDING_FORMS.setdefault(session_key, OrderedDict())
+    # Re-insert at the end so this becomes the "latest" entry
+    forms.pop(form_token, None)
+    forms[form_token] = stored
 
 
-def _get_pending_form(session_key: str) -> dict[str, typing.Any] | None:
+def _get_pending_form_by_token(session_key: str, form_token: str) -> dict[str, typing.Any] | None:
     _prune_pending_forms()
-    return _PENDING_FORMS.get(session_key)
+    forms = _PENDING_FORMS.get(session_key)
+    if not forms or not form_token:
+        return None
+    return forms.get(form_token)
 
 
-def _clear_pending_form(session_key: str) -> None:
-    _PENDING_FORMS.pop(session_key, None)
+def _get_pending_form_by_w_suffix(session_key: str, w_suffix: str) -> dict[str, typing.Any] | None:
+    """Look up a pending form whose workflow_run_id ends with the given suffix.
+
+    Used by adapters (e.g. Telegram) whose callback payload is too small to
+    carry the full form_token / workflow_run_id.
+    """
+    _prune_pending_forms()
+    forms = _PENDING_FORMS.get(session_key)
+    if not forms or not w_suffix:
+        return None
+    for token in reversed(forms):
+        form = forms[token]
+        if str(form.get('workflow_run_id', '')).endswith(w_suffix):
+            return form
+    return None
+
+
+def _get_latest_pending_form(session_key: str) -> dict[str, typing.Any] | None:
+    _prune_pending_forms()
+    forms = _PENDING_FORMS.get(session_key)
+    if not forms:
+        return None
+    return forms[next(reversed(forms))]
+
+
+def _iter_pending_forms(session_key: str) -> typing.Iterator[dict[str, typing.Any]]:
+    """Iterate pending forms for a session, newest-first."""
+    _prune_pending_forms()
+    forms = _PENDING_FORMS.get(session_key)
+    if not forms:
+        return
+    for token in reversed(list(forms.keys())):
+        yield forms[token]
+
+
+def _clear_pending_form(session_key: str, form_token: str | None = None) -> None:
+    """Clear one specific pending form (by token) or all forms for the session."""
+    forms = _PENDING_FORMS.get(session_key)
+    if not forms:
+        return
+    if form_token is None:
+        _PENDING_FORMS.pop(session_key, None)
+        return
+    forms.pop(form_token, None)
+    if not forms:
+        _PENDING_FORMS.pop(session_key, None)
 
 
 @runner.runner_class('dify-service-api')
@@ -406,17 +463,49 @@ class DifyServiceAPIRunner(runner.RequestRunner):
             content=content,
         )
 
-    def _merge_pending_form_action(self, form_action: dict | None, pending_form: dict | None) -> dict | None:
-        """Backfill resume fields from the pending form stored on the conversation."""
+    def _resolve_pending_form(self, session_key: str, form_action: dict) -> dict | None:
+        """Locate the pending form this action targets.
+
+        Tries identifiers in order of specificity: form_token, full workflow_run_id,
+        then workflow_run_id suffix (Telegram-style compact id). The newest pending
+        form is used only when the action carries no identifier at all; an identifier
+        that matches nothing means the form was already resolved or has expired, so
+        ``None`` is returned instead of redirecting the click to another workflow.
+        """
+        form_token = form_action.get('form_token')
+        workflow_run_id = form_action.get('workflow_run_id')
+        w_suffix = form_action.get('w_suffix')
+
+        form = _get_pending_form_by_token(session_key, form_token) if form_token else None
+        if not form and workflow_run_id:
+            candidates = _iter_pending_forms(session_key)
+            form = next((f for f in candidates if f.get('workflow_run_id') == workflow_run_id), None)
+        if not form and w_suffix:
+            form = _get_pending_form_by_w_suffix(session_key, w_suffix)
+        if form:
+            return form
+
+        if form_token or workflow_run_id or w_suffix:
+            return None
+        # Legacy callbacks carry no identifier: assume the newest pending form.
+        return _get_latest_pending_form(session_key)
+
+    def _merge_pending_form_action(self, session_key: str, form_action: dict | None) -> dict | None:
+        """Backfill resume fields from the matching pending form."""
         if not form_action:
             return None
 
         merged_action = dict(form_action)
+        merged_action.pop('w_suffix', None)
+        pending_form = self._resolve_pending_form(session_key, form_action)
         if pending_form:
-            merged_action.setdefault('form_token', pending_form.get('form_token', ''))
-            merged_action.setdefault('workflow_run_id', pending_form.get('workflow_run_id', ''))
+            merged_action['form_token'] = merged_action.get('form_token') or pending_form.get('form_token', '')
+            merged_action['workflow_run_id'] = merged_action.get('workflow_run_id') or pending_form.get(
+                'workflow_run_id', ''
+            )
             merged_action.setdefault('inputs', pending_form.get('inputs', {}))
             merged_action.setdefault('user', pending_form.get('user', ''))
+            merged_action.setdefault('node_title', pending_form.get('node_title', ''))
 
             # Resolve clicked action's display title from the stored actions list
             if 'action_title' not in merged_action:
@@ -428,29 +517,33 @@ class DifyServiceAPIRunner(runner.RequestRunner):
 
         return merged_action
 
-    def _match_pending_form_action(self, user_text: str, pending_form: dict | None) -> dict | None:
-        """Match plain text replies against pending Dify form actions."""
-        if not pending_form:
-            return None
+    def _match_pending_form_action(self, session_key: str, user_text: str) -> dict | None:
+        """Match plain text replies against pending Dify form actions.
+
+        Pending forms are scanned newest-first and the first action whose title/id
+        equals the text wins, so a label shared by several forms resolves to the newest.
+        """
 
         normalized_text = user_text.strip().lower()
         if not normalized_text:
             return None
 
-        for action in pending_form.get('actions', []):
-            titles = {
-                str(action.get('title', '')).strip().lower(),
-                str(action.get('id', '')).strip().lower(),
-            }
-            if normalized_text in titles:
-                return {
-                    'form_token': pending_form.get('form_token', ''),
-                    'workflow_run_id': pending_form.get('workflow_run_id', ''),
-                    'action_id': action.get('id', ''),
-                    'action_title': action.get('title', action.get('id', '')),
-                    'inputs': pending_form.get('inputs', {}),
-                    'user': pending_form.get('user', ''),
+        for pending_form in _iter_pending_forms(session_key):
+            for action in pending_form.get('actions', []):
+                titles = {
+                    str(action.get('title', '')).strip().lower(),
+                    str(action.get('id', '')).strip().lower(),
                 }
+                if normalized_text in titles:
+                    return {
+                        'form_token': pending_form.get('form_token', ''),
+                        'workflow_run_id': pending_form.get('workflow_run_id', ''),
+                        'action_id': action.get('id', ''),
+                        'action_title': action.get('title', action.get('id', '')),
+                        'node_title': pending_form.get('node_title', ''),
+                        'inputs': pending_form.get('inputs', {}),
+                        'user': pending_form.get('user', ''),
+                    }
 
         return None
 
@@ -460,19 +553,16 @@ class DifyServiceAPIRunner(runner.RequestRunner):
         """调用工作流"""
 
         # Check if this is a form action resume (button click or text match)
-        form_action = query.variables.get('_dify_form_action')
+        form_action_raw = query.variables.get('_dify_form_action')
         session_key = _session_key_from_query(query)
-        pending_form = _get_pending_form(session_key)
-
-        if form_action:
-            form_action = self._merge_pending_form_action(form_action, pending_form)
-            _clear_pending_form(session_key)
-        elif pending_form:
-            form_action = 
self._match_pending_form_action(str(query.message_chain), pending_form) - if form_action: - _clear_pending_form(session_key) + + if form_action_raw: + form_action = self._merge_pending_form_action(session_key, form_action_raw) + else: + form_action = self._match_pending_form_action(session_key, str(query.message_chain)) if form_action: + _clear_pending_form(session_key, form_action.get('form_token') or None) async for msg in self._submit_workflow_form_blocking(form_action): yield msg return @@ -826,6 +916,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): user = form_action['user'] action_id = form_action.get('action_id', '') action_title = form_action.get('action_title', '') or action_id + node_title = form_action.get('node_title', '') inputs = form_action.get('inputs', {}) messsage_idx = 0 @@ -862,7 +953,10 @@ class DifyServiceAPIRunner(runner.RequestRunner): continue form_content = reason.get('form_content', '') actions = reason.get('actions', []) - node_title = reason.get('node_title', '') + # Use a distinct name — `node_title` (the just-resolved step) + # must keep its value so the resume notice on the previous + # card still shows which step the user acted on. 
+ paused_node_title = reason.get('node_title', '') raw_inputs = reason.get('inputs', {}) _set_pending_form( @@ -872,7 +966,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): 'form_id': reason.get('form_id'), 'form_token': reason.get('form_token'), 'node_id': reason.get('node_id'), - 'node_title': node_title, + 'node_title': paused_node_title, 'form_content': form_content, 'inputs': raw_inputs if isinstance(raw_inputs, dict) else {}, 'actions': actions, @@ -884,10 +978,17 @@ class DifyServiceAPIRunner(runner.RequestRunner): repause_form_data = { 'form_content': form_content, 'actions': actions, - 'node_title': node_title, + 'node_title': paused_node_title, 'workflow_run_id': new_run_id, 'form_token': reason.get('form_token', ''), } + # Ensure the final chunk has non-empty content so + # ResponseWrapper (which skips empty-content chunks) lets it + # propagate to SendResponseBackStage. Use a zero-width space + # so neither Lark nor Telegram renders visible noise — the + # adapter substitutes its own card text from _form_data. 
+ if not workflow_contents: + workflow_contents = '​' is_final = True yield_this_iteration = True break @@ -922,6 +1023,8 @@ class DifyServiceAPIRunner(runner.RequestRunner): msg._resume_from_form = True if action_title: msg._resume_action_title = action_title + if node_title: + msg._resume_node_title = node_title if is_final and repause_form_data: msg._form_data = repause_form_data msg._open_new_card = True @@ -935,19 +1038,16 @@ class DifyServiceAPIRunner(runner.RequestRunner): """调用工作流""" # Check if this is a form action resume (button click or text match) - form_action = query.variables.get('_dify_form_action') + form_action_raw = query.variables.get('_dify_form_action') session_key = _session_key_from_query(query) - pending_form = _get_pending_form(session_key) - - if form_action: - form_action = self._merge_pending_form_action(form_action, pending_form) - _clear_pending_form(session_key) - elif pending_form: - form_action = self._match_pending_form_action(str(query.message_chain), pending_form) - if form_action: - _clear_pending_form(session_key) + + if form_action_raw: + form_action = self._merge_pending_form_action(session_key, form_action_raw) + else: + form_action = self._match_pending_form_action(session_key, str(query.message_chain)) if form_action: + _clear_pending_form(session_key, form_action.get('form_token') or None) # Resume paused workflow via submit endpoint async for msg in self._submit_workflow_form(form_action): yield msg