From eede354d3b8859944eb1e6b20bccce9355adbcc3 Mon Sep 17 00:00:00 2001
From: Dong_master <2213070223@qq.com>
Date: Sat, 9 Aug 2025 02:46:13 +0800
Subject: [PATCH] fix:chatcmpl.py del content ,in the ppiochatcmpl.py
and modelsopechatcmpl.py fun _closure_stream stream logic
---
pkg/provider/modelmgr/requesters/chatcmpl.py | 10 +-
.../modelmgr/requesters/modelscopechatcmpl.py | 137 ++++++++++++------
.../modelmgr/requesters/ppiochatcmpl.py | 102 ++++++++-----
3 files changed, 159 insertions(+), 90 deletions(-)
diff --git a/pkg/provider/modelmgr/requesters/chatcmpl.py b/pkg/provider/modelmgr/requesters/chatcmpl.py
index f8ea8593..adeaa251 100644
--- a/pkg/provider/modelmgr/requesters/chatcmpl.py
+++ b/pkg/provider/modelmgr/requesters/chatcmpl.py
@@ -201,11 +201,11 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
delta_content = '\n\n' + delta_content
# 处理 content 中已有的 标签(如果需要移除)
- if delta_content and remove_think and '' in delta_content:
- import re
-
- # 移除 标签及其内容
- delta_content = re.sub(r'.*?', '', delta_content, flags=re.DOTALL)
+ # if delta_content and remove_think and '' in delta_content:
+ # import re
+ #
+ # # 移除 标签及其内容
+ # delta_content = re.sub(r'.*?', '', delta_content, flags=re.DOTALL)
# 处理工具调用增量
delta_tool_calls = None
diff --git a/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py b/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py
index e02b0d07..0007623e 100644
--- a/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py
+++ b/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py
@@ -241,61 +241,106 @@ class ModelScopeChatCompletions(requester.ProviderAPIRequester):
del me['image_base64']
args['messages'] = messages
-
- current_content = ''
args['stream'] = True
- chunk_idx = 0
- is_content = False
- is_think = False
+
+
+ # 流式处理状态
tool_calls_map: dict[str, llm_entities.ToolCall] = {}
+ chunk_idx = 0
+ thinking_started = False
+ thinking_ended = False
+ role = 'assistant' # 默认角色
+ accumulated_reasoning = '' # 仅用于判断何时结束思维链
+
async for chunk in self._req_stream(args, extra_body=extra_args):
- if hasattr(chunk, 'choices'):
- # 完整响应模式
+ # 解析 chunk 数据
+ if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
- delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump()
+ delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
+ finish_reason = getattr(choice, 'finish_reason', None)
else:
- # 流式chunk模式
- delta = chunk.delta.model_dump() if hasattr(chunk, 'delta') else {}
- reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None
- delta['reasoning_content'] = None if reasoning_content == '' else reasoning_content # 直接不管有没有思考消息,构造一个,方便去除思考判断
- if remove_think:
- if delta['reasoning_content'] is not None:
+ delta = {}
+ finish_reason = None
+
+ # 从第一个 chunk 获取 role,后续使用这个 role
+ if 'role' in delta and delta['role']:
+ role = delta['role']
+
+ # 获取增量内容
+ delta_content = delta.get('content', '')
+ reasoning_content = delta.get('reasoning_content', '')
+
+ # 处理 reasoning_content
+ if reasoning_content:
+ accumulated_reasoning += reasoning_content
+ # 如果设置了 remove_think,跳过 reasoning_content
+ if remove_think:
+ chunk_idx += 1
continue
- if ((delta['content'] == '' or delta.get('content', None) is None) and
- (delta.get('reasoning_content', None) is None or delta['reasoning_content'] == '') and
- chunk_idx == 0): # 此处将第一条空消息排除,大部分模型第一条消息携带的是role,但是在role直接处理为ass
+
+ # 第一次出现 reasoning_content,添加 开始标签
+ if not thinking_started:
+ thinking_started = True
+ delta_content = '\n' + reasoning_content
+ else:
+ # 继续输出 reasoning_content
+ delta_content = reasoning_content
+ elif thinking_started and not thinking_ended and delta_content:
+ # reasoning_content 结束,normal content 开始,添加 结束标签
+ thinking_ended = True
+ delta_content = '\n\n' + delta_content
+
+ # 处理 content 中已有的 标签(如果需要移除)
+ # if delta_content and remove_think and '' in delta_content:
+ # import re
+ #
+ # # 移除 标签及其内容
+ # delta_content = re.sub(r'.*?', '', delta_content, flags=re.DOTALL)
+
+ # 处理工具调用增量
+ delta_tool_calls = None
+ if delta.get('tool_calls'):
+ delta_tool_calls = []
+ for tool_call in delta['tool_calls']:
+ tc_id = tool_call.get('id')
+ if tc_id:
+ if tc_id not in tool_calls_map:
+ # 新的工具调用
+ tool_calls_map[tc_id] = llm_entities.ToolCall(
+ id=tc_id,
+ type=tool_call.get('type', 'function'),
+ function=llm_entities.FunctionCall(
+ name=tool_call.get('function', {}).get('name', ''),
+ arguments=tool_call.get('function', {}).get('arguments', ''),
+ ),
+ )
+ delta_tool_calls.append(tool_calls_map[tc_id])
+ else:
+ # 追加函数参数
+ func_args = tool_call.get('function', {}).get('arguments', '')
+ if func_args:
+ tool_calls_map[tc_id].function.arguments += func_args
+ # 返回更新后的完整工具调用
+ delta_tool_calls.append(tool_calls_map[tc_id])
+
+ # 跳过空的第一个 chunk(只有 role 没有内容)
+ if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'):
+ chunk_idx += 1
continue
- # 处理流式消息
- delta_message, is_content, is_think = await self._make_msg_chunk(delta,
- chunk_idx,
- is_content,
- is_think)
- # 处理流式消息
- if delta_message.content:
- current_content += delta_message.content
- delta_message.content = current_content
- # delta_message.all_content = current_content
- if delta_message.tool_calls:
- for tool_call in delta_message.tool_calls:
- if tool_call.id not in tool_calls_map:
- tool_calls_map[tool_call.id] = llm_entities.ToolCall(
- id=tool_call.id,
- type=tool_call.type,
- function=llm_entities.FunctionCall(
- name=tool_call.function.name if tool_call.function else '', arguments=''
- ),
- )
- if tool_call.function and tool_call.function.arguments:
- # 流式处理中,工具调用参数可能分多个chunk返回,需要追加而不是覆盖
- tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
+ # 构建 MessageChunk - 只包含增量内容
+ chunk_data = {
+ 'role': role,
+ 'content': delta_content if delta_content else None,
+ 'tool_calls': delta_tool_calls if delta_tool_calls else None,
+ 'is_final': bool(finish_reason),
+ }
+
+ # 移除 None 值
+ chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
+
+ yield llm_entities.MessageChunk(**chunk_data)
chunk_idx += 1
- chunk_choices = getattr(chunk, 'choices', None)
- if chunk_choices and getattr(chunk_choices[0], 'finish_reason', None):
- delta_message.is_final = True
- delta_message.content = current_content
-
- yield delta_message
# return
async def invoke_llm(
diff --git a/pkg/provider/modelmgr/requesters/ppiochatcmpl.py b/pkg/provider/modelmgr/requesters/ppiochatcmpl.py
index 68acae81..49f03143 100644
--- a/pkg/provider/modelmgr/requesters/ppiochatcmpl.py
+++ b/pkg/provider/modelmgr/requesters/ppiochatcmpl.py
@@ -112,24 +112,32 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions):
del me['image_base64']
args['messages'] = messages
-
- current_content = ''
args['stream'] = True
- chunk_idx = 0
- is_think = False
+
tool_calls_map: dict[str, llm_entities.ToolCall] = {}
+ chunk_idx = 0
+ thinking_started = False
+ thinking_ended = False
+ role = 'assistant' # 默认角色
+ accumulated_reasoning = '' # 仅用于判断何时结束思维链
async for chunk in self._req_stream(args, extra_body=extra_args):
- # 处理流式消息
- if hasattr(chunk, 'choices'):
- # 完整响应模式
- if chunk.choices:
- choice = chunk.choices[0]
- delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump()
- else:
- continue
+ # 解析 chunk 数据
+ if hasattr(chunk, 'choices') and chunk.choices:
+ choice = chunk.choices[0]
+ delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
+ finish_reason = getattr(choice, 'finish_reason', None)
else:
- # 流式chunk模式
- delta = chunk.delta.model_dump() if hasattr(chunk, 'delta') else {}
+ delta = {}
+ finish_reason = None
+
+ # 从第一个 chunk 获取 role,后续使用这个 role
+ if 'role' in delta and delta['role']:
+ role = delta['role']
+
+ # 获取增量内容
+ delta_content = delta.get('content', '')
+ # reasoning_content = delta.get('reasoning_content', '')
+
if remove_think:
if delta['content'] is not None:
if '' in delta['content']:
@@ -141,30 +149,46 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions):
elif is_think or delta['content'] == '\n\n':
continue
- delta_message = await self._make_msg_chunk(delta, chunk_idx)
- # 处理流式消息
- if delta_message.content:
- current_content += delta_message.content
- delta_message.content = current_content
- # delta_message.all_content = current_content
- if delta_message.tool_calls:
- for tool_call in delta_message.tool_calls:
- if tool_call.id not in tool_calls_map:
- tool_calls_map[tool_call.id] = llm_entities.ToolCall(
- id=tool_call.id,
- type=tool_call.type,
- function=llm_entities.FunctionCall(
- name=tool_call.function.name if tool_call.function else '', arguments=''
- ),
- )
- if tool_call.function and tool_call.function.arguments:
- # 流式处理中,工具调用参数可能分多个chunk返回,需要追加而不是覆盖
- tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
+ delta_tool_calls = None
+ if delta.get('tool_calls'):
+ delta_tool_calls = []
+ for tool_call in delta['tool_calls']:
+ tc_id = tool_call.get('id')
+ if tc_id:
+ if tc_id not in tool_calls_map:
+ # 新的工具调用
+ tool_calls_map[tc_id] = llm_entities.ToolCall(
+ id=tc_id,
+ type=tool_call.get('type', 'function'),
+ function=llm_entities.FunctionCall(
+ name=tool_call.get('function', {}).get('name', ''),
+ arguments=tool_call.get('function', {}).get('arguments', ''),
+ ),
+ )
+ delta_tool_calls.append(tool_calls_map[tc_id])
+ else:
+ # 追加函数参数
+ func_args = tool_call.get('function', {}).get('arguments', '')
+ if func_args:
+ tool_calls_map[tc_id].function.arguments += func_args
+ # 返回更新后的完整工具调用
+ delta_tool_calls.append(tool_calls_map[tc_id])
+ # 跳过空的第一个 chunk(只有 role 没有内容)
+ if chunk_idx == 0 and not delta_content and not delta.get('tool_calls'):
+ chunk_idx += 1
+ continue
+
+ # 构建 MessageChunk - 只包含增量内容
+ chunk_data = {
+ 'role': role,
+ 'content': delta_content if delta_content else None,
+ 'tool_calls': delta_tool_calls if delta_tool_calls else None,
+ 'is_final': bool(finish_reason),
+ }
+
+ # 移除 None 值
+ chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
+
+ yield llm_entities.MessageChunk(**chunk_data)
chunk_idx += 1
- chunk_choices = getattr(chunk, 'choices', None)
- if chunk_choices and getattr(chunk_choices[0], 'finish_reason', None):
- delta_message.is_final = True
- delta_message.content = current_content
-
- yield delta_message