From 2351193c5109b186663da741976d683a5f71c8ac Mon Sep 17 00:00:00 2001 From: Dong_master <2213070223@qq.com> Date: Fri, 15 Aug 2025 00:50:32 +0800 Subject: [PATCH] fix: in the difysvapi.py add stream , and remove_think on chunk --- pkg/provider/runners/difysvapi.py | 350 +++++++++++++++++++++++++++++- 1 file changed, 338 insertions(+), 12 deletions(-) diff --git a/pkg/provider/runners/difysvapi.py b/pkg/provider/runners/difysvapi.py index 51fddf7b..3b072bc2 100644 --- a/pkg/provider/runners/difysvapi.py +++ b/pkg/provider/runners/difysvapi.py @@ -354,18 +354,344 @@ class DifyServiceAPIRunner(runner.RequestRunner): yield msg + + async def _chat_messages_chunk(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.MessageChunk, None]: + """调用聊天助手""" + cov_id = query.session.using_conversation.uuid or '' + query.variables['conversation_id'] = cov_id + + plain_text, image_ids = await self._preprocess_user_message(query) + + files = [ + { + 'type': 'image', + 'transfer_method': 'local_file', + 'upload_file_id': image_id, + } + for image_id in image_ids + ] + + mode = 'basic' # 标记是基础编排还是工作流编排 + + basic_mode_pending_chunk = '' + + inputs = {} + + inputs.update(query.variables) + message_idx = 0 + + chunk = None # 初始化chunk变量,防止在没有响应时引用错误 + + is_final = False + think_start = False + think_end = False + + remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think') + + async for chunk in self.dify_client.chat_messages( + inputs=inputs, + query=plain_text, + user=f'{query.session.launcher_type.value}_{query.session.launcher_id}', + conversation_id=cov_id, + files=files, + timeout=120, + ): + self.ap.logger.debug('dify-chat-chunk: ' + str(chunk)) + + # if chunk['event'] == 'workflow_started': + # mode = 'workflow' + # if mode == 'workflow': + # elif mode == 'basic': + # 因为都只是返回的 message也没有工具调用什么的,暂时不分类 + if chunk['event'] == 'message': + message_idx += 1 + if remove_think: + if message_idx == 1: + think_start = True + continue + if '' in chunk['answer'] and not think_end: + import re + content = re.sub(r'^\n', '', chunk['answer']) + basic_mode_pending_chunk += content + think_end = True + elif think_end: + basic_mode_pending_chunk += chunk['answer'] + if think_start: + continue + + else: + basic_mode_pending_chunk += chunk['answer'] + + if chunk['event'] == 'message_end': + is_final = True + + if is_final or message_idx % 8 == 0: + # content, _ = self._process_thinking_content(basic_mode_pending_chunk) + yield llm_entities.MessageChunk( + role='assistant', + content=basic_mode_pending_chunk, + is_final=is_final, + ) + + + if chunk is None: + raise errors.DifyAPIError('Dify API 没有返回任何响应,请检查网络连接和API配置') + + query.session.using_conversation.uuid = chunk['conversation_id'] + + + async def _agent_chat_messages_chunk( + self, query: core_entities.Query + ) -> typing.AsyncGenerator[llm_entities.Message, None]: + """调用聊天助手""" + cov_id = query.session.using_conversation.uuid or '' + query.variables['conversation_id'] = cov_id + + plain_text, image_ids = await self._preprocess_user_message(query) + + files = [ + { + 'type': 'image', + 'transfer_method': 'local_file', + 'upload_file_id': image_id, + } + for image_id in image_ids + ] + + ignored_events = [] + + inputs = {} + + inputs.update(query.variables) + + pending_agent_message = '' + + chunk = None # 初始化chunk变量,防止在没有响应时引用错误 + message_idx = 0 + is_final = False + think_start = False + think_end = False + + remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think') + + async for chunk in self.dify_client.chat_messages( + inputs=inputs, + query=plain_text, + user=f'{query.session.launcher_type.value}_{query.session.launcher_id}', + response_mode='streaming', + conversation_id=cov_id, + files=files, + timeout=120, + ): + self.ap.logger.debug('dify-agent-chunk: ' + str(chunk)) + + if chunk['event'] in ignored_events: + continue + + if chunk['event'] == 'agent_message': + message_idx += 1 + if remove_think: + if '' in chunk['answer'] and not think_start: + think_start = True + continue + if '' in chunk['answer'] and not think_end: + import re + content = re.sub(r'^\n', '', chunk['answer']) + pending_agent_message += content + think_end = True + elif think_end: + pending_agent_message += chunk['answer'] + if think_start: + continue + + else: + pending_agent_message += chunk['answer'] + elif chunk['event'] == 'message_end': + is_final = True + else: + + if chunk['event'] == 'agent_thought': + if chunk['tool'] != '' and chunk['observation'] != '': # 工具调用结果,跳过 + continue + message_idx += 1 + if chunk['tool']: + msg = llm_entities.MessageChunk( + role='assistant', + tool_calls=[ + llm_entities.ToolCall( + id=chunk['id'], + type='function', + function=llm_entities.FunctionCall( + name=chunk['tool'], + arguments=json.dumps({}), + ), + ) + ], + ) + yield msg + if chunk['event'] == 'message_file': + message_idx += 1 + if chunk['type'] == 'image' and chunk['belongs_to'] == 'assistant': + base_url = self.dify_client.base_url + + if base_url.endswith('/v1'): + base_url = base_url[:-3] + + image_url = base_url + chunk['url'] + + yield llm_entities.MessageChunk( + role='assistant', + content=[llm_entities.ContentElement.from_image_url(image_url)], + is_final=is_final, + + ) + + if chunk['event'] == 'error': + raise errors.DifyAPIError('dify 服务错误: ' + chunk['message']) + if message_idx % 8 == 0 or is_final: + yield llm_entities.MessageChunk( + role='assistant', + content=pending_agent_message, + is_final=is_final, + ) + + if chunk is None: + raise errors.DifyAPIError('Dify API 没有返回任何响应,请检查网络连接和API配置') + + query.session.using_conversation.uuid = chunk['conversation_id'] + + async def _workflow_messages_chunk(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: + """调用工作流""" + + if not query.session.using_conversation.uuid: + query.session.using_conversation.uuid = str(uuid.uuid4()) + + query.variables['conversation_id'] = query.session.using_conversation.uuid + + plain_text, image_ids = await self._preprocess_user_message(query) + + files = [ + { + 'type': 'image', + 'transfer_method': 'local_file', + 'upload_file_id': image_id, + } + for image_id in image_ids + ] + + ignored_events = ['workflow_started'] + + inputs = { # these variables are legacy variables, we need to keep them for compatibility + 'langbot_user_message_text': plain_text, + 'langbot_session_id': query.variables['session_id'], + 'langbot_conversation_id': query.variables['conversation_id'], + 'langbot_msg_create_time': query.variables['msg_create_time'], + } + + inputs.update(query.variables) + messsage_idx = 0 + is_final = False + think_start = False + think_end = False + workflow_contents = '' + + remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think') + async for chunk in self.dify_client.workflow_run( + inputs=inputs, + user=f'{query.session.launcher_type.value}_{query.session.launcher_id}', + files=files, + timeout=120, + ): + self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk)) + if chunk['event'] in ignored_events: + continue + if chunk['event'] == 'workflow_finished': + is_final = True + if chunk['data']['error']: + raise errors.DifyAPIError(chunk['data']['error']) + + if chunk['event'] == 'text_chunk': + messsage_idx += 1 + if remove_think: + if '' in chunk['data']['text'] and not think_start: + think_start = True + continue + if '' in chunk['data']['text'] and not think_end: + import re + content = re.sub(r'^\n', '', chunk['data']['text']) + workflow_contents += content + think_end = True + elif think_end: + workflow_contents += chunk['data']['text'] + if think_start: + continue + + else: + workflow_contents += chunk['data']['text'] + + if chunk['event'] == 'node_started': + if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end': + continue + messsage_idx += 1 + msg = llm_entities.MessageChunk( + role='assistant', + content=None, + tool_calls=[ + llm_entities.ToolCall( + id=chunk['data']['node_id'], + type='function', + function=llm_entities.FunctionCall( + name=chunk['data']['title'], + arguments=json.dumps({}), + ), + ) + ], + ) + + yield msg + + + if messsage_idx % 8 == 0 or is_final: + yield llm_entities.MessageChunk( + role='assistant', + content=workflow_contents, + is_final=is_final, + ) + async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: """运行请求""" - if self.pipeline_config['ai']['dify-service-api']['app-type'] == 'chat': - async for msg in self._chat_messages(query): - yield msg - elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'agent': - async for msg in self._agent_chat_messages(query): - yield msg - elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'workflow': - async for msg in self._workflow_messages(query): - yield msg + if await query.adapter.is_stream_output_supported(): + msg_idx = 0 + if self.pipeline_config['ai']['dify-service-api']['app-type'] == 'chat': + async for msg in self._chat_messages_chunk(query): + msg_idx += 1 + msg.msg_sequence = msg_idx + yield msg + elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'agent': + async for msg in self._agent_chat_messages_chunk(query): + msg_idx += 1 + msg.msg_sequence = msg_idx + yield msg + elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'workflow': + async for msg in self._workflow_messages_chunk(query): + msg_idx += 1 + msg.msg_sequence = msg_idx + yield msg + else: + raise errors.DifyAPIError( + f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}' + ) else: - raise errors.DifyAPIError( - f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}' - ) \ No newline at end of file + if self.pipeline_config['ai']['dify-service-api']['app-type'] == 'chat': + async for msg in self._chat_messages(query): + yield msg + elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'agent': + async for msg in self._agent_chat_messages(query): + yield msg + elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'workflow': + async for msg in self._workflow_messages(query): + yield msg + else: + raise errors.DifyAPIError( + f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}' + ) \ No newline at end of file