fix:The handling logic of remove_think in the connector and Temporarily blocked the processing of streaming tool calls in the runner.

This commit is contained in:
Dong_master
2025-08-05 04:24:03 +08:00
parent 5597dffaeb
commit e88302f1b4
5 changed files with 145 additions and 129 deletions

View File

@@ -17,7 +17,6 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
"""OpenAI ChatCompletion API 请求器"""
client: openai.AsyncClient
is_content: bool
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.openai.com/v1',
@@ -31,7 +30,6 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
timeout=self.requester_cfg['timeout'],
http_client=httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']),
)
self.is_content = False
async def _req(
self,
@@ -76,22 +74,14 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
async def _make_msg_chunk(
self,
remove_think: bool,
chat_completion: chat_completion.ChatCompletion,
delta: dict[str, typing.Any],
idx: int,
is_content: bool,
is_think: bool,
) -> llm_entities.MessageChunk:
# 处理流式chunk和完整响应的差异
# print(chat_completion.choices[0])
if hasattr(chat_completion, 'choices'):
# 完整响应模式
choice = chat_completion.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump()
else:
# 流式chunk模式
delta = chat_completion.delta.model_dump() if hasattr(chat_completion, 'delta') else {}
# 确保 role 字段存在且不为 None
# print(delta.keys(),delta.values())
if 'role' not in delta or delta['role'] is None:
delta['role'] = 'assistant'
@@ -101,26 +91,23 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
# print(reasoning_content)
# deepseek的reasoner模型
if remove_think:
if reasoning_content is not None:
pass
else:
delta['content'] = delta['content']
else:
if reasoning_content is not None and idx == 0:
if reasoning_content is not None and idx == 0:
if reasoning_content != '':
delta['content'] += f'<think>\n{reasoning_content}'
elif reasoning_content is None:
if self.is_content:
delta['content'] = delta['content']
else:
delta['content'] = f'\n<think>\n\n{delta["content"]}'
self.is_content = True
else:
delta['content'] += reasoning_content
is_think = True
elif reasoning_content is None and idx != 0:
if is_content:
delta['content'] = delta['content']
elif is_think:
delta['content'] = f'\n<think>\n\n{delta["content"]}'
is_content = True
is_think = False
else:
delta['content'] = reasoning_content
message = llm_entities.MessageChunk(**delta)
return message
return message,is_content, is_think
async def _closure_stream(
self,
@@ -159,11 +146,26 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
current_content = ''
args['stream'] = True
chunk_idx = 0
self.is_content = False
is_content = False
is_think = False
tool_calls_map: dict[str, llm_entities.ToolCall] = {}
async for chunk in self._req_stream(args, extra_body=extra_args):
if hasattr(chunk, 'choices'):
# 完整响应模式
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump()
else:
# 流式chunk模式
delta = chunk.delta.model_dump() if hasattr(chunk, 'delta') else {}
if remove_think:
reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None
if reasoning_content is not None:
continue
# 处理流式消息
delta_message = await self._make_msg_chunk(remove_think, chunk, chunk_idx)
delta_message,is_content,is_think = await self._make_msg_chunk(delta,
chunk_idx,
is_content,
is_think)
if delta_message.content:
current_content += delta_message.content
delta_message.content = current_content

View File

@@ -19,7 +19,6 @@ class GiteeAIChatCompletions(chatcmpl.OpenAIChatCompletions):
'base_url': 'https://ai.gitee.com/v1',
'timeout': 120,
}
is_think: bool = False
async def _closure(
self,
@@ -86,19 +85,12 @@ class GiteeAIChatCompletions(chatcmpl.OpenAIChatCompletions):
async def _make_msg_chunk(
self,
remove_think: bool,
chat_completion: chat_completion.ChatCompletion,
delta: dict[str, typing.Any],
idx: int,
) -> llm_entities.MessageChunk:
# 处理流式chunk和完整响应的差异
# print(chat_completion.choices[0])
if hasattr(chat_completion, 'choices'):
# 完整响应模式
choice = chat_completion.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump()
else:
# 流式chunk模式
delta = chat_completion.delta.model_dump() if hasattr(chat_completion, 'delta') else {}
# 确保 role 字段存在且不为 None
if 'role' not in delta or delta['role'] is None:
@@ -110,20 +102,9 @@ class GiteeAIChatCompletions(chatcmpl.OpenAIChatCompletions):
# print(reasoning_content)
# deepseek的reasoner模型
if remove_think:
if delta['content'] == '<think>':
self.is_think = True
delta['content'] = ''
if delta['content'] == r'</think>':
self.is_think = False
delta['content'] = ''
if not self.is_think:
delta['content'] = delta['content']
else:
delta['content'] = ''
else:
if reasoning_content is not None:
delta['content'] += reasoning_content
if reasoning_content is not None:
delta['content'] += reasoning_content
message = llm_entities.MessageChunk(**delta)
@@ -166,11 +147,32 @@ class GiteeAIChatCompletions(chatcmpl.OpenAIChatCompletions):
current_content = ''
args['stream'] = True
chunk_idx = 0
self.is_content = False
is_think = False
tool_calls_map: dict[str, llm_entities.ToolCall] = {}
async for chunk in self._req_stream(args, extra_body=extra_args):
# 处理流式消息
delta_message = await self._make_msg_chunk(remove_think, chunk, chunk_idx)
if hasattr(chunk, 'choices'):
# 完整响应模式
if chunk.choices:
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump()
else:
continue
else:
# 流式chunk模式
delta = chunk.delta.model_dump() if hasattr(chunk, 'delta') else {}
if remove_think:
print(delta)
if delta['content'] == '<think>':
is_think = True
continue
elif delta['content'] == r'</think>':
is_think = False
continue
elif is_think or delta['content'] == '\n\n':
continue
delta_message = await self._make_msg_chunk(delta, chunk_idx)
if delta_message.content:
current_content += delta_message.content
delta_message.content = current_content

View File

@@ -172,24 +172,15 @@ class ModelScopeChatCompletions(requester.ProviderAPIRequester):
async for chunk in await self.client.chat.completions.create(**args, extra_body=extra_body):
yield chunk
async def _make_msg_chunk(
self,
remove_think: bool,
chat_completion: chat_completion.ChatCompletion,
idx: int,
async def _make_msg_chunk(self,
delta: dict[str, typing.Any],
idx: int,
is_content: bool,
is_think: bool,
) -> llm_entities.MessageChunk:
# 处理流式chunk和完整响应的差异
# print(chat_completion.choices[0])
if hasattr(chat_completion, 'choices'):
# 完整响应模式
choice = chat_completion.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump()
else:
# 流式chunk模式
delta = chat_completion.delta.model_dump() if hasattr(chat_completion, 'delta') else {}
# 确保 role 字段存在且不为 None
# print(delta.keys(),delta.values())
if 'role' not in delta or delta['role'] is None:
delta['role'] = 'assistant'
@@ -199,26 +190,23 @@ class ModelScopeChatCompletions(requester.ProviderAPIRequester):
# print(reasoning_content)
# deepseek的reasoner模型
if remove_think:
if reasoning_content is not None:
pass
else:
delta['content'] = delta['content']
else:
if reasoning_content is not None and idx == 0:
if reasoning_content is not None and idx == 0:
if reasoning_content != '':
delta['content'] += f'<think>\n{reasoning_content}'
elif reasoning_content is None:
if self.is_content:
delta['content'] = delta['content']
else:
delta['content'] = f'\n<think>\n\n{delta["content"]}'
self.is_content = True
else:
delta['content'] += reasoning_content
is_think = True
elif reasoning_content == '' and idx != 0:
if is_content:
delta['content'] = delta['content']
elif is_think:
delta['content'] = f'\n<think>\n\n{delta["content"]}'
is_content = True
is_think = False
else:
delta['content'] = reasoning_content
message = llm_entities.MessageChunk(**delta)
return message
return message, is_content, is_think
async def _closure_stream(
self,
@@ -257,11 +245,28 @@ class ModelScopeChatCompletions(requester.ProviderAPIRequester):
current_content = ''
args['stream'] = True
chunk_idx = 0
self.is_content = False
is_content = False
is_think = False
tool_calls_map: dict[str, llm_entities.ToolCall] = {}
async for chunk in self._req_stream(args, extra_body=extra_args):
if hasattr(chunk, 'choices'):
# 完整响应模式
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump()
else:
# 流式chunk模式
delta = chunk.delta.model_dump() if hasattr(chunk, 'delta') else {}
print(delta)
if remove_think:
reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None
if reasoning_content != '':
continue
# 处理流式消息
delta_message, is_content, is_think = await self._make_msg_chunk(delta,
chunk_idx,
is_content,
is_think)
# 处理流式消息
delta_message = await self._make_msg_chunk(remove_think, chunk, chunk_idx)
if delta_message.content:
current_content += delta_message.content
delta_message.content = current_content

View File

@@ -54,20 +54,12 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions):
return message
async def _make_msg_chunk(
self,
remove_think: bool,
chat_completion: chat_completion.ChatCompletion,
idx: int,
self,
delta: dict[str, typing.Any],
idx: int,
) -> llm_entities.MessageChunk:
# 处理流式chunk和完整响应的差异
# print(chat_completion.choices[0])
if hasattr(chat_completion, 'choices'):
# 完整响应模式
choice = chat_completion.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump()
else:
# 流式chunk模式
delta = chat_completion.delta.model_dump() if hasattr(chat_completion, 'delta') else {}
# 确保 role 字段存在且不为 None
if 'role' not in delta or delta['role'] is None:
@@ -79,20 +71,9 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions):
# print(reasoning_content)
# deepseek的reasoner模型
if remove_think:
if '<think>' in delta['content']:
self.is_think = True
delta['content'] = ''
if r'</think>' in delta['content']:
self.is_think = False
delta['content'] = ''
if not self.is_think:
delta['content'] = delta['content']
else:
delta['content'] = ''
else:
if reasoning_content is not None:
delta['content'] += reasoning_content
if reasoning_content is not None:
delta['content'] += reasoning_content
message = llm_entities.MessageChunk(**delta)
@@ -135,11 +116,33 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions):
current_content = ''
args['stream'] = True
chunk_idx = 0
self.is_content = False
is_think = False
tool_calls_map: dict[str, llm_entities.ToolCall] = {}
async for chunk in self._req_stream(args, extra_body=extra_args):
# 处理流式消息
delta_message = await self._make_msg_chunk(remove_think, chunk, chunk_idx)
if hasattr(chunk, 'choices'):
# 完整响应模式
if chunk.choices:
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump()
else:
continue
else:
# 流式chunk模式
delta = chunk.delta.model_dump() if hasattr(chunk, 'delta') else {}
if remove_think:
if delta['content'] is not None:
if '<think>' in delta['content']:
is_think = True
continue
elif delta['content'] == r'</think>':
is_think = False
continue
elif is_think or delta['content'] == '\n\n':
continue
delta_message = await self._make_msg_chunk(delta, chunk_idx)
# 处理流式消息
if delta_message.content:
current_content += delta_message.content
delta_message.content = current_content
@@ -164,5 +167,4 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions):
delta_message.is_final = True
delta_message.content = current_content
if chunk_idx % 64 == 0 or delta_message.is_final:
yield delta_message
yield delta_message

