From b96f209b9850563a3503149d92f56d45f620bcaa Mon Sep 17 00:00:00 2001 From: fdc310 <2213070223@qq.com> Date: Thu, 28 May 2026 23:32:46 +0800 Subject: [PATCH] feat: Implement workflow form handling for paused workflows - Added module-level storage for pending forms to manage state across sessions. - Introduced functions to set, get, and clear pending forms with expiration handling. - Enhanced DifyServiceAPIRunner to support resuming paused workflows via form actions. - Implemented logic to yield human input requests and display appropriate messages. - Updated workflow submission methods to handle paused states and resume actions. - Ensured proper merging of pending form actions with user inputs for seamless interaction. --- .../libs/dify_service_api/v1/client.py | 55 ++ src/langbot/pkg/pipeline/pipelinemgr.py | 2 +- src/langbot/pkg/pipeline/pool.py | 6 +- src/langbot/pkg/pipeline/respback/respback.py | 2 +- src/langbot/pkg/platform/botmgr.py | 2 + src/langbot/pkg/platform/sources/lark.py | 716 ++++++++++++++++-- src/langbot/pkg/platform/sources/telegram.py | 145 +++- src/langbot/pkg/provider/runners/difysvapi.py | 396 +++++++++- 8 files changed, 1251 insertions(+), 73 deletions(-) diff --git a/src/langbot/libs/dify_service_api/v1/client.py b/src/langbot/libs/dify_service_api/v1/client.py index cc8008e1..a608a0db 100644 --- a/src/langbot/libs/dify_service_api/v1/client.py +++ b/src/langbot/libs/dify_service_api/v1/client.py @@ -109,6 +109,61 @@ class AsyncDifyServiceClient: if chunk.startswith('data:'): yield json.loads(chunk[5:]) + async def workflow_submit( + self, + form_token: str, + workflow_run_id: str, + inputs: dict[str, typing.Any], + user: str, + action: str = '', + timeout: float = 120.0, + ) -> typing.AsyncGenerator[dict[str, typing.Any], None]: + """Submit human input to resume a paused workflow, then stream events. + + 1. POST /form/human_input/{form_token} to submit the form + 2. 
GET /workflow/{workflow_run_id}/events to stream the resumed workflow events + """ + + headers = { + 'Authorization': f'Bearer {self.api_key}', + 'Content-Type': 'application/json', + } + + async with httpx.AsyncClient( + base_url=self.base_url, + trust_env=True, + timeout=timeout, + ) as client: + # Step 1: Submit the form + payload: dict[str, typing.Any] = { + 'inputs': inputs if isinstance(inputs, dict) else {}, + 'user': user, + 'action': action, + } + + submit_resp = await client.post( + f'/form/human_input/{form_token}', + headers=headers, + json=payload, + ) + if submit_resp.status_code != 200: + raise DifyAPIError(f'{submit_resp.status_code} {submit_resp.text}') + + # Step 2: Stream resumed workflow events + async with client.stream( + 'GET', + f'/workflow/{workflow_run_id}/events', + headers={'Authorization': f'Bearer {self.api_key}'}, + params={'user': user}, + ) as r: + async for chunk in r.aiter_lines(): + if r.status_code != 200: + raise DifyAPIError(f'{r.status_code} {chunk}') + if chunk.strip() == '': + continue + if chunk.startswith('data:'): + yield json.loads(chunk[5:]) + async def upload_file( self, file: httpx._types.FileTypes, diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py index 1426fe3d..6f917b2d 100644 --- a/src/langbot/pkg/pipeline/pipelinemgr.py +++ b/src/langbot/pkg/pipeline/pipelinemgr.py @@ -157,7 +157,7 @@ class RuntimePipeline: bot_message=query.resp_messages[-1], message=result.user_notice, quote_origin=query.pipeline_config['output']['misc']['quote-origin'], - is_final=[msg.is_final for msg in query.resp_messages][0], + is_final=[msg.is_final for msg in query.resp_messages][-1], ) else: await query.adapter.reply_message( diff --git a/src/langbot/pkg/pipeline/pool.py b/src/langbot/pkg/pipeline/pool.py index d2d4563b..55ce7fe1 100644 --- a/src/langbot/pkg/pipeline/pool.py +++ b/src/langbot/pkg/pipeline/pool.py @@ -42,9 +42,13 @@ class QueryPool: adapter: 
abstract_platform_adapter.AbstractMessagePlatformAdapter, pipeline_uuid: typing.Optional[str] = None, routed_by_rule: bool = False, + variables: typing.Optional[dict[str, typing.Any]] = None, ) -> pipeline_query.Query: async with self.condition: query_id = self.query_id_counter + initial_variables: dict[str, typing.Any] = {'_routed_by_rule': routed_by_rule} + if variables: + initial_variables.update(variables) query = pipeline_query.Query( bot_uuid=bot_uuid, query_id=query_id, @@ -53,7 +57,7 @@ class QueryPool: sender_id=sender_id, message_event=message_event, message_chain=message_chain, - variables={'_routed_by_rule': routed_by_rule}, + variables=initial_variables, resp_messages=[], resp_message_chain=[], adapter=adapter, diff --git a/src/langbot/pkg/pipeline/respback/respback.py b/src/langbot/pkg/pipeline/respback/respback.py index 574404bc..39cba04a 100644 --- a/src/langbot/pkg/pipeline/respback/respback.py +++ b/src/langbot/pkg/pipeline/respback/respback.py @@ -40,7 +40,7 @@ class SendResponseBackStage(stage.PipelineStage): has_chunks = any(isinstance(msg, provider_message.MessageChunk) for msg in query.resp_messages) # TODO 命令与流式的兼容性问题 if await query.adapter.is_stream_output_supported() and has_chunks: - is_final = [msg.is_final for msg in query.resp_messages][0] + is_final = [msg.is_final for msg in query.resp_messages][-1] await query.adapter.reply_message_chunk( message_source=query.message_event, bot_message=query.resp_messages[-1], diff --git a/src/langbot/pkg/platform/botmgr.py b/src/langbot/pkg/platform/botmgr.py index 8e99618c..6e995206 100644 --- a/src/langbot/pkg/platform/botmgr.py +++ b/src/langbot/pkg/platform/botmgr.py @@ -501,6 +501,8 @@ class PlatformManager: bot_entity.adapter_config, logger, ) + if hasattr(adapter_inst, 'ap'): + adapter_inst.ap = self.ap # 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid(用于统一 webhook) if hasattr(adapter_inst, 'set_bot_uuid'): diff --git a/src/langbot/pkg/platform/sources/lark.py 
b/src/langbot/pkg/platform/sources/lark.py index f0938f56..7ed0b8c9 100644 --- a/src/langbot/pkg/platform/sources/lark.py +++ b/src/langbot/pkg/platform/sources/lark.py @@ -31,6 +31,7 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.platform.events as platform_events import langbot_plugin.api.entities.builtin.platform.entities as platform_entities import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger +import langbot_plugin.api.entities.builtin.provider.session as provider_session class AESCipher(object): @@ -770,6 +771,7 @@ CARD_ID_CACHE_MAX_LIFETIME = 20 * 60 # 20分钟 class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): bot: lark_oapi.ws.Client = pydantic.Field(exclude=True) api_client: lark_oapi.Client = pydantic.Field(exclude=True) + ap: typing.Any = pydantic.Field(exclude=True, default=None) bot_account_id: str # 用于在流水线中识别at是否是本bot,直接以bot_name作为标识 lark_tenant_key: str = pydantic.Field(exclude=True, default='') # 飞书企业key @@ -792,6 +794,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): pending_monitoring_msg: dict[str, str] # Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks) reply_to_monitoring_msg: dict[str, tuple[str, float]] + reply_message_card_ids: dict[str, str] + card_sequence_dict: dict[str, int] + # card_id → set of source message ids registered against it (for cleanup) + card_id_to_source_ids: dict[str, set[str]] + # card_id → current streaming_txt content cache (needed for full aupdate during resume transition) + card_streaming_text: dict[str, str] + # card_id → pre-pause streaming_txt text (captured when resume first chunk arrives) + card_pre_pause_text: dict[str, str] + # set of card_ids that have already transitioned from "buttons visible" to "resume layout" + card_resume_transitioned: set[str] _MONITORING_MAPPING_TTL = 600 # 10 
minutes seq: int # 用于在发送卡片消息中识别消息顺序,直接以seq作为标识 @@ -812,11 +824,131 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1): asyncio.create_task(on_message(event)) + def schedule_on_app_loop(coro): + """Run a coroutine on the application event loop from sync callbacks.""" + return asyncio.run_coroutine_threadsafe(coro, self.ap.event_loop) + def sync_on_card_action(event): try: - action_value_obj = getattr(getattr(event.event, 'action', None), 'value', {}) + action_value_raw = getattr(getattr(event.event, 'action', None), 'value', {}) + # Parse JSON string values (from form action buttons) + if isinstance(action_value_raw, str): + try: + action_value_obj = json.loads(action_value_raw) + except (json.JSONDecodeError, TypeError): + action_value_obj = {} + else: + action_value_obj = action_value_raw if isinstance(action_value_raw, dict) else {} action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else '' + # Handle Dify form action button clicks + if isinstance(action_value_obj, dict) and action_value_obj.get('form_action'): + form_token = action_value_obj.get('form_token', '') + workflow_run_id = action_value_obj.get('workflow_run_id', '') + action_id = action_value_obj.get('action_id', '') + session_key = action_value_obj.get('session_key', '') + + if session_key.startswith('group_') or session_key.startswith('g:'): + launcher_type = provider_session.LauncherTypes.GROUP + launcher_id = ( + session_key.split(':', 1)[1] + if session_key.startswith('g:') + else session_key[len('group_') :] + ) + else: + launcher_type = provider_session.LauncherTypes.PERSON + launcher_id = ( + session_key.split(':', 1)[1] + if session_key.startswith('p:') + else session_key[len('person_') :] + ) + + # Find the bot entity to get bot_uuid and pipeline_uuid + bot_uuid = '' + pipeline_uuid = None + for bot in self.ap.platform_mgr.bots: + if bot.adapter is self: + 
bot_uuid = bot.bot_entity.uuid + pipeline_uuid = bot.bot_entity.use_pipeline_uuid + break + + form_action_data = { + 'form_token': form_token, + 'workflow_run_id': workflow_run_id, + 'action_id': action_id, + 'user': f'{launcher_type.value}_{launcher_id}', + 'inputs': {}, + } + + context = getattr(event.event, 'context', None) + open_message_id = getattr(context, 'open_message_id', None) + source_time = datetime.datetime.now() + event_time = source_time.timestamp() + action_text = action_value_obj.get('action_id', 'confirm') + message_chain = platform_message.MessageChain( + [platform_message.Plain(text=f'[Form Action: {action_text}]')] + ) + if open_message_id: + message_chain.insert( + 0, + platform_message.Source( + id=open_message_id, + time=source_time, + ), + ) + + operator = getattr(event.event, 'operator', None) + user_id = ( + getattr(operator, 'open_id', None) or getattr(operator, 'user_id', None) or str(launcher_id) + ) + + if launcher_type == provider_session.LauncherTypes.GROUP: + synthetic_event = platform_events.GroupMessage( + sender=platform_entities.GroupMember( + id=user_id, + member_name='', + permission=platform_entities.Permission.Member, + group=platform_entities.Group( + id=launcher_id, + name='', + permission=platform_entities.Permission.Member, + ), + ), + message_chain=message_chain, + time=event_time, + source_platform_object=event, + ) + else: + synthetic_event = platform_events.FriendMessage( + sender=platform_entities.Friend( + id=user_id, + nickname='', + remark='', + ), + message_chain=message_chain, + time=event_time, + source_platform_object=event, + ) + + async def add_form_action_query(): + await self.ap.query_pool.add_query( + bot_uuid=bot_uuid, + launcher_type=launcher_type, + launcher_id=launcher_id, + sender_id=user_id, + message_event=synthetic_event, + message_chain=message_chain, + adapter=self, + pipeline_uuid=pipeline_uuid, + variables={'_dify_form_action': form_action_data}, + ) + + 
schedule_on_app_loop(add_form_action_query()) + + from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse + + return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '操作成功'}}) + if action_value == '有帮助': feedback_type = 1 elif action_value == '无帮助': @@ -857,17 +989,14 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): ) if platform_events.FeedbackEvent in self.listeners: - loop = asyncio.get_event_loop() - if loop.is_running(): - asyncio.create_task(self.listeners[platform_events.FeedbackEvent](feedback_event, self)) - else: - loop.run_until_complete(self.listeners[platform_events.FeedbackEvent](feedback_event, self)) + schedule_on_app_loop(self.listeners[platform_events.FeedbackEvent](feedback_event, self)) from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}}) except Exception: - asyncio.create_task(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}')) + traceback.print_exc() + schedule_on_app_loop(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}')) from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}}) @@ -893,6 +1022,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): card_id_dict={}, pending_monitoring_msg={}, reply_to_monitoring_msg={}, + reply_message_card_ids={}, + card_sequence_dict={}, + card_id_to_source_ids={}, + card_streaming_text={}, + card_pre_pause_text={}, + card_resume_transitioned=set(), seq=1, listeners={}, quart_app=quart_app, @@ -1132,6 +1267,33 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): for k in expired: del self.reply_to_monitoring_msg[k] + def _next_card_sequence(self, card_id: 
str, suggested: int = 1) -> int: + """Return the next strictly increasing sequence for a card update.""" + current = self.card_sequence_dict.get(card_id, 0) + next_seq = max(current + 1, suggested) + self.card_sequence_dict[card_id] = next_seq + return next_seq + + def _register_card_for_source(self, card_id: str, *source_ids: str) -> None: + """Register a card_id under one or more source message ids.""" + bucket = self.card_id_to_source_ids.setdefault(card_id, set()) + for sid in source_ids: + if not sid: + continue + self.reply_message_card_ids[sid] = card_id + bucket.add(sid) + + def _drop_card_state(self, card_id: str) -> None: + """Pop all per-card state for the given card_id.""" + if not card_id: + return + for sid in self.card_id_to_source_ids.pop(card_id, set()): + self.reply_message_card_ids.pop(sid, None) + self.card_sequence_dict.pop(card_id, None) + self.card_streaming_text.pop(card_id, None) + self.card_pre_pause_text.pop(card_id, None) + self.card_resume_transitioned.discard(card_id) + async def create_card_id(self, message_id): try: # self.logger.debug('飞书支持stream输出,创建卡片......') @@ -1327,6 +1489,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): self.card_id_dict[message_id] = response.data.card_id card_id = response.data.card_id + self.card_sequence_dict[card_id] = 0 return card_id except Exception as e: @@ -1339,6 +1502,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): """ # message_id = event.message_chain.message_id + source_message_id = str(event.message_chain.message_id) + existing_card_id = self.reply_message_card_ids.get(source_message_id) + if existing_card_id: + self.card_id_dict[message_id] = existing_card_id + return True + card_id = await self.create_card_id(message_id) content = { 'type': 'card', @@ -1377,6 +1546,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): user_msg_id = event.message_chain.message_id reply_msg_id = 
getattr(response.data, 'message_id', None) monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None) + # Register the card under both the user-incoming msg id (so a + # second reply_message_first_chunk for the same user message + # reuses this card) AND the bot-reply msg id (so a synthetic + # event from a form-button callback — whose Source.id equals + # the bot's card message id — hits the same card and renders + # the resume content into it). + if reply_msg_id: + self._register_card_for_source(card_id, str(user_msg_id), str(reply_msg_id)) + else: + self._register_card_for_source(card_id, str(user_msg_id)) if reply_msg_id and monitoring_msg_id: self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time()) self._cleanup_monitoring_mapping() @@ -1504,45 +1683,462 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): ): """ 回复消息变成更新卡片消息 + + Supports Dify form-action resume: when the runner yields a chunk with + ``_resume_from_form=True``, the card transitions from buttons to a + grey "已选择" notice and a new ``streaming_txt_resume`` element is added + for subsequent resume chunks to stream into. + + When ``_open_new_card=True`` on the final chunk, the existing card is + left as-is and the pipeline will create a new card (with fresh form + buttons) for the re-pause. 
""" - # self.seq += 1 message_id = bot_message.resp_message_id msg_seq = bot_message.msg_sequence - if msg_seq % 8 == 0 or is_final: - text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client) - text_message = '' - if text_elements: - parts = [] - for paragraph in text_elements: - para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md')) - if para_text: - parts.append(para_text) - text_message = '\n\n'.join(parts) + form_data = getattr(bot_message, '_form_data', None) + resume_from = getattr(bot_message, '_resume_from_form', False) + action_title = getattr(bot_message, '_resume_action_title', '') + open_new_card = getattr(bot_message, '_open_new_card', False) + selected_notice = f'> **已选择**:{action_title}' if action_title else '' - # content = { - # 'type': 'card_json', - # 'data': {'card_id': self.card_id_dict[message_id], 'elements': {'content': text_message}}, - # } + # ── decide whether this chunk needs a card update ──────────────────── + card_id = self.card_id_dict.get(message_id) + if not card_id: + return - request: ContentCardElementRequest = ( - ContentCardElementRequest.builder() - .card_id(self.card_id_dict[message_id]) - .element_id('streaming_txt') - .request_body( - ContentCardElementRequestBody.builder() - # .uuid("a0d69e20-1dd1-458b-k525-dfeca4015204") - .content(text_message) - .sequence(msg_seq) + # ── convert message chain → text ───────────────────────────────────── + text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client) + + text_message = '' + if text_elements: + parts = [] + for paragraph in text_elements: + para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md')) + if para_text: + parts.append(para_text) + text_message = '\n\n'.join(parts) + + tenant_key = ( + message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None + ) + app_access_token = 
self.get_app_access_token() + tenant_access_token = self.get_tenant_access_token(tenant_key) + req_opt: RequestOption = ( + RequestOption.builder() + .app_ticket(self.app_ticket) + .tenant_key(tenant_key) + .app_access_token(app_access_token) + .tenant_access_token(tenant_access_token) + .build() + ) + + card_sequence = self._next_card_sequence(card_id, msg_seq) + + # ── RESUME: first chunk after button click ─────────────────────────── + if resume_from and card_id not in self.card_resume_transitioned: + # Transition the card from the form state into resume mode. + # Preserve the text that was shown before the pause, and seed the + # resume placeholder with the current resume content if we already + # have any on the first yielded chunk. + pre_pause_text = self.card_pre_pause_text.get(card_id) or self.card_streaming_text.get(card_id, '') + initial_resume_text = text_message or '\u200b' + await self._update_card_layout( + card_id=card_id, + message_source=message_source, + text_message=pre_pause_text, + sequence=card_sequence, + form_data=None, + notice_text=selected_notice, + resume_placeholder_text=initial_resume_text, + ) + self.card_resume_transitioned.add(card_id) + self.card_pre_pause_text[card_id] = pre_pause_text + self.card_streaming_text[card_id] = text_message + if not is_final: + return + + # ── RESUME: subsequent chunks → full card update ───────────────────── + if resume_from and card_id in self.card_resume_transitioned: + cached = self.card_streaming_text.get(card_id, '') + if text_message != cached: + self.card_streaming_text[card_id] = text_message + pre_pause_text = self.card_pre_pause_text.get(card_id, '') + await self._update_card_layout( + card_id=card_id, + message_source=message_source, + text_message=pre_pause_text, + sequence=card_sequence, + form_data=None, + notice_text=selected_notice, + resume_placeholder_text=text_message, + ) + if not is_final: + return + + # ── NORMAL streaming (non-resume): update streaming_txt in-place ────── + if 
not resume_from and (msg_seq % 8 == 0 or is_final): + cached = self.card_streaming_text.get(card_id) + if text_message != cached: + self.card_streaming_text[card_id] = text_message + request: ContentCardElementRequest = ( + ContentCardElementRequest.builder() + .card_id(card_id) + .element_id('streaming_txt') + .request_body( + ContentCardElementRequestBody.builder().content(text_message).sequence(card_sequence).build() + ) .build() ) - .build() + response: ContentCardElementResponse = await self.api_client.cardkit.v1.card_element.acontent( + request, req_opt + ) + if not response.success(): + raise Exception( + f'client.cardkit.v1.card_element.acontent failed, code: {response.code}, ' + f'msg: {response.msg}, log_id: {response.get_log_id()}, ' + f'resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}' + ) + + # ── FINAL chunk: full card layout update ───────────────────────────── + if is_final: + final_seq = self._next_card_sequence(card_id, card_sequence + 1) + pre_pause = self.card_pre_pause_text.get(card_id, text_message) + resume_cached = self.card_streaming_text.get(card_id, '') + if form_data: + # Keep the current card and render the human-input action area. + # Initial pause uses the latest stream text, while re-pause after + # a button click preserves the pre-pause text and keeps the resume + # stream content in the extra placeholder area. 
+ final_text = pre_pause if open_new_card else text_message + final_resume_placeholder = resume_cached if open_new_card else '' + await self._update_card_layout( + card_id=card_id, + message_source=message_source, + text_message=final_text, + sequence=final_seq, + form_data=form_data, + resume_placeholder_text=final_resume_placeholder, + show_form_prompt=not open_new_card, + ) + if open_new_card: + self.card_streaming_text.pop(card_id, None) + self.card_pre_pause_text.pop(card_id, None) + else: + # The human-input prompt itself is rendered as buttons only + # on Lark, so do not keep the hidden fallback text around; + # otherwise it will resurface after the button click. + self.card_streaming_text[card_id] = '' + self.card_pre_pause_text[card_id] = '' + else: + # Normal finish: keep pre-pause + resume content visible, + # remove buttons/notice, drop the resume placeholder. + await self._update_card_layout( + card_id=card_id, + message_source=message_source, + text_message=pre_pause, + sequence=final_seq, + form_data=None, + notice_text=selected_notice if resume_from else '', + resume_placeholder_text=resume_cached, + ) + self._drop_card_state(card_id) + self.card_id_dict.pop(message_id, None) + + # ── media (images / files) appended at the end ─────────────────────── + if is_final and media_items: + for media in media_items: + media_request: ReplyMessageRequest = ( + ReplyMessageRequest.builder() + .message_id(message_source.message_chain.message_id) + .request_body( + ReplyMessageRequestBody.builder() + .content(json.dumps(media['content'])) + .msg_type(media['msg_type']) + .reply_in_thread(False) + .uuid(str(uuid.uuid4())) + .build() + ) + .build() + ) + media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply( + media_request, req_opt + ) + if not media_response.success(): + raise Exception( + f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: 
{media_response.get_log_id()}' + ) + + async def _add_form_buttons_to_card( + self, + card_id: str, + message_source: platform_events.MessageEvent, + form_data: dict, + text_message: str = '', + sequence: int = 1, + ): + """Update the entire card to include form action buttons. + + Uses card.aupdate to replace the card JSON with a template that + includes the streaming text content plus interactive buttons. + """ + await self._update_card_layout( + card_id=card_id, + message_source=message_source, + text_message=text_message, + sequence=sequence, + form_data=form_data, + ) + + async def _remove_form_buttons_from_card( + self, + card_id: str, + message_source: platform_events.MessageEvent, + text_message: str = '', + sequence: int = 1, + ): + """Replace the human-input card layout with the plain final layout.""" + await self._update_card_layout( + card_id=card_id, + message_source=message_source, + text_message=text_message, + sequence=sequence, + form_data=None, + ) + + async def _update_card_layout( + self, + card_id: str, + message_source: platform_events.MessageEvent, + text_message: str = '', + sequence: int = 1, + form_data: dict | None = None, + notice_text: str = '', + resume_placeholder_text: str = '', + show_form_prompt: bool = True, + ): + """Update the entire card layout. 
+ + • form_data → show interactive buttons (initial Dify pause) + • notice_text → replace buttons with a grey "已选择" notice (resume transition) + • resume_placeholder_text → add a streaming_txt_resume markdown element + """ + form_data = form_data or {} + actions = form_data.get('actions', []) + form_token = form_data.get('form_token', '') + workflow_run_id = form_data.get('workflow_run_id', '') + node_title = form_data.get('node_title', '') or 'Human Input Required' + form_content = form_data.get('form_content', '') + + # When form_data is set, the visible content is rendered inside the + # interactive container, so the top streaming text should stay empty + # to avoid duplicate text above the action area. + # + # For resume notice state, keep the existing text visible in the card + # and only add the grey "selected" notice below it. + if form_data: + render_text_message = '' + else: + render_text_message = text_message + + # Determine session key from message source + if isinstance(message_source, platform_events.GroupMessage): + session_key = f'group_{message_source.group.id}' + else: + session_key = f'person_{message_source.sender.id}' + + # Build button elements matching the existing card template's thumbsup/down format + action_buttons = [] + for action in actions: + action_id = action.get('id', '') + action_title = action.get('title', action_id) + button_style = action.get('button_style', 'default') + + if button_style == 'primary': + lark_button_type = 'primary' + elif button_style == 'danger': + lark_button_type = 'danger' + else: + lark_button_type = 'default' + + action_buttons.append( + { + 'tag': 'button', + 'text': {'tag': 'plain_text', 'content': action_title}, + 'type': lark_button_type, + 'width': 'fill', + 'size': 'medium', + 'hover_tips': {'tag': 'plain_text', 'content': action_title}, + 'behaviors': [ + { + 'type': 'callback', + 'value': { + 'form_action': True, + 'form_token': form_token, + 'workflow_run_id': workflow_run_id, + 'action_id': 
action_id, + 'session_key': session_key, + }, + } + ], + 'margin': '0px 0px 0px 0px', + } ) - if is_final and bot_message.tool_calls is None: - # self.seq = 1 # 消息回复结束之后重置seq - self.card_id_dict.pop(message_id) # 清理已经使用过的卡片 + interactive_elements = [] + if form_data: + if show_form_prompt: + interactive_elements = [ + { + 'tag': 'markdown', + 'content': f'**[Human Input Required] {node_title}**', + 'text_align': 'left', + 'text_size': 'normal', + 'margin': '0px 0px 4px 0px', + } + ] + if form_content: + interactive_elements.append( + { + 'tag': 'markdown', + 'content': form_content, + 'text_align': 'left', + 'text_size': 'normal', + 'margin': '0px 0px 8px 0px', + } + ) + interactive_elements.append( + { + 'tag': 'column_set', + 'horizontal_spacing': '8px', + 'horizontal_align': 'left', + 'margin': '0px 0px 0px 0px', + 'columns': [ + { + 'tag': 'column', + 'width': 'weighted', + 'elements': [btn], + 'padding': '0px 0px 0px 0px', + } + for btn in action_buttons + ], + } + ) + # Build the full card JSON with buttons, same structure as create_card_id + # ── mid_section: either form buttons, resume notice, or empty ── + mid_section_elements = [] + if form_data: + mid_section_elements = [ + { + 'tag': 'interactive_container', + 'margin': '12px 0px 8px 0px', + 'padding': '12px 12px 12px 12px', + 'has_border': True, + 'elements': interactive_elements, + }, + {'tag': 'hr', 'margin': '0px 0px 0px 0px'}, + ] + elif notice_text: + mid_section_elements = [ + { + 'tag': 'markdown', + 'content': notice_text, + 'text_align': 'left', + 'text_size': 'normal', + 'margin': '8px 0px 4px 0px', + 'text_color': 'grey', + }, + {'tag': 'hr', 'margin': '0px 0px 0px 0px'}, + ] + + # ── resume element (rendered only when resume_placeholder_text is non-empty) ── + resume_elements = [] + if resume_placeholder_text: + resume_elements = [ + { + 'tag': 'markdown', + 'content': resume_placeholder_text, + 'text_align': 'left', + 'text_size': 'normal', + 'margin': '0px 0px 0px 0px', + 'element_id': 
'streaming_txt_resume', + }, + ] + + card_data = { + 'schema': '2.0', + 'config': { + 'update_multi': True, + 'streaming_mode': False, + }, + 'body': { + 'direction': 'vertical', + 'padding': '12px 12px 12px 12px', + 'elements': [ + { + 'tag': 'div', + 'text': { + 'tag': 'plain_text', + 'content': 'LangBot', + 'text_size': 'normal', + 'text_align': 'left', + 'text_color': 'default', + }, + 'icon': { + 'tag': 'custom_icon', + 'img_key': 'img_v3_02p3_05c65d5d-9bad-440a-a2fb-c89571bfd5bg', + }, + }, + { + 'tag': 'markdown', + 'content': render_text_message, + 'text_align': 'left', + 'text_size': 'normal', + 'margin': '0px 0px 0px 0px', + 'element_id': 'streaming_txt', + }, + *mid_section_elements, + *resume_elements, + { + 'tag': 'column_set', + 'horizontal_spacing': '12px', + 'horizontal_align': 'right', + 'columns': [ + { + 'tag': 'column', + 'width': 'weighted', + 'elements': [ + { + 'tag': 'markdown', + 'content': '以上内容由 AI 生成,仅供参考。更多详细、准确信息可点击引用链接查看', + 'text_align': 'left', + 'text_size': 'notation', + 'margin': '4px 0px 0px 0px', + 'icon': { + 'tag': 'standard_icon', + 'token': 'robot_outlined', + 'color': 'grey', + }, + } + ], + 'padding': '0px 0px 0px 0px', + 'direction': 'vertical', + 'horizontal_spacing': '8px', + 'vertical_spacing': '8px', + 'horizontal_align': 'left', + 'vertical_align': 'top', + 'margin': '0px 0px 0px 0px', + 'weight': 1, + } + ], + 'margin': '0px 0px 4px 0px', + }, + ], + }, + } + + try: tenant_key = ( message_source.source_platform_object.header.tenant_key if message_source.source_platform_object @@ -1558,39 +2154,27 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): .tenant_access_token(tenant_access_token) .build() ) - # 发起请求 - response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(request, req_opt) - # 处理失败返回 - if not response.success(): - raise Exception( - f'client.im.v1.message.patch failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, 
resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}' + request: UpdateCardRequest = ( + UpdateCardRequest.builder() + .card_id(card_id) + .request_body( + UpdateCardRequestBody.builder() + .sequence(sequence) + .uuid(str(uuid.uuid4())) + .card(Card.builder().type('card_json').data(json.dumps(card_data)).build()) + .build() ) - return - - # Send media messages when streaming is done - if is_final and media_items: - for media in media_items: - media_request: ReplyMessageRequest = ( - ReplyMessageRequest.builder() - .message_id(message_source.message_chain.message_id) - .request_body( - ReplyMessageRequestBody.builder() - .content(json.dumps(media['content'])) - .msg_type(media['msg_type']) - .reply_in_thread(False) - .uuid(str(uuid.uuid4())) - .build() - ) - .build() - ) - media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply( - media_request, req_opt - ) - if not media_response.success(): - raise Exception( - f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}' - ) + .build() + ) + response: UpdateCardResponse = await self.api_client.cardkit.v1.card.aupdate(request, req_opt) + if not response.success(): + await self.logger.error( + f'Failed to update lark card with form buttons: code={response.code}, msg={response.msg}, ' + f'log_id={response.get_log_id()}, resp={getattr(getattr(response, "raw", None), "content", None)}' + ) + except Exception: + await self.logger.error(f'Error updating lark card with form buttons: {traceback.format_exc()}') async def is_muted(self, group_id: int) -> bool: return False diff --git a/src/langbot/pkg/platform/sources/telegram.py b/src/langbot/pkg/platform/sources/telegram.py index 1833975b..c1d9ab68 100644 --- a/src/langbot/pkg/platform/sources/telegram.py +++ b/src/langbot/pkg/platform/sources/telegram.py @@ -4,11 +4,12 @@ import time import telegram import telegram.ext -from 
telegram import Update -from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters +from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup +from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, CallbackQueryHandler, filters import telegramify_markdown import typing import traceback +import json import base64 import pydantic @@ -189,6 +190,7 @@ class TelegramEventConverter(abstract_platform_adapter.AbstractEventConverter): class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): bot: telegram.Bot = pydantic.Field(exclude=True) application: telegram.ext.Application = pydantic.Field(exclude=True) + ap: typing.Any = pydantic.Field(exclude=True, default=None) message_converter: TelegramMessageConverter = TelegramMessageConverter() event_converter: TelegramEventConverter = TelegramEventConverter() @@ -224,6 +226,95 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): telegram_callback, ) ) + + async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): + query = update.callback_query + await query.answer() + try: + data = json.loads(query.data) + if data.get('form_action') or data.get('f'): + import langbot_plugin.api.entities.builtin.provider.session as provider_session + + workflow_run_id = data.get('workflow_run_id') or data.get('w', '') + action_id = data.get('action_id') or data.get('a', '') + session_key = data.get('session_key') or data.get('s', '') + + if session_key.startswith('group_') or session_key.startswith('g:'): + launcher_type = provider_session.LauncherTypes.GROUP + launcher_id = ( + session_key.split(':', 1)[1] + if session_key.startswith('g:') + else session_key[len('group_') :] + ) + else: + launcher_type = provider_session.LauncherTypes.PERSON + launcher_id = ( + session_key.split(':', 1)[1] + if session_key.startswith('p:') + else session_key[len('person_') :] + ) + + user_id = str(query.from_user.id) + + # 
Find bot_uuid and pipeline_uuid + bot_uuid = '' + pipeline_uuid = None + for b in self.ap.platform_mgr.bots: + if b.adapter is self: + bot_uuid = b.bot_entity.uuid + pipeline_uuid = b.bot_entity.use_pipeline_uuid + break + + form_action_data = { + 'workflow_run_id': workflow_run_id, + 'action_id': action_id, + 'user': f'{launcher_type.value}_{launcher_id}', + 'inputs': {}, + } + + message_chain = platform_message.MessageChain( + [platform_message.Plain(text=f'[Form Action: {action_id}]')] + ) + + if launcher_type == provider_session.LauncherTypes.GROUP: + synthetic_event = platform_events.GroupMessage( + sender=platform_entities.GroupMember( + id=user_id, + member_name='', + permission=platform_entities.Permission.Member, + group=platform_entities.Group( + id=launcher_id, + name='', + permission=platform_entities.Permission.Member, + ), + ), + message_chain=message_chain, + ) + else: + synthetic_event = platform_events.FriendMessage( + sender=platform_entities.Friend( + id=user_id, + nickname='', + remark='', + ), + message_chain=message_chain, + ) + + await self.ap.query_pool.add_query( + bot_uuid=bot_uuid, + launcher_type=launcher_type, + launcher_id=launcher_id, + sender_id=user_id, + message_event=synthetic_event, + message_chain=message_chain, + adapter=self, + pipeline_uuid=pipeline_uuid, + variables={'_dify_form_action': form_action_data}, + ) + except Exception: + await self.logger.error(f'Error in telegram callback query: {traceback.format_exc()}') + + application.add_handler(CallbackQueryHandler(callback_query_handler)) super().__init__( config=config, logger=logger, @@ -369,6 +460,11 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): del args['draft_id'] await self.bot.send_message(**args) self.msg_stream_id.pop(message_id) + + # Send form action buttons if form data is present + form_data = getattr(bot_message, '_form_data', None) + if form_data: + await self._send_form_action_buttons(message_source, form_data) else: 
stream_id = draft_id if (msg_seq - 1) % 8 == 0 or is_final: @@ -384,6 +480,51 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): if is_final and bot_message.tool_calls is None: self.msg_stream_id.pop(message_id) + # Send form action buttons if form data is present + form_data = getattr(bot_message, '_form_data', None) + if form_data: + await self._send_form_action_buttons(message_source, form_data) + + async def _send_form_action_buttons( + self, + message_source: platform_events.MessageEvent, + form_data: dict, + ): + """Send inline keyboard buttons for Dify human_input_required form actions.""" + actions = form_data.get('actions', []) + node_title = form_data.get('node_title', '') + + if isinstance(message_source, platform_events.GroupMessage): + session_key = f'g:{message_source.group.id}' + else: + session_key = f'p:{message_source.sender.id}' + + keyboard = [] + for action in actions: + action_id = action.get('id', '') + action_title = action.get('title', action_id) + callback_data = json.dumps( + {'f': 1, 'a': action_id, 's': session_key}, + separators=(',', ':'), + ) + keyboard.append([InlineKeyboardButton(action_title, callback_data=callback_data)]) + + reply_markup = InlineKeyboardMarkup(keyboard) + + update = message_source.source_platform_object + chat_id = update.effective_chat.id + message_thread_id = update.message.message_thread_id + + args = { + 'chat_id': chat_id, + 'text': f'[{node_title}] Please select an action:', + 'reply_markup': reply_markup, + } + if message_thread_id: + args['message_thread_id'] = message_thread_id + + await self.bot.send_message(**args) + def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None: if not isinstance(event.source_platform_object, Update): return None diff --git a/src/langbot/pkg/provider/runners/difysvapi.py b/src/langbot/pkg/provider/runners/difysvapi.py index 039bf33a..b871a324 100644 --- a/src/langbot/pkg/provider/runners/difysvapi.py +++ 
b/src/langbot/pkg/provider/runners/difysvapi.py @@ -2,6 +2,7 @@ from __future__ import annotations import typing import json +import time import uuid import base64 import mimetypes @@ -16,6 +17,46 @@ from langbot.libs.dify_service_api.v1 import client, errors import httpx +# Module-level store for paused-workflow form state, keyed by session key +# (launcher_type_value + "_" + launcher_id). Keeps state out of the SDK +# Conversation type, which may not accept arbitrary attribute assignment. +_PENDING_FORMS: dict[str, dict[str, typing.Any]] = {} +_PENDING_FORM_DEFAULT_TTL = 30 * 60 # 30 minutes safety cap + + +def _session_key_from_query(query: pipeline_query.Query) -> str: + return f'{query.session.launcher_type.value}_{query.session.launcher_id}' + + +def _prune_pending_forms(now: float | None = None) -> None: + if now is None: + now = time.time() + expired = [key for key, data in _PENDING_FORMS.items() if data.get('_expires_at', 0) <= now] + for key in expired: + _PENDING_FORMS.pop(key, None) + + +def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> None: + _prune_pending_forms() + stored = dict(form_data) + expiration_time = stored.get('expiration_time') + try: + expiration_ts = float(expiration_time) if expiration_time is not None else 0.0 + except (TypeError, ValueError): + expiration_ts = 0.0 + stored['_expires_at'] = expiration_ts or (time.time() + _PENDING_FORM_DEFAULT_TTL) + _PENDING_FORMS[session_key] = stored + + +def _get_pending_form(session_key: str) -> dict[str, typing.Any] | None: + _prune_pending_forms() + return _PENDING_FORMS.get(session_key) + + +def _clear_pending_form(session_key: str) -> None: + _PENDING_FORMS.pop(session_key, None) + + @runner.runner_class('dify-service-api') class DifyServiceAPIRunner(runner.RequestRunner): """Dify Service API 对话请求器""" @@ -335,11 +376,107 @@ class DifyServiceAPIRunner(runner.RequestRunner): query.session.using_conversation.uuid = chunk['conversation_id'] + async def 
_submit_workflow_form_blocking( + self, form_action: dict + ) -> typing.AsyncGenerator[provider_message.Message, None]: + """Submit human input to resume a paused Dify workflow (non-streaming).""" + + form_token = form_action['form_token'] + workflow_run_id = form_action['workflow_run_id'] + user = form_action['user'] + action_id = form_action.get('action_id', '') + inputs = form_action.get('inputs', {}) + + async for chunk in self.dify_client.workflow_submit( + form_token=form_token, + workflow_run_id=workflow_run_id, + inputs=inputs, + user=user, + action=action_id, + timeout=120, + ): + self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk)) + + if chunk['event'] == 'workflow_finished': + if chunk['data'].get('error'): + raise errors.DifyAPIError(chunk['data']['error']) + content, _ = self._process_thinking_content(chunk['data']['outputs']['summary']) + yield provider_message.Message( + role='assistant', + content=content, + ) + + def _merge_pending_form_action(self, form_action: dict | None, pending_form: dict | None) -> dict | None: + """Backfill resume fields from the pending form stored on the conversation.""" + if not form_action: + return None + + merged_action = dict(form_action) + if pending_form: + merged_action.setdefault('form_token', pending_form.get('form_token', '')) + merged_action.setdefault('workflow_run_id', pending_form.get('workflow_run_id', '')) + merged_action.setdefault('inputs', pending_form.get('inputs', {})) + merged_action.setdefault('user', pending_form.get('user', '')) + + # Resolve clicked action's display title from the stored actions list + if 'action_title' not in merged_action: + clicked_id = merged_action.get('action_id', '') + for action in pending_form.get('actions', []): + if str(action.get('id', '')) == str(clicked_id): + merged_action['action_title'] = action.get('title', clicked_id) + break + + return merged_action + + def _match_pending_form_action(self, user_text: str, pending_form: dict | None) -> dict | 
None: + """Match plain text replies against pending Dify form actions.""" + if not pending_form: + return None + + normalized_text = user_text.strip().lower() + if not normalized_text: + return None + + for action in pending_form.get('actions', []): + titles = { + str(action.get('title', '')).strip().lower(), + str(action.get('id', '')).strip().lower(), + } + if normalized_text in titles: + return { + 'form_token': pending_form.get('form_token', ''), + 'workflow_run_id': pending_form.get('workflow_run_id', ''), + 'action_id': action.get('id', ''), + 'action_title': action.get('title', action.get('id', '')), + 'inputs': pending_form.get('inputs', {}), + 'user': pending_form.get('user', ''), + } + + return None + async def _workflow_messages( self, query: pipeline_query.Query ) -> typing.AsyncGenerator[provider_message.Message, None]: """调用工作流""" + # Check if this is a form action resume (button click or text match) + form_action = query.variables.get('_dify_form_action') + session_key = _session_key_from_query(query) + pending_form = _get_pending_form(session_key) + + if form_action: + form_action = self._merge_pending_form_action(form_action, pending_form) + _clear_pending_form(session_key) + elif pending_form: + form_action = self._match_pending_form_action(str(query.message_chain), pending_form) + if form_action: + _clear_pending_form(session_key) + + if form_action: + async for msg in self._submit_workflow_form_blocking(form_action): + yield msg + return + if not query.session.using_conversation.uuid: query.session.using_conversation.uuid = str(uuid.uuid4()) @@ -366,6 +503,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): } inputs.update(query.variables) + human_input_yielded = False async for chunk in self.dify_client.workflow_run( inputs=inputs, @@ -377,6 +515,46 @@ class DifyServiceAPIRunner(runner.RequestRunner): if chunk['event'] in ignored_events: continue + if chunk['event'] == 'workflow_paused': + reasons = chunk['data'].get('reasons', []) + 
workflow_run_id = chunk['data'].get('workflow_run_id', '') + for reason in reasons: + if reason.get('TYPE') == 'human_input_required': + form_content = reason.get('form_content', '') + actions = reason.get('actions', []) + node_title = reason.get('node_title', '') + + _set_pending_form( + _session_key_from_query(query), + { + 'workflow_run_id': workflow_run_id, + 'form_id': reason.get('form_id'), + 'form_token': reason.get('form_token'), + 'node_id': reason.get('node_id'), + 'node_title': node_title, + 'form_content': form_content, + 'inputs': reason.get('inputs', {}), + 'actions': actions, + 'expiration_time': reason.get('expiration_time'), + 'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}', + }, + ) + + query.variables['_dify_form_render'] = { + 'form_content': form_content, + 'actions': actions, + 'node_title': node_title, + } + + action_lines = '\n'.join(f'- [{a.get("title", a.get("id", ""))}]' for a in actions) + display_text = f'[Human Input Required] {node_title}\n{form_content}\n{action_lines}' + + human_input_yielded = True + yield provider_message.Message( + role='assistant', + content=display_text, + ) + if chunk['event'] == 'node_started': if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end': continue @@ -399,6 +577,8 @@ class DifyServiceAPIRunner(runner.RequestRunner): yield msg elif chunk['event'] == 'workflow_finished': + if human_input_yielded: + break if chunk['data']['error']: raise errors.DifyAPIError(chunk['data']['error']) content, _ = self._process_thinking_content(chunk['data']['outputs']['summary']) @@ -636,11 +816,143 @@ class DifyServiceAPIRunner(runner.RequestRunner): query.session.using_conversation.uuid = chunk['conversation_id'] + async def _submit_workflow_form( + self, form_action: dict + ) -> typing.AsyncGenerator[provider_message.MessageChunk, None]: + """Submit human input to resume a paused Dify workflow.""" + + form_token = form_action['form_token'] + workflow_run_id = 
form_action['workflow_run_id'] + user = form_action['user'] + action_id = form_action.get('action_id', '') + action_title = form_action.get('action_title', '') or action_id + inputs = form_action.get('inputs', {}) + + messsage_idx = 0 + is_final = False + think_start = False + think_end = False + workflow_contents = '' + repause_form_data: dict | None = None + + remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think') + async for chunk in self.dify_client.workflow_submit( + form_token=form_token, + workflow_run_id=workflow_run_id, + inputs=inputs, + user=user, + action=action_id, + timeout=120, + ): + self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk)) + + yield_this_iteration = False + + if chunk['event'] == 'workflow_finished': + is_final = True + yield_this_iteration = True + if chunk['data'].get('error'): + raise errors.DifyAPIError(chunk['data']['error']) + + if chunk['event'] == 'workflow_paused': + reasons = chunk['data'].get('reasons', []) + new_run_id = chunk['data'].get('workflow_run_id', workflow_run_id) + for reason in reasons: + if reason.get('TYPE') != 'human_input_required': + continue + form_content = reason.get('form_content', '') + actions = reason.get('actions', []) + node_title = reason.get('node_title', '') + raw_inputs = reason.get('inputs', {}) + + _set_pending_form( + user, + { + 'workflow_run_id': new_run_id, + 'form_id': reason.get('form_id'), + 'form_token': reason.get('form_token'), + 'node_id': reason.get('node_id'), + 'node_title': node_title, + 'form_content': form_content, + 'inputs': raw_inputs if isinstance(raw_inputs, dict) else {}, + 'actions': actions, + 'expiration_time': reason.get('expiration_time'), + 'user': user, + }, + ) + + repause_form_data = { + 'form_content': form_content, + 'actions': actions, + 'node_title': node_title, + 'workflow_run_id': new_run_id, + 'form_token': reason.get('form_token', ''), + } + is_final = True + yield_this_iteration = True + break + + if 
chunk['event'] == 'text_chunk': + messsage_idx += 1 + if remove_think: + if '' in chunk['data']['text'] and not think_start: + think_start = True + continue + if '' in chunk['data']['text'] and not think_end: + import re + + content = re.sub(r'^\n', '', chunk['data']['text']) + workflow_contents += content + think_end = True + elif think_end: + workflow_contents += chunk['data']['text'] + if think_start: + continue + else: + workflow_contents += chunk['data']['text'] + if messsage_idx % 8 == 0: + yield_this_iteration = True + + if yield_this_iteration: + msg = provider_message.MessageChunk( + role='assistant', + content=workflow_contents, + is_final=is_final, + ) + msg._resume_from_form = True + if action_title: + msg._resume_action_title = action_title + if is_final and repause_form_data: + msg._form_data = repause_form_data + msg._open_new_card = True + yield msg + if is_final: + return + async def _workflow_messages_chunk( self, query: pipeline_query.Query ) -> typing.AsyncGenerator[provider_message.MessageChunk, None]: """调用工作流""" + # Check if this is a form action resume (button click or text match) + form_action = query.variables.get('_dify_form_action') + session_key = _session_key_from_query(query) + pending_form = _get_pending_form(session_key) + + if form_action: + form_action = self._merge_pending_form_action(form_action, pending_form) + _clear_pending_form(session_key) + elif pending_form: + form_action = self._match_pending_form_action(str(query.message_chain), pending_form) + if form_action: + _clear_pending_form(session_key) + + if form_action: + # Resume paused workflow via submit endpoint + async for msg in self._submit_workflow_form(form_action): + yield msg + return + if not query.session.using_conversation.uuid: query.session.using_conversation.uuid = str(uuid.uuid4()) @@ -672,6 +984,13 @@ class DifyServiceAPIRunner(runner.RequestRunner): think_start = False think_end = False workflow_contents = '' + workflow_run_id = '' + human_input_yielded = 
False + + # Saved form data to attach to the final MessageChunk so the adapter + # can detect it when is_final=True and render buttons. + pending_form_data = None + display_text = '' remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think') async for chunk in self.dify_client.workflow_run( @@ -682,7 +1001,62 @@ class DifyServiceAPIRunner(runner.RequestRunner): ): self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk)) if chunk['event'] in ignored_events: + if chunk['event'] == 'workflow_started': + workflow_run_id = chunk['data'].get('workflow_run_id', '') continue + + if chunk['event'] == 'workflow_paused': + reasons = chunk['data'].get('reasons', []) + workflow_run_id = chunk['data'].get('workflow_run_id', workflow_run_id) + for reason in reasons: + if reason.get('TYPE') == 'human_input_required': + form_content = reason.get('form_content', '') + actions = reason.get('actions', []) + node_title = reason.get('node_title', '') + + # Persist form state in module-level store keyed by session + raw_inputs = reason.get('inputs', {}) + _set_pending_form( + _session_key_from_query(query), + { + 'workflow_run_id': workflow_run_id, + 'form_id': reason.get('form_id'), + 'form_token': reason.get('form_token'), + 'node_id': reason.get('node_id'), + 'node_title': node_title, + 'form_content': form_content, + 'inputs': raw_inputs if isinstance(raw_inputs, dict) else {}, + 'actions': actions, + 'expiration_time': reason.get('expiration_time'), + 'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}', + }, + ) + + # Pass form render metadata to downstream stages + query.variables['_dify_form_render'] = { + 'form_content': form_content, + 'actions': actions, + 'node_title': node_title, + } + + action_lines = '\n'.join(f'- [{a.get("title", a.get("id", ""))}]' for a in actions) + display_text = f'[Human Input Required] {node_title}\n{form_content}\n{action_lines}' + workflow_contents += display_text + '\n' + + # Save form data to attach 
to the final chunk later. + # We do NOT yield here — the form content will be sent + # as the final MessageChunk (with is_final=True and + # _form_data) so the adapter can update the card and + # add buttons in one pass. + pending_form_data = { + 'form_content': form_content, + 'actions': actions, + 'node_title': node_title, + 'workflow_run_id': workflow_run_id, + 'form_token': reason.get('form_token', ''), + } + human_input_yielded = True + if chunk['event'] == 'workflow_finished': is_final = True if chunk['data']['error']: @@ -730,11 +1104,29 @@ class DifyServiceAPIRunner(runner.RequestRunner): yield msg if messsage_idx % 8 == 0 or is_final: - yield provider_message.MessageChunk( + final_content = workflow_contents if workflow_contents.strip() else '' + msg = provider_message.MessageChunk( role='assistant', - content=workflow_contents, + content=final_content, is_final=is_final, ) + # Attach form data to the final chunk for the adapter + if is_final and pending_form_data: + msg._form_data = pending_form_data + pending_form_data = None + yield msg + + # If the stream ended after workflow_paused without a + # workflow_finished event, yield a final chunk so the adapter + # can update the card and add buttons. + if human_input_yielded and not is_final: + msg = provider_message.MessageChunk( + role='assistant', + content=workflow_contents or display_text, + is_final=True, + ) + msg._form_data = pending_form_data + yield msg async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]: """运行请求"""