From 8f8c8ff367499329c348b321a5cf4edc328017a9 Mon Sep 17 00:00:00 2001 From: Dong_master <2213070223@qq.com> Date: Mon, 21 Jul 2025 18:45:45 +0800 Subject: [PATCH] feat:add dashscopeapi stream fix:dify 64chunk yield --- pkg/provider/runners/dashscopeapi.py | 198 ++++++++++++++++++++------- pkg/provider/runners/difysvapi.py | 2 +- 2 files changed, 147 insertions(+), 53 deletions(-) diff --git a/pkg/provider/runners/dashscopeapi.py b/pkg/provider/runners/dashscopeapi.py index 02cb0b51..fe72b0a8 100644 --- a/pkg/provider/runners/dashscopeapi.py +++ b/pkg/provider/runners/dashscopeapi.py @@ -113,39 +113,84 @@ class DashScopeAPIRunner(runner.RequestRunner): # "session_file_ids": ["FILE_ID1"], # FILE_ID1 替换为实际的临时文件ID,逗号隔开多个 # } ) + idx_chunk = 0 + try: + # print(await query.adapter.is_stream_output_supported()) + is_stream = await query.adapter.is_stream_output_supported() - for chunk in response: - if chunk.get('status_code') != 200: - raise DashscopeAPIError( - f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} ' - ) - if not chunk: - continue + except AttributeError: + is_stream = False + if is_stream: + for chunk in response: + if chunk.get('status_code') != 200: + raise DashscopeAPIError( + f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} ' + ) + if not chunk: + continue + idx_chunk += 1 + # 获取流式传输的output + stream_output = chunk.get('output', {}) + if stream_output.get('text') is not None: + pending_content += stream_output.get('text') + # 是否是流式最后一个chunk + is_final = False if stream_output.get('finish_reason', False) == 'null' else True - # 获取流式传输的output - stream_output = chunk.get('output', {}) - if stream_output.get('text') is not None: - pending_content += stream_output.get('text') + # 获取模型传出的参考资料列表 + references_dict_list = stream_output.get('doc_references', []) - # 保存当前会话的session_id用于下次对话的语境 - query.session.using_conversation.uuid = stream_output.get('session_id') + # 从模型传出的参考资料信息中提取用于替换的字典 + if references_dict_list is not None: + for doc in references_dict_list: + if doc.get('index_id') is not None: + references_dict[doc.get('index_id')] = doc.get('doc_name') - # 获取模型传出的参考资料列表 - references_dict_list = stream_output.get('doc_references', []) + # 将参考资料替换到文本中 + pending_content = self._replace_references(pending_content, references_dict) - # 从模型传出的参考资料信息中提取用于替换的字典 - if references_dict_list is not None: - for doc in references_dict_list: - if doc.get('index_id') is not None: - references_dict[doc.get('index_id')] = doc.get('doc_name') + if idx_chunk % 64 == 0 or is_final: + yield llm_entities.MessageChunk( + role='assistant', + content=pending_content, + is_final=is_final, + ) + # 保存当前会话的session_id用于下次对话的语境 + query.session.using_conversation.uuid = stream_output.get('session_id') + else: + for chunk in response: + if chunk.get('status_code') != 200: + raise DashscopeAPIError( + f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} ' + ) + if not chunk: + continue + idx_chunk += 1 + # 获取流式传输的output + stream_output = chunk.get('output', {}) + if stream_output.get('text') is not None: + pending_content += stream_output.get('text') - # 将参考资料替换到文本中 - pending_content = self._replace_references(pending_content, references_dict) + # 保存当前会话的session_id用于下次对话的语境 + query.session.using_conversation.uuid = stream_output.get('session_id') - yield llm_entities.Message( - role='assistant', - content=pending_content, - ) + # 获取模型传出的参考资料列表 + references_dict_list = stream_output.get('doc_references', []) + + # 从模型传出的参考资料信息中提取用于替换的字典 + if references_dict_list is not None: + for doc in references_dict_list: + if doc.get('index_id') is not None: + references_dict[doc.get('index_id')] = doc.get('doc_name') + + # 将参考资料替换到文本中 + pending_content = self._replace_references(pending_content, references_dict) + + + + yield llm_entities.Message( + role='assistant', + content=pending_content, + ) async def _workflow_messages(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: """Dashscope 工作流对话请求""" @@ -177,38 +222,87 @@ class DashScopeAPIRunner(runner.RequestRunner): ) # 处理API返回的流式输出 - for chunk in response: - if chunk.get('status_code') != 200: - raise DashscopeAPIError( - f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} ' - ) - if not chunk: - continue + try: + # print(await query.adapter.is_stream_output_supported()) + is_stream = await query.adapter.is_stream_output_supported() - # 获取流式传输的output - stream_output = chunk.get('output', {}) - if stream_output.get('text') is not None: - pending_content += stream_output.get('text') + except AttributeError: + is_stream = False + idx_chunk = 0 + if is_stream: + for chunk in response: + if chunk.get('status_code') != 200: + raise DashscopeAPIError( + f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} ' + ) + if not chunk: + continue + idx_chunk += 1 + # 获取流式传输的output + stream_output = chunk.get('output', {}) + if stream_output.get('text') is not None: + pending_content += stream_output.get('text') - # 保存当前会话的session_id用于下次对话的语境 - query.session.using_conversation.uuid = stream_output.get('session_id') + is_final = False if stream_output.get('finish_reason', False) == 'null' else True - # 获取模型传出的参考资料列表 - references_dict_list = stream_output.get('doc_references', []) + # 获取模型传出的参考资料列表 + references_dict_list = stream_output.get('doc_references', []) - # 从模型传出的参考资料信息中提取用于替换的字典 - if references_dict_list is not None: - for doc in references_dict_list: - if doc.get('index_id') is not None: - references_dict[doc.get('index_id')] = doc.get('doc_name') + # 从模型传出的参考资料信息中提取用于替换的字典 + if references_dict_list is not None: + for doc in references_dict_list: + if doc.get('index_id') is not None: + references_dict[doc.get('index_id')] = doc.get('doc_name') - # 将参考资料替换到文本中 - pending_content = self._replace_references(pending_content, references_dict) + # 将参考资料替换到文本中 + pending_content = self._replace_references(pending_content, references_dict) + if is_final: + yield llm_entities.MessageChunk( + role='assistant', + content=pending_content, + is_final=is_final, - yield llm_entities.Message( - role='assistant', - content=pending_content, - ) + ) + + # 保存当前会话的session_id用于下次对话的语境 + query.session.using_conversation.uuid = stream_output.get('session_id') + + + else: + for chunk in response: + if chunk.get('status_code') != 200: + raise DashscopeAPIError( + f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} ' + ) + if not chunk: + continue + + # 获取流式传输的output + stream_output = chunk.get('output', {}) + if stream_output.get('text') is not None: + pending_content += stream_output.get('text') + + is_final = False if stream_output.get('finish_reason', False) == 'null' else True + + # 保存当前会话的session_id用于下次对话的语境 + query.session.using_conversation.uuid = stream_output.get('session_id') + + # 获取模型传出的参考资料列表 + references_dict_list = stream_output.get('doc_references', []) + + # 从模型传出的参考资料信息中提取用于替换的字典 + if references_dict_list is not None: + for doc in references_dict_list: + if doc.get('index_id') is not None: + references_dict[doc.get('index_id')] = doc.get('doc_name') + + # 将参考资料替换到文本中 + pending_content = self._replace_references(pending_content, references_dict) + + yield llm_entities.Message( + role='assistant', + content=pending_content, + ) async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: """运行""" diff --git a/pkg/provider/runners/difysvapi.py b/pkg/provider/runners/difysvapi.py index f0c36ca1..7c7d81ad 100644 --- a/pkg/provider/runners/difysvapi.py +++ b/pkg/provider/runners/difysvapi.py @@ -259,7 +259,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): is_final = False pending_agent_message += chunk['answer'] if is_stream: - if batch_pending_index % 32 == 0 or is_final: + if batch_pending_index % 64 == 0 or is_final: yield llm_entities.MessageChunk( role='assistant', content=self._try_convert_thinking(pending_agent_message),