View File

@@ -122,10 +122,11 @@ class LocalAgentRunner(runner.RequestRunner):
remove_think=remove_think,
):
msg_idx = msg_idx + 1
tool_msg = msg
if msg_idx % 8 == 0 or msg.is_final:
yield msg
if msg.tool_calls:
for tool_call in msg.tool_calls:
if tool_msg.tool_calls:
for tool_call in tool_msg.tool_calls:
if tool_call.id not in tool_calls_map:
tool_calls_map[tool_call.id] = llm_entities.ToolCall(
id=tool_call.id,
@@ -137,9 +138,9 @@ class LocalAgentRunner(runner.RequestRunner):
if tool_call.function and tool_call.function.arguments:
# 流式处理中工具调用参数可能分多个chunk返回需要追加而不是覆盖
tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
final_msg = llm_entities.Message(
role=msg.role,
content=msg.all_content,
final_msg = llm_entities.MessageChunk(
role="tool",
content='',
tool_calls=list(tool_calls_map.values()),
)
@@ -176,6 +177,7 @@ class LocalAgentRunner(runner.RequestRunner):
if is_stream:
tool_calls_map = {}
msg_idx = 0
async for msg in await query.use_llm_model.requester.invoke_llm_stream(
query,
query.use_llm_model,
@@ -184,9 +186,12 @@ class LocalAgentRunner(runner.RequestRunner):
extra_args=query.use_llm_model.model_entity.extra_args,
remove_think=remove_think,
):
yield msg
if msg.tool_calls:
for tool_call in msg.tool_calls:
msg_idx += 1
tool_msg = msg
if msg_idx % 8 == 0 or msg.is_final:
yield msg
if tool_msg.tool_calls:
for tool_call in tool_msg.tool_calls:
if tool_call.id not in tool_calls_map:
tool_calls_map[tool_call.id] = llm_entities.ToolCall(
id=tool_call.id,
@@ -198,9 +203,9 @@ class LocalAgentRunner(runner.RequestRunner):
if tool_call.function and tool_call.function.arguments:
# 流式处理中工具调用参数可能分多个chunk返回需要追加而不是覆盖
tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
final_msg = llm_entities.Message(
role=msg.role,
content=msg.all_content,
final_msg = llm_entities.MessageChunk(
role="tool",
content='',
tool_calls=list(tool_calls_map.values()),
)
else: