From 321891441cbd3aac68d638800951f5c467915e83 Mon Sep 17 00:00:00 2001 From: fdc310 <2213070223@qq.com> Date: Sun, 21 Jun 2026 13:50:50 +0800 Subject: [PATCH] feat(dify): add support for chatflow app type and enhance human input handling --- src/langbot/pkg/provider/runners/difysvapi.py | 214 +++++++++++++++++- 1 file changed, 207 insertions(+), 7 deletions(-) diff --git a/src/langbot/pkg/provider/runners/difysvapi.py b/src/langbot/pkg/provider/runners/difysvapi.py index 93f930d9f..52bbbe103 100644 --- a/src/langbot/pkg/provider/runners/difysvapi.py +++ b/src/langbot/pkg/provider/runners/difysvapi.py @@ -147,7 +147,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): self.ap = ap self.pipeline_config = pipeline_config - valid_app_types = ['chat', 'agent', 'workflow'] + valid_app_types = ['chat', 'agent', 'workflow', 'chatflow'] if self.pipeline_config['ai']['dify-service-api']['app-type'] not in valid_app_types: raise errors.DifyAPIError( f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}' @@ -295,6 +295,21 @@ class DifyServiceAPIRunner(runner.RequestRunner): self, query: pipeline_query.Query ) -> typing.AsyncGenerator[provider_message.Message, None]: """调用聊天助手""" + # Check if this is a form action resume (button click or text match) + form_action_raw = query.variables.get('_dify_form_action') + session_key = _session_key_from_query(query) + + if form_action_raw: + form_action = self._merge_pending_form_action(session_key, form_action_raw) + else: + form_action = self._match_pending_form_action(session_key, str(query.message_chain)) + + if form_action: + _clear_pending_form(session_key, form_action.get('form_token') or None) + async for msg in self._submit_workflow_form_blocking(form_action): + yield msg + return + cov_id = query.session.using_conversation.uuid or None query.variables['conversation_id'] = cov_id @@ -333,6 +348,45 @@ class DifyServiceAPIRunner(runner.RequestRunner): mode = 'workflow' if mode == 'workflow': + if chunk['event'] == 'workflow_paused': + reasons = chunk['data'].get('reasons', []) + workflow_run_id = chunk['data'].get('workflow_run_id', '') + for reason in reasons: + if reason.get('TYPE') != 'human_input_required': + continue + form_content = reason.get('form_content', '') + actions = reason.get('actions', []) + node_title = reason.get('node_title', '') + + _set_pending_form( + _session_key_from_query(query), + { + 'workflow_run_id': workflow_run_id, + 'form_id': reason.get('form_id'), + 'form_token': reason.get('form_token'), + 'node_id': reason.get('node_id'), + 'node_title': node_title, + 'form_content': form_content, + 'inputs': reason.get('inputs', {}), + 'actions': actions, + 'expiration_time': reason.get('expiration_time'), + 'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}', + }, + ) + + query.variables['_dify_form_render'] = { + 'form_content': form_content, + 'actions': actions, + 'node_title': node_title, + } + + display_text = _format_human_input_text(node_title, form_content, actions) + yield provider_message.Message( + role='assistant', + content=display_text, + ) + return + if chunk['event'] == 'node_finished': if chunk['data']['node_type'] == 'answer': answer = self._extract_dify_text_output(chunk['data']['outputs'].get('answer')) @@ -485,6 +539,41 @@ class DifyServiceAPIRunner(runner.RequestRunner): role='assistant', content=content, ) + return + + if chunk['event'] == 'workflow_paused': + reasons = chunk['data'].get('reasons', []) + new_run_id = chunk['data'].get('workflow_run_id', workflow_run_id) + for reason in reasons: + if reason.get('TYPE') != 'human_input_required': + continue + form_content = reason.get('form_content', '') + actions = reason.get('actions', []) + paused_node_title = reason.get('node_title', '') + raw_inputs = reason.get('inputs', {}) + + _set_pending_form( + user, + { + 'workflow_run_id': new_run_id, + 'form_id': reason.get('form_id'), + 'form_token': reason.get('form_token'), + 'node_id': reason.get('node_id'), + 'node_title': paused_node_title, + 'form_content': form_content, + 'inputs': raw_inputs if isinstance(raw_inputs, dict) else {}, + 'actions': actions, + 'expiration_time': reason.get('expiration_time'), + 'user': user, + }, + ) + + display_text = _format_human_input_text(paused_node_title, form_content, actions) + yield provider_message.Message( + role='assistant', + content=display_text, + ) + return def _resolve_pending_form(self, session_key: str, form_action: dict) -> dict | None: """Locate the pending form this action targets. @@ -721,6 +810,21 @@ class DifyServiceAPIRunner(runner.RequestRunner): self, query: pipeline_query.Query ) -> typing.AsyncGenerator[provider_message.MessageChunk, None]: """调用聊天助手""" + # Check if this is a form action resume (button click or text match) + form_action_raw = query.variables.get('_dify_form_action') + session_key = _session_key_from_query(query) + + if form_action_raw: + form_action = self._merge_pending_form_action(session_key, form_action_raw) + else: + form_action = self._match_pending_form_action(session_key, str(query.message_chain)) + + if form_action: + _clear_pending_form(session_key, form_action.get('form_token') or None) + async for msg in self._submit_workflow_form(form_action): + yield msg + return + cov_id = query.session.using_conversation.uuid or None query.variables['conversation_id'] = cov_id @@ -749,6 +853,9 @@ class DifyServiceAPIRunner(runner.RequestRunner): think_start = False think_end = False yielded_final = False + human_input_yielded = False + pending_form_data = None + display_text = '' remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think') @@ -764,7 +871,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): if chunk['event'] == 'workflow_started': mode = 'workflow' - elif chunk['event'] in ('node_started', 'node_finished', 'workflow_finished'): + elif chunk['event'] in ('node_started', 'node_finished', 'workflow_finished', 'workflow_paused'): # Some Dify deployments may omit workflow_started in streamed chunks. mode = 'workflow' @@ -792,9 +899,61 @@ class DifyServiceAPIRunner(runner.RequestRunner): is_final = True elif chunk['event'] == 'workflow_finished': is_final = True + if human_input_yielded: + break if chunk['data'].get('error'): raise errors.DifyAPIError(chunk['data']['error']) + if mode == 'workflow' and chunk['event'] == 'workflow_paused': + reasons = chunk['data'].get('reasons', []) + workflow_run_id = chunk['data'].get('workflow_run_id', '') + for reason in reasons: + if reason.get('TYPE') != 'human_input_required': + continue + form_content = reason.get('form_content', '') + actions = reason.get('actions', []) + node_title = reason.get('node_title', '') + + raw_inputs = reason.get('inputs', {}) + _set_pending_form( + _session_key_from_query(query), + { + 'workflow_run_id': workflow_run_id, + 'form_id': reason.get('form_id'), + 'form_token': reason.get('form_token'), + 'node_id': reason.get('node_id'), + 'node_title': node_title, + 'form_content': form_content, + 'inputs': raw_inputs if isinstance(raw_inputs, dict) else {}, + 'actions': actions, + 'expiration_time': reason.get('expiration_time'), + 'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}', + }, + ) + + query.variables['_dify_form_render'] = { + 'form_content': form_content, + 'actions': actions, + 'node_title': node_title, + } + + display_text = _format_human_input_text(node_title, form_content, actions) + # Use a zero-width space so ResponseWrapper lets the chunk + # propagate to SendResponseBackStage, but the adapter + # detects _form_data and renders buttons instead of the + # plain-text prompt (mirrors _workflow_messages_chunk). + if not basic_mode_pending_chunk: + basic_mode_pending_chunk = '​' + + pending_form_data = { + 'form_content': form_content, + 'actions': actions, + 'node_title': node_title, + 'workflow_run_id': workflow_run_id, + 'form_token': reason.get('form_token', ''), + } + human_input_yielded = True + if mode == 'workflow' and chunk['event'] == 'node_finished': if chunk['data'].get('node_type') == 'answer': answer = self._extract_dify_text_output(chunk['data'].get('outputs', {}).get('answer')) @@ -806,15 +965,31 @@ class DifyServiceAPIRunner(runner.RequestRunner): and (is_final or message_idx % 8 == 0) and (basic_mode_pending_chunk != '' or is_final) ): - # content, _ = self._process_thinking_content(basic_mode_pending_chunk) - yield provider_message.MessageChunk( + final_content = basic_mode_pending_chunk if basic_mode_pending_chunk.strip() else '' + msg = provider_message.MessageChunk( role='assistant', - content=basic_mode_pending_chunk, + content=final_content, is_final=is_final, ) + if is_final and pending_form_data: + msg._form_data = pending_form_data + pending_form_data = None + yield msg if is_final: yielded_final = True + # If the stream ended after workflow_paused without a + # workflow_finished event, yield a final chunk so the adapter + # can update the card and add buttons. + if human_input_yielded and not yielded_final: + msg = provider_message.MessageChunk( + role='assistant', + content=basic_mode_pending_chunk or display_text, + is_final=True, + ) + msg._form_data = pending_form_data + yield msg + if chunk is None: raise errors.DifyAPIError('Dify API 没有返回任何响应,请检查网络连接和API配置') @@ -1054,6 +1229,31 @@ class DifyServiceAPIRunner(runner.RequestRunner): if messsage_idx % 8 == 0: yield_this_iteration = True + # Chatflow apps return answers via 'message' events (answer field), + # not 'text_chunk' events (data.text field). + if chunk['event'] == 'message': + answer = chunk.get('answer', '') + if answer: + messsage_idx += 1 + if remove_think: + if '' in answer and not think_start: + think_start = True + continue + if '' in answer and not think_end: + import re + + content = re.sub(r'^\n', '', answer) + workflow_contents += content + think_end = True + elif think_end: + workflow_contents += answer + if think_start: + continue + else: + workflow_contents += answer + if messsage_idx % 8 == 0: + yield_this_iteration = True + if yield_this_iteration: msg = provider_message.MessageChunk( role='assistant', @@ -1271,7 +1471,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): """运行请求""" if await query.adapter.is_stream_output_supported(): msg_idx = 0 - if self.pipeline_config['ai']['dify-service-api']['app-type'] == 'chat': + if self.pipeline_config['ai']['dify-service-api']['app-type'] in ('chat', 'chatflow'): async for msg in self._chat_messages_chunk(query): msg_idx += 1 msg.msg_sequence = msg_idx @@ -1291,7 +1491,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}' ) else: - if self.pipeline_config['ai']['dify-service-api']['app-type'] == 'chat': + if self.pipeline_config['ai']['dify-service-api']['app-type'] in ('chat', 'chatflow'): async for msg in self._chat_messages(query): yield msg elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'agent':