diff --git a/pkg/provider/runners/difysvapi.py b/pkg/provider/runners/difysvapi.py
index 51fddf7b..3b072bc2 100644
--- a/pkg/provider/runners/difysvapi.py
+++ b/pkg/provider/runners/difysvapi.py
@@ -354,18 +354,344 @@ class DifyServiceAPIRunner(runner.RequestRunner):
yield msg
+
+ async def _chat_messages_chunk(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.MessageChunk, None]:
+ """调用聊天助手"""
+ cov_id = query.session.using_conversation.uuid or ''
+ query.variables['conversation_id'] = cov_id
+
+ plain_text, image_ids = await self._preprocess_user_message(query)
+
+ files = [
+ {
+ 'type': 'image',
+ 'transfer_method': 'local_file',
+ 'upload_file_id': image_id,
+ }
+ for image_id in image_ids
+ ]
+
+ mode = 'basic' # 标记是基础编排还是工作流编排
+
+ basic_mode_pending_chunk = ''
+
+ inputs = {}
+
+ inputs.update(query.variables)
+ message_idx = 0
+
+ chunk = None # 初始化chunk变量,防止在没有响应时引用错误
+
+ is_final = False
+ think_start = False
+ think_end = False
+
+ remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
+
+ async for chunk in self.dify_client.chat_messages(
+ inputs=inputs,
+ query=plain_text,
+ user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
+ conversation_id=cov_id,
+ files=files,
+ timeout=120,
+ ):
+ self.ap.logger.debug('dify-chat-chunk: ' + str(chunk))
+
+ # if chunk['event'] == 'workflow_started':
+ # mode = 'workflow'
+ # if mode == 'workflow':
+ # elif mode == 'basic':
+ # 因为都只是返回的 message也没有工具调用什么的,暂时不分类
+ if chunk['event'] == 'message':
+ message_idx += 1
+ if remove_think:
+ if message_idx == 1:
+ think_start = True
+ continue
+ if '' in chunk['answer'] and not think_end:
+ import re
+ content = re.sub(r'^\n', '', chunk['answer'])
+ basic_mode_pending_chunk += content
+ think_end = True
+ elif think_end:
+ basic_mode_pending_chunk += chunk['answer']
+ if think_start:
+ continue
+
+ else:
+ basic_mode_pending_chunk += chunk['answer']
+
+ if chunk['event'] == 'message_end':
+ is_final = True
+
+ if is_final or message_idx % 8 == 0:
+ # content, _ = self._process_thinking_content(basic_mode_pending_chunk)
+ yield llm_entities.MessageChunk(
+ role='assistant',
+ content=basic_mode_pending_chunk,
+ is_final=is_final,
+ )
+
+
+ if chunk is None:
+ raise errors.DifyAPIError('Dify API 没有返回任何响应,请检查网络连接和API配置')
+
+ query.session.using_conversation.uuid = chunk['conversation_id']
+
+
+ async def _agent_chat_messages_chunk(
+ self, query: core_entities.Query
+ ) -> typing.AsyncGenerator[llm_entities.Message, None]:
+ """调用聊天助手"""
+ cov_id = query.session.using_conversation.uuid or ''
+ query.variables['conversation_id'] = cov_id
+
+ plain_text, image_ids = await self._preprocess_user_message(query)
+
+ files = [
+ {
+ 'type': 'image',
+ 'transfer_method': 'local_file',
+ 'upload_file_id': image_id,
+ }
+ for image_id in image_ids
+ ]
+
+ ignored_events = []
+
+ inputs = {}
+
+ inputs.update(query.variables)
+
+ pending_agent_message = ''
+
+ chunk = None # 初始化chunk变量,防止在没有响应时引用错误
+ message_idx = 0
+ is_final = False
+ think_start = False
+ think_end = False
+
+ remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
+
+ async for chunk in self.dify_client.chat_messages(
+ inputs=inputs,
+ query=plain_text,
+ user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
+ response_mode='streaming',
+ conversation_id=cov_id,
+ files=files,
+ timeout=120,
+ ):
+ self.ap.logger.debug('dify-agent-chunk: ' + str(chunk))
+
+ if chunk['event'] in ignored_events:
+ continue
+
+ if chunk['event'] == 'agent_message':
+ message_idx += 1
+ if remove_think:
+ if '' in chunk['answer'] and not think_start:
+ think_start = True
+ continue
+ if '' in chunk['answer'] and not think_end:
+ import re
+ content = re.sub(r'^\n', '', chunk['answer'])
+ pending_agent_message += content
+ think_end = True
+ elif think_end:
+ pending_agent_message += chunk['answer']
+ if think_start:
+ continue
+
+ else:
+ pending_agent_message += chunk['answer']
+ elif chunk['event'] == 'message_end':
+ is_final = True
+ else:
+
+ if chunk['event'] == 'agent_thought':
+ if chunk['tool'] != '' and chunk['observation'] != '': # 工具调用结果,跳过
+ continue
+ message_idx += 1
+ if chunk['tool']:
+ msg = llm_entities.MessageChunk(
+ role='assistant',
+ tool_calls=[
+ llm_entities.ToolCall(
+ id=chunk['id'],
+ type='function',
+ function=llm_entities.FunctionCall(
+ name=chunk['tool'],
+ arguments=json.dumps({}),
+ ),
+ )
+ ],
+ )
+ yield msg
+ if chunk['event'] == 'message_file':
+ message_idx += 1
+ if chunk['type'] == 'image' and chunk['belongs_to'] == 'assistant':
+ base_url = self.dify_client.base_url
+
+ if base_url.endswith('/v1'):
+ base_url = base_url[:-3]
+
+ image_url = base_url + chunk['url']
+
+ yield llm_entities.MessageChunk(
+ role='assistant',
+ content=[llm_entities.ContentElement.from_image_url(image_url)],
+ is_final=is_final,
+
+ )
+
+ if chunk['event'] == 'error':
+ raise errors.DifyAPIError('dify 服务错误: ' + chunk['message'])
+ if message_idx % 8 == 0 or is_final:
+ yield llm_entities.MessageChunk(
+ role='assistant',
+ content=pending_agent_message,
+ is_final=is_final,
+ )
+
+ if chunk is None:
+ raise errors.DifyAPIError('Dify API 没有返回任何响应,请检查网络连接和API配置')
+
+ query.session.using_conversation.uuid = chunk['conversation_id']
+
+ async def _workflow_messages_chunk(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]:
+ """调用工作流"""
+
+ if not query.session.using_conversation.uuid:
+ query.session.using_conversation.uuid = str(uuid.uuid4())
+
+ query.variables['conversation_id'] = query.session.using_conversation.uuid
+
+ plain_text, image_ids = await self._preprocess_user_message(query)
+
+ files = [
+ {
+ 'type': 'image',
+ 'transfer_method': 'local_file',
+ 'upload_file_id': image_id,
+ }
+ for image_id in image_ids
+ ]
+
+ ignored_events = ['workflow_started']
+
+ inputs = { # these variables are legacy variables, we need to keep them for compatibility
+ 'langbot_user_message_text': plain_text,
+ 'langbot_session_id': query.variables['session_id'],
+ 'langbot_conversation_id': query.variables['conversation_id'],
+ 'langbot_msg_create_time': query.variables['msg_create_time'],
+ }
+
+ inputs.update(query.variables)
+ messsage_idx = 0
+ is_final = False
+ think_start = False
+ think_end = False
+ workflow_contents = ''
+
+ remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
+ async for chunk in self.dify_client.workflow_run(
+ inputs=inputs,
+ user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
+ files=files,
+ timeout=120,
+ ):
+ self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
+ if chunk['event'] in ignored_events:
+ continue
+ if chunk['event'] == 'workflow_finished':
+ is_final = True
+ if chunk['data']['error']:
+ raise errors.DifyAPIError(chunk['data']['error'])
+
+ if chunk['event'] == 'text_chunk':
+ messsage_idx += 1
+ if remove_think:
+ if '' in chunk['data']['text'] and not think_start:
+ think_start = True
+ continue
+ if '' in chunk['data']['text'] and not think_end:
+ import re
+ content = re.sub(r'^\n', '', chunk['data']['text'])
+ workflow_contents += content
+ think_end = True
+ elif think_end:
+ workflow_contents += chunk['data']['text']
+ if think_start:
+ continue
+
+ else:
+ workflow_contents += chunk['data']['text']
+
+ if chunk['event'] == 'node_started':
+ if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
+ continue
+ messsage_idx += 1
+ msg = llm_entities.MessageChunk(
+ role='assistant',
+ content=None,
+ tool_calls=[
+ llm_entities.ToolCall(
+ id=chunk['data']['node_id'],
+ type='function',
+ function=llm_entities.FunctionCall(
+ name=chunk['data']['title'],
+ arguments=json.dumps({}),
+ ),
+ )
+ ],
+ )
+
+ yield msg
+
+
+ if messsage_idx % 8 == 0 or is_final:
+ yield llm_entities.MessageChunk(
+ role='assistant',
+ content=workflow_contents,
+ is_final=is_final,
+ )
+
async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]:
"""运行请求"""
- if self.pipeline_config['ai']['dify-service-api']['app-type'] == 'chat':
- async for msg in self._chat_messages(query):
- yield msg
- elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'agent':
- async for msg in self._agent_chat_messages(query):
- yield msg
- elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'workflow':
- async for msg in self._workflow_messages(query):
- yield msg
+ if await query.adapter.is_stream_output_supported():
+ msg_idx = 0
+ if self.pipeline_config['ai']['dify-service-api']['app-type'] == 'chat':
+ async for msg in self._chat_messages_chunk(query):
+ msg_idx += 1
+ msg.msg_sequence = msg_idx
+ yield msg
+ elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'agent':
+ async for msg in self._agent_chat_messages_chunk(query):
+ msg_idx += 1
+ msg.msg_sequence = msg_idx
+ yield msg
+ elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'workflow':
+ async for msg in self._workflow_messages_chunk(query):
+ msg_idx += 1
+ msg.msg_sequence = msg_idx
+ yield msg
+ else:
+ raise errors.DifyAPIError(
+ f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}'
+ )
else:
- raise errors.DifyAPIError(
- f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}'
- )
\ No newline at end of file
+ if self.pipeline_config['ai']['dify-service-api']['app-type'] == 'chat':
+ async for msg in self._chat_messages(query):
+ yield msg
+ elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'agent':
+ async for msg in self._agent_chat_messages(query):
+ yield msg
+ elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'workflow':
+ async for msg in self._workflow_messages(query):
+ yield msg
+ else:
+ raise errors.DifyAPIError(
+ f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}'
+ )
\ No newline at end of file