feat(dify): add support for chatflow app type and enhance human input handling

This commit is contained in:
fdc310
2026-06-21 13:50:50 +08:00
parent 7f36103aa4
commit 321891441c
+207 -7
View File
@@ -147,7 +147,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
self.ap = ap
self.pipeline_config = pipeline_config
valid_app_types = ['chat', 'agent', 'workflow']
valid_app_types = ['chat', 'agent', 'workflow', 'chatflow']
if self.pipeline_config['ai']['dify-service-api']['app-type'] not in valid_app_types:
raise errors.DifyAPIError(
f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}'
@@ -295,6 +295,21 @@ class DifyServiceAPIRunner(runner.RequestRunner):
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用聊天助手"""
# Check if this is a form action resume (button click or text match)
form_action_raw = query.variables.get('_dify_form_action')
session_key = _session_key_from_query(query)
if form_action_raw:
form_action = self._merge_pending_form_action(session_key, form_action_raw)
else:
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
if form_action:
_clear_pending_form(session_key, form_action.get('form_token') or None)
async for msg in self._submit_workflow_form_blocking(form_action):
yield msg
return
cov_id = query.session.using_conversation.uuid or None
query.variables['conversation_id'] = cov_id
@@ -333,6 +348,45 @@ class DifyServiceAPIRunner(runner.RequestRunner):
mode = 'workflow'
if mode == 'workflow':
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
workflow_run_id = chunk['data'].get('workflow_run_id', '')
for reason in reasons:
if reason.get('TYPE') != 'human_input_required':
continue
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
node_title = reason.get('node_title', '')
_set_pending_form(
_session_key_from_query(query),
{
'workflow_run_id': workflow_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': node_title,
'form_content': form_content,
'inputs': reason.get('inputs', {}),
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
},
)
query.variables['_dify_form_render'] = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
}
display_text = _format_human_input_text(node_title, form_content, actions)
yield provider_message.Message(
role='assistant',
content=display_text,
)
return
if chunk['event'] == 'node_finished':
if chunk['data']['node_type'] == 'answer':
answer = self._extract_dify_text_output(chunk['data']['outputs'].get('answer'))
@@ -485,6 +539,41 @@ class DifyServiceAPIRunner(runner.RequestRunner):
role='assistant',
content=content,
)
return
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
new_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
for reason in reasons:
if reason.get('TYPE') != 'human_input_required':
continue
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
paused_node_title = reason.get('node_title', '')
raw_inputs = reason.get('inputs', {})
_set_pending_form(
user,
{
'workflow_run_id': new_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': paused_node_title,
'form_content': form_content,
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': user,
},
)
display_text = _format_human_input_text(paused_node_title, form_content, actions)
yield provider_message.Message(
role='assistant',
content=display_text,
)
return
def _resolve_pending_form(self, session_key: str, form_action: dict) -> dict | None:
"""Locate the pending form this action targets.
@@ -721,6 +810,21 @@ class DifyServiceAPIRunner(runner.RequestRunner):
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用聊天助手"""
# Check if this is a form action resume (button click or text match)
form_action_raw = query.variables.get('_dify_form_action')
session_key = _session_key_from_query(query)
if form_action_raw:
form_action = self._merge_pending_form_action(session_key, form_action_raw)
else:
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
if form_action:
_clear_pending_form(session_key, form_action.get('form_token') or None)
async for msg in self._submit_workflow_form(form_action):
yield msg
return
cov_id = query.session.using_conversation.uuid or None
query.variables['conversation_id'] = cov_id
@@ -749,6 +853,9 @@ class DifyServiceAPIRunner(runner.RequestRunner):
think_start = False
think_end = False
yielded_final = False
human_input_yielded = False
pending_form_data = None
display_text = ''
remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think')
@@ -764,7 +871,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
if chunk['event'] == 'workflow_started':
mode = 'workflow'
elif chunk['event'] in ('node_started', 'node_finished', 'workflow_finished'):
elif chunk['event'] in ('node_started', 'node_finished', 'workflow_finished', 'workflow_paused'):
# Some Dify deployments may omit workflow_started in streamed chunks.
mode = 'workflow'
@@ -792,9 +899,61 @@ class DifyServiceAPIRunner(runner.RequestRunner):
is_final = True
elif chunk['event'] == 'workflow_finished':
is_final = True
if human_input_yielded:
break
if chunk['data'].get('error'):
raise errors.DifyAPIError(chunk['data']['error'])
if mode == 'workflow' and chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
workflow_run_id = chunk['data'].get('workflow_run_id', '')
for reason in reasons:
if reason.get('TYPE') != 'human_input_required':
continue
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
node_title = reason.get('node_title', '')
raw_inputs = reason.get('inputs', {})
_set_pending_form(
_session_key_from_query(query),
{
'workflow_run_id': workflow_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': node_title,
'form_content': form_content,
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
},
)
query.variables['_dify_form_render'] = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
}
display_text = _format_human_input_text(node_title, form_content, actions)
# Use a zero-width space so ResponseWrapper lets the chunk
# propagate to SendResponseBackStage, but the adapter
# detects _form_data and renders buttons instead of the
# plain-text prompt (mirrors _workflow_messages_chunk).
if not basic_mode_pending_chunk:
basic_mode_pending_chunk = ''
pending_form_data = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
'workflow_run_id': workflow_run_id,
'form_token': reason.get('form_token', ''),
}
human_input_yielded = True
if mode == 'workflow' and chunk['event'] == 'node_finished':
if chunk['data'].get('node_type') == 'answer':
answer = self._extract_dify_text_output(chunk['data'].get('outputs', {}).get('answer'))
@@ -806,15 +965,31 @@ class DifyServiceAPIRunner(runner.RequestRunner):
and (is_final or message_idx % 8 == 0)
and (basic_mode_pending_chunk != '' or is_final)
):
# content, _ = self._process_thinking_content(basic_mode_pending_chunk)
yield provider_message.MessageChunk(
final_content = basic_mode_pending_chunk if basic_mode_pending_chunk.strip() else ''
msg = provider_message.MessageChunk(
role='assistant',
content=basic_mode_pending_chunk,
content=final_content,
is_final=is_final,
)
if is_final and pending_form_data:
msg._form_data = pending_form_data
pending_form_data = None
yield msg
if is_final:
yielded_final = True
# If the stream ended after workflow_paused without a
# workflow_finished event, yield a final chunk so the adapter
# can update the card and add buttons.
if human_input_yielded and not yielded_final:
msg = provider_message.MessageChunk(
role='assistant',
content=basic_mode_pending_chunk or display_text,
is_final=True,
)
msg._form_data = pending_form_data
yield msg
if chunk is None:
raise errors.DifyAPIError('Dify API 没有返回任何响应,请检查网络连接和API配置')
@@ -1054,6 +1229,31 @@ class DifyServiceAPIRunner(runner.RequestRunner):
if messsage_idx % 8 == 0:
yield_this_iteration = True
# Chatflow apps return answers via 'message' events (answer field),
# not 'text_chunk' events (data.text field).
if chunk['event'] == 'message':
answer = chunk.get('answer', '')
if answer:
messsage_idx += 1
if remove_think:
if '<think>' in answer and not think_start:
think_start = True
continue
if '</think>' in answer and not think_end:
import re
content = re.sub(r'^\n</think>', '', answer)
workflow_contents += content
think_end = True
elif think_end:
workflow_contents += answer
if think_start:
continue
else:
workflow_contents += answer
if messsage_idx % 8 == 0:
yield_this_iteration = True
if yield_this_iteration:
msg = provider_message.MessageChunk(
role='assistant',
@@ -1271,7 +1471,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
"""运行请求"""
if await query.adapter.is_stream_output_supported():
msg_idx = 0
if self.pipeline_config['ai']['dify-service-api']['app-type'] == 'chat':
if self.pipeline_config['ai']['dify-service-api']['app-type'] in ('chat', 'chatflow'):
async for msg in self._chat_messages_chunk(query):
msg_idx += 1
msg.msg_sequence = msg_idx
@@ -1291,7 +1491,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}'
)
else:
if self.pipeline_config['ai']['dify-service-api']['app-type'] == 'chat':
if self.pipeline_config['ai']['dify-service-api']['app-type'] in ('chat', 'chatflow'):
async for msg in self._chat_messages(query):
yield msg
elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'agent':