From 2d63d528c6c88a00207fb9e5a4bb47aab72caccb Mon Sep 17 00:00:00 2001 From: marun Date: Thu, 5 Mar 2026 15:15:40 +0800 Subject: [PATCH] refactor(dify): Optimize the Dify API output parsing and workflow processing logic (#2027) - Add the _extract_dify_text_output method to uniformly handle the parsing of Dify output content - Modify the content extraction method for the answer node in workflow mode - Add workflow mode detection logic to support the workflow_started event - Handle error state checks upon completion of the workflow - Improve the message chunking logic for both basic and workflow modes - Add a mechanism to capture answer content upon completion of a workflow node --- src/langbot/pkg/provider/runners/difysvapi.py | 49 ++++++++++++++++--- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/src/langbot/pkg/provider/runners/difysvapi.py b/src/langbot/pkg/provider/runners/difysvapi.py index 24d902be..98da78c1 100644 --- a/src/langbot/pkg/provider/runners/difysvapi.py +++ b/src/langbot/pkg/provider/runners/difysvapi.py @@ -72,6 +72,28 @@ class DifyServiceAPIRunner(runner.RequestRunner): content = f'\n{thinking_content}\n\n{content}'.strip() return content, thinking_content + def _extract_dify_text_output(self, value: typing.Any) -> str: + """Extract text content from Dify output payload.""" + if value is None: + return '' + if isinstance(value, dict): + content = value.get('content') + if isinstance(content, str): + return content + return json.dumps(value, ensure_ascii=False) + if isinstance(value, str): + text = value.strip() + if not text: + return '' + try: + parsed = json.loads(text) + except json.JSONDecodeError: + return value + if isinstance(parsed, dict) and isinstance(parsed.get('content'), str): + return parsed['content'] + return value + return str(value) + async def _preprocess_user_message(self, query: pipeline_query.Query) -> tuple[str, list[dict]]: """预处理用户消息,提取纯文本,并将图片/文件上传到 Dify 服务 @@ -192,7 +214,8 @@ class DifyServiceAPIRunner(runner.RequestRunner): if mode == 'workflow': if chunk['event'] == 'node_finished': if chunk['data']['node_type'] == 'answer': - content, _ = self._process_thinking_content(chunk['data']['outputs']['answer']) + answer = self._extract_dify_text_output(chunk['data']['outputs'].get('answer')) + content, _ = self._process_thinking_content(answer) yield provider_message.Message( role='assistant', @@ -405,6 +428,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): for f in upload_files ] + mode = 'basic' basic_mode_pending_chunk = '' inputs = {} @@ -430,11 +454,12 @@ class DifyServiceAPIRunner(runner.RequestRunner): ): self.ap.logger.debug('dify-chat-chunk: ' + str(chunk)) - # if chunk['event'] == 'workflow_started': - # mode = 'workflow' - # if mode == 'workflow': - # elif mode == 'basic': - # 因为都只是返回的 message也没有工具调用什么的,暂时不分类 + if chunk['event'] == 'workflow_started': + mode = 'workflow' + elif chunk['event'] in ('node_started', 'node_finished', 'workflow_finished'): + # Some Dify deployments may omit workflow_started in streamed chunks. + mode = 'workflow' + if chunk['event'] == 'message': message_idx += 1 if remove_think: @@ -457,8 +482,18 @@ class DifyServiceAPIRunner(runner.RequestRunner): if chunk['event'] == 'message_end': is_final = True + elif chunk['event'] == 'workflow_finished': + is_final = True + if chunk['data'].get('error'): + raise errors.DifyAPIError(chunk['data']['error']) - if is_final or message_idx % 8 == 0: + if mode == 'workflow' and chunk['event'] == 'node_finished': + if chunk['data'].get('node_type') == 'answer': + answer = self._extract_dify_text_output(chunk['data'].get('outputs', {}).get('answer')) + if answer: + basic_mode_pending_chunk = answer + + if (is_final or message_idx % 8 == 0) and (basic_mode_pending_chunk != '' or is_final): # content, _ = self._process_thinking_content(basic_mode_pending_chunk) yield provider_message.MessageChunk( role='assistant',