refactor(dify): Optimize the Dify API output parsing and workflow processing logic (#2027)

- Add the _extract_dify_text_output method to uniformly handle the parsing of Dify output content

- Modify the content extraction method for the answer node in workflow mode

- Add workflow mode detection logic to support the workflow_started event

- Check for an error state when the workflow finishes and raise it as an error

- Improve the message chunking logic for both basic and workflow modes

- Add a mechanism to capture answer content upon completion of a workflow node
This commit is contained in:
marun
2026-03-05 15:15:40 +08:00
committed by GitHub
parent 10f253015d
commit 2d63d528c6

View File

@@ -72,6 +72,28 @@ class DifyServiceAPIRunner(runner.RequestRunner):
content = f'<think>\n{thinking_content}\n</think>\n{content}'.strip() content = f'<think>\n{thinking_content}\n</think>\n{content}'.strip()
return content, thinking_content return content, thinking_content
def _extract_dify_text_output(self, value: typing.Any) -> str:
    """Extract text content from a Dify output payload.

    Accepted inputs and the result for each:

    * ``None`` -> ``''``.
    * ``dict`` -> its ``'content'`` entry when that is a string, otherwise
      the whole dict serialized as JSON (non-ASCII characters kept as-is).
    * ``str`` -> ``''`` when blank; the ``'content'`` string when the text
      is a JSON object carrying one; otherwise the original string,
      unmodified (surrounding whitespace included).
    * anything else -> ``str(value)``.

    Args:
        value: Raw output value to convert to text.

    Returns:
        The extracted text; never ``None``.
    """
    if value is None:
        return ''

    if isinstance(value, dict):
        content = value.get('content')
        if isinstance(content, str):
            return content
        return json.dumps(value, ensure_ascii=False)

    if not isinstance(value, str):
        return str(value)

    text = value.strip()
    if not text:
        return ''

    # Only a JSON *object* can provide a 'content' field, and after strip()
    # an object must start with '{'. Skipping the parse for everything else
    # avoids pointless work on ordinary plain-text answers.
    if not text.startswith('{'):
        return value

    try:
        parsed = json.loads(text)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError subclass, but json.loads can
        # also raise a plain ValueError (integer literal longer than the
        # interpreter's digit limit) or RecursionError (deeply nested
        # input). In every such case fall back to the raw text.
        return value

    if isinstance(parsed, dict):
        content = parsed.get('content')
        if isinstance(content, str):
            return content
    return value
async def _preprocess_user_message(self, query: pipeline_query.Query) -> tuple[str, list[dict]]: async def _preprocess_user_message(self, query: pipeline_query.Query) -> tuple[str, list[dict]]:
"""预处理用户消息,提取纯文本,并将图片/文件上传到 Dify 服务 """预处理用户消息,提取纯文本,并将图片/文件上传到 Dify 服务
@@ -192,7 +214,8 @@ class DifyServiceAPIRunner(runner.RequestRunner):
if mode == 'workflow': if mode == 'workflow':
if chunk['event'] == 'node_finished': if chunk['event'] == 'node_finished':
if chunk['data']['node_type'] == 'answer': if chunk['data']['node_type'] == 'answer':
content, _ = self._process_thinking_content(chunk['data']['outputs']['answer']) answer = self._extract_dify_text_output(chunk['data']['outputs'].get('answer'))
content, _ = self._process_thinking_content(answer)
yield provider_message.Message( yield provider_message.Message(
role='assistant', role='assistant',
@@ -405,6 +428,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
for f in upload_files for f in upload_files
] ]
mode = 'basic'
basic_mode_pending_chunk = '' basic_mode_pending_chunk = ''
inputs = {} inputs = {}
@@ -430,11 +454,12 @@ class DifyServiceAPIRunner(runner.RequestRunner):
): ):
self.ap.logger.debug('dify-chat-chunk: ' + str(chunk)) self.ap.logger.debug('dify-chat-chunk: ' + str(chunk))
# if chunk['event'] == 'workflow_started': if chunk['event'] == 'workflow_started':
# mode = 'workflow' mode = 'workflow'
# if mode == 'workflow': elif chunk['event'] in ('node_started', 'node_finished', 'workflow_finished'):
# elif mode == 'basic': # Some Dify deployments may omit workflow_started in streamed chunks.
# 因为都只是返回的 message也没有工具调用什么的暂时不分类 mode = 'workflow'
if chunk['event'] == 'message': if chunk['event'] == 'message':
message_idx += 1 message_idx += 1
if remove_think: if remove_think:
@@ -457,8 +482,18 @@ class DifyServiceAPIRunner(runner.RequestRunner):
if chunk['event'] == 'message_end': if chunk['event'] == 'message_end':
is_final = True is_final = True
elif chunk['event'] == 'workflow_finished':
is_final = True
if chunk['data'].get('error'):
raise errors.DifyAPIError(chunk['data']['error'])
if is_final or message_idx % 8 == 0: if mode == 'workflow' and chunk['event'] == 'node_finished':
if chunk['data'].get('node_type') == 'answer':
answer = self._extract_dify_text_output(chunk['data'].get('outputs', {}).get('answer'))
if answer:
basic_mode_pending_chunk = answer
if (is_final or message_idx % 8 == 0) and (basic_mode_pending_chunk != '' or is_final):
# content, _ = self._process_thinking_content(basic_mode_pending_chunk) # content, _ = self._process_thinking_content(basic_mode_pending_chunk)
yield provider_message.MessageChunk( yield provider_message.MessageChunk(
role='assistant', role='assistant',