feat:add dashscopeapi stream

fix:dify 64chunk yield
This commit is contained in:
Dong_master
2025-07-21 18:45:45 +08:00
committed by Junyan Qin
parent 7728b4262b
commit 074d359c8e
2 changed files with 147 additions and 53 deletions
+146 -52
View File
@@ -113,39 +113,84 @@ class DashScopeAPIRunner(runner.RequestRunner):
# "session_file_ids": ["FILE_ID1"], # FILE_ID1 替换为实际的临时文件ID,逗号隔开多个 # "session_file_ids": ["FILE_ID1"], # FILE_ID1 替换为实际的临时文件ID,逗号隔开多个
# } # }
) )
idx_chunk = 0
try:
# print(await query.adapter.is_stream_output_supported())
is_stream = await query.adapter.is_stream_output_supported()
for chunk in response: except AttributeError:
if chunk.get('status_code') != 200: is_stream = False
raise DashscopeAPIError( if is_stream:
f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} ' for chunk in response:
) if chunk.get('status_code') != 200:
if not chunk: raise DashscopeAPIError(
continue f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} '
)
if not chunk:
continue
idx_chunk += 1
# 获取流式传输的output
stream_output = chunk.get('output', {})
if stream_output.get('text') is not None:
pending_content += stream_output.get('text')
# 是否是流式最后一个chunk
is_final = False if stream_output.get('finish_reason', False) == 'null' else True
# 获取流式传输的output # 获取模型传出的参考资料列表
stream_output = chunk.get('output', {}) references_dict_list = stream_output.get('doc_references', [])
if stream_output.get('text') is not None:
pending_content += stream_output.get('text')
# 保存当前会话的session_id用于下次对话的语境 # 从模型传出的参考资料信息中提取用于替换的字典
query.session.using_conversation.uuid = stream_output.get('session_id') if references_dict_list is not None:
for doc in references_dict_list:
if doc.get('index_id') is not None:
references_dict[doc.get('index_id')] = doc.get('doc_name')
# 获取模型传出的参考资料列表 # 将参考资料替换到文本中
references_dict_list = stream_output.get('doc_references', []) pending_content = self._replace_references(pending_content, references_dict)
# 从模型传出的参考资料信息中提取用于替换的字典 if idx_chunk % 64 == 0 or is_final:
if references_dict_list is not None: yield llm_entities.MessageChunk(
for doc in references_dict_list: role='assistant',
if doc.get('index_id') is not None: content=pending_content,
references_dict[doc.get('index_id')] = doc.get('doc_name') is_final=is_final,
)
# 保存当前会话的session_id用于下次对话的语境
query.session.using_conversation.uuid = stream_output.get('session_id')
else:
for chunk in response:
if chunk.get('status_code') != 200:
raise DashscopeAPIError(
f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} '
)
if not chunk:
continue
idx_chunk += 1
# 获取流式传输的output
stream_output = chunk.get('output', {})
if stream_output.get('text') is not None:
pending_content += stream_output.get('text')
# 将参考资料替换到文本中 # 保存当前会话的session_id用于下次对话的语境
pending_content = self._replace_references(pending_content, references_dict) query.session.using_conversation.uuid = stream_output.get('session_id')
yield llm_entities.Message( # 获取模型传出的参考资料列表
role='assistant', references_dict_list = stream_output.get('doc_references', [])
content=pending_content,
) # 从模型传出的参考资料信息中提取用于替换的字典
if references_dict_list is not None:
for doc in references_dict_list:
if doc.get('index_id') is not None:
references_dict[doc.get('index_id')] = doc.get('doc_name')
# 将参考资料替换到文本中
pending_content = self._replace_references(pending_content, references_dict)
yield llm_entities.Message(
role='assistant',
content=pending_content,
)
async def _workflow_messages(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: async def _workflow_messages(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]:
"""Dashscope 工作流对话请求""" """Dashscope 工作流对话请求"""
@@ -177,38 +222,87 @@ class DashScopeAPIRunner(runner.RequestRunner):
) )
# 处理API返回的流式输出 # 处理API返回的流式输出
for chunk in response: try:
if chunk.get('status_code') != 200: # print(await query.adapter.is_stream_output_supported())
raise DashscopeAPIError( is_stream = await query.adapter.is_stream_output_supported()
f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} '
)
if not chunk:
continue
# 获取流式传输的output except AttributeError:
stream_output = chunk.get('output', {}) is_stream = False
if stream_output.get('text') is not None: idx_chunk = 0
pending_content += stream_output.get('text') if is_stream:
for chunk in response:
if chunk.get('status_code') != 200:
raise DashscopeAPIError(
f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} '
)
if not chunk:
continue
idx_chunk += 1
# 获取流式传输的output
stream_output = chunk.get('output', {})
if stream_output.get('text') is not None:
pending_content += stream_output.get('text')
# 保存当前会话的session_id用于下次对话的语境 is_final = False if stream_output.get('finish_reason', False) == 'null' else True
query.session.using_conversation.uuid = stream_output.get('session_id')
# 获取模型传出的参考资料列表 # 获取模型传出的参考资料列表
references_dict_list = stream_output.get('doc_references', []) references_dict_list = stream_output.get('doc_references', [])
# 从模型传出的参考资料信息中提取用于替换的字典 # 从模型传出的参考资料信息中提取用于替换的字典
if references_dict_list is not None: if references_dict_list is not None:
for doc in references_dict_list: for doc in references_dict_list:
if doc.get('index_id') is not None: if doc.get('index_id') is not None:
references_dict[doc.get('index_id')] = doc.get('doc_name') references_dict[doc.get('index_id')] = doc.get('doc_name')
# 将参考资料替换到文本中 # 将参考资料替换到文本中
pending_content = self._replace_references(pending_content, references_dict) pending_content = self._replace_references(pending_content, references_dict)
if is_final:
yield llm_entities.MessageChunk(
role='assistant',
content=pending_content,
is_final=is_final,
yield llm_entities.Message( )
role='assistant',
content=pending_content, # 保存当前会话的session_id用于下次对话的语境
) query.session.using_conversation.uuid = stream_output.get('session_id')
else:
for chunk in response:
if chunk.get('status_code') != 200:
raise DashscopeAPIError(
f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} '
)
if not chunk:
continue
# 获取流式传输的output
stream_output = chunk.get('output', {})
if stream_output.get('text') is not None:
pending_content += stream_output.get('text')
is_final = False if stream_output.get('finish_reason', False) == 'null' else True
# 保存当前会话的session_id用于下次对话的语境
query.session.using_conversation.uuid = stream_output.get('session_id')
# 获取模型传出的参考资料列表
references_dict_list = stream_output.get('doc_references', [])
# 从模型传出的参考资料信息中提取用于替换的字典
if references_dict_list is not None:
for doc in references_dict_list:
if doc.get('index_id') is not None:
references_dict[doc.get('index_id')] = doc.get('doc_name')
# 将参考资料替换到文本中
pending_content = self._replace_references(pending_content, references_dict)
yield llm_entities.Message(
role='assistant',
content=pending_content,
)
async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]:
"""运行""" """运行"""
+1 -1
View File
@@ -259,7 +259,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
is_final = False is_final = False
pending_agent_message += chunk['answer'] pending_agent_message += chunk['answer']
if is_stream: if is_stream:
if batch_pending_index % 32 == 0 or is_final: if batch_pending_index % 64 == 0 or is_final:
yield llm_entities.MessageChunk( yield llm_entities.MessageChunk(
role='assistant', role='assistant',
content=self._try_convert_thinking(pending_agent_message), content=self._try_convert_thinking(pending_agent_message),