diff --git a/pkg/provider/modelmgr/requesters/chatcmpl.py b/pkg/provider/modelmgr/requesters/chatcmpl.py index cf557755..35d1ff92 100644 --- a/pkg/provider/modelmgr/requesters/chatcmpl.py +++ b/pkg/provider/modelmgr/requesters/chatcmpl.py @@ -17,7 +17,6 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester): """OpenAI ChatCompletion API 请求器""" client: openai.AsyncClient - is_content: bool default_config: dict[str, typing.Any] = { 'base_url': 'https://api.openai.com/v1', @@ -31,7 +30,6 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester): timeout=self.requester_cfg['timeout'], http_client=httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']), ) - self.is_content = False async def _req( self, @@ -76,22 +74,14 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester): async def _make_msg_chunk( self, - remove_think: bool, - chat_completion: chat_completion.ChatCompletion, + delta: dict[str, typing.Any], idx: int, + is_content: bool, + is_think: bool, ) -> llm_entities.MessageChunk: # 处理流式chunk和完整响应的差异 # print(chat_completion.choices[0]) - if hasattr(chat_completion, 'choices'): - # 完整响应模式 - choice = chat_completion.choices[0] - delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump() - else: - # 流式chunk模式 - delta = chat_completion.delta.model_dump() if hasattr(chat_completion, 'delta') else {} - # 确保 role 字段存在且不为 None - # print(delta.keys(),delta.values()) if 'role' not in delta or delta['role'] is None: delta['role'] = 'assistant' @@ -101,26 +91,23 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester): # print(reasoning_content) # deepseek的reasoner模型 - if remove_think: - if reasoning_content is not None: - pass - else: - delta['content'] = delta['content'] - else: - if reasoning_content is not None and idx == 0: + if reasoning_content is not None and idx == 0: + if reasoning_content != '': delta['content'] += f'\n{reasoning_content}' - elif reasoning_content is None: - if self.is_content: - delta['content'] = delta['content'] - else: - delta['content'] = f'\n\n\n{delta["content"]}' - self.is_content = True - else: - delta['content'] += reasoning_content + is_think = True + elif reasoning_content is None and idx != 0: + if is_content: + delta['content'] = delta['content'] + elif is_think: + delta['content'] = f'\n\n\n{delta["content"]}' + is_content = True + is_think = False + else: + delta['content'] = reasoning_content message = llm_entities.MessageChunk(**delta) - return message + return message,is_content, is_think async def _closure_stream( self, @@ -159,11 +146,26 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester): current_content = '' args['stream'] = True chunk_idx = 0 - self.is_content = False + is_content = False + is_think = False tool_calls_map: dict[str, llm_entities.ToolCall] = {} async for chunk in self._req_stream(args, extra_body=extra_args): + if hasattr(chunk, 'choices'): + # 完整响应模式 + choice = chunk.choices[0] + delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump() + else: + # 流式chunk模式 + delta = chunk.delta.model_dump() if hasattr(chunk, 'delta') else {} + if remove_think: + reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None + if reasoning_content is not None: + continue # 处理流式消息 - delta_message = await self._make_msg_chunk(remove_think, chunk, chunk_idx) + delta_message,is_content,is_think = await self._make_msg_chunk(delta, + chunk_idx, + is_content, + is_think) if delta_message.content: current_content += delta_message.content delta_message.content = current_content diff --git a/pkg/provider/modelmgr/requesters/giteeaichatcmpl.py b/pkg/provider/modelmgr/requesters/giteeaichatcmpl.py index a8d6eb16..0ff49798 100644 --- a/pkg/provider/modelmgr/requesters/giteeaichatcmpl.py +++ b/pkg/provider/modelmgr/requesters/giteeaichatcmpl.py @@ -19,7 +19,6 @@ class GiteeAIChatCompletions(chatcmpl.OpenAIChatCompletions): 'base_url': 'https://ai.gitee.com/v1', 'timeout': 120, } - is_think: bool = False async def _closure( self, @@ -86,19 +85,12 @@ class GiteeAIChatCompletions(chatcmpl.OpenAIChatCompletions): async def _make_msg_chunk( self, - remove_think: bool, - chat_completion: chat_completion.ChatCompletion, + delta: dict[str, typing.Any], idx: int, ) -> llm_entities.MessageChunk: # 处理流式chunk和完整响应的差异 # print(chat_completion.choices[0]) - if hasattr(chat_completion, 'choices'): - # 完整响应模式 - choice = chat_completion.choices[0] - delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump() - else: - # 流式chunk模式 - delta = chat_completion.delta.model_dump() if hasattr(chat_completion, 'delta') else {} + # 确保 role 字段存在且不为 None if 'role' not in delta or delta['role'] is None: @@ -110,20 +102,9 @@ class GiteeAIChatCompletions(chatcmpl.OpenAIChatCompletions): # print(reasoning_content) # deepseek的reasoner模型 - if remove_think: - if delta['content'] == '': - self.is_think = True - delta['content'] = '' - if delta['content'] == r'': - self.is_think = False - delta['content'] = '' - if not self.is_think: - delta['content'] = delta['content'] - else: - delta['content'] = '' - else: - if reasoning_content is not None: - delta['content'] += reasoning_content + + if reasoning_content is not None: + delta['content'] += reasoning_content message = llm_entities.MessageChunk(**delta) @@ -166,11 +147,32 @@ class GiteeAIChatCompletions(chatcmpl.OpenAIChatCompletions): current_content = '' args['stream'] = True chunk_idx = 0 - self.is_content = False + is_think = False tool_calls_map: dict[str, llm_entities.ToolCall] = {} async for chunk in self._req_stream(args, extra_body=extra_args): # 处理流式消息 - delta_message = await self._make_msg_chunk(remove_think, chunk, chunk_idx) + if hasattr(chunk, 'choices'): + # 完整响应模式 + if chunk.choices: + choice = chunk.choices[0] + delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump() + else: + continue + else: + # 流式chunk模式 + delta = chunk.delta.model_dump() if hasattr(chunk, 'delta') else {} + if remove_think: + print(delta) + if delta['content'] == '': + is_think = True + continue + elif delta['content'] == r'': + is_think = False + continue + elif is_think or delta['content'] == '\n\n': + continue + + delta_message = await self._make_msg_chunk(delta, chunk_idx) if delta_message.content: current_content += delta_message.content delta_message.content = current_content diff --git a/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py b/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py index 7895a87e..f5db54a1 100644 --- a/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py +++ b/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py @@ -172,24 +172,15 @@ class ModelScopeChatCompletions(requester.ProviderAPIRequester): async for chunk in await self.client.chat.completions.create(**args, extra_body=extra_body): yield chunk - async def _make_msg_chunk( - self, - remove_think: bool, - chat_completion: chat_completion.ChatCompletion, - idx: int, + async def _make_msg_chunk(self, + delta: dict[str, typing.Any], + idx: int, + is_content: bool, + is_think: bool, ) -> llm_entities.MessageChunk: # 处理流式chunk和完整响应的差异 # print(chat_completion.choices[0]) - if hasattr(chat_completion, 'choices'): - # 完整响应模式 - choice = chat_completion.choices[0] - delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump() - else: - # 流式chunk模式 - delta = chat_completion.delta.model_dump() if hasattr(chat_completion, 'delta') else {} - # 确保 role 字段存在且不为 None - # print(delta.keys(),delta.values()) if 'role' not in delta or delta['role'] is None: delta['role'] = 'assistant' @@ -199,26 +190,23 @@ class ModelScopeChatCompletions(requester.ProviderAPIRequester): # print(reasoning_content) # deepseek的reasoner模型 - if remove_think: - if reasoning_content is not None: - pass - else: - delta['content'] = delta['content'] - else: - if reasoning_content is not None and idx == 0: + if reasoning_content is not None and idx == 0: + if reasoning_content != '': delta['content'] += f'\n{reasoning_content}' - elif reasoning_content is None: - if self.is_content: - delta['content'] = delta['content'] - else: - delta['content'] = f'\n\n\n{delta["content"]}' - self.is_content = True - else: - delta['content'] += reasoning_content + is_think = True + elif reasoning_content == '' and idx != 0: + if is_content: + delta['content'] = delta['content'] + elif is_think: + delta['content'] = f'\n\n\n{delta["content"]}' + is_content = True + is_think = False + else: + delta['content'] = reasoning_content message = llm_entities.MessageChunk(**delta) - return message + return message, is_content, is_think async def _closure_stream( self, @@ -257,11 +245,28 @@ class ModelScopeChatCompletions(requester.ProviderAPIRequester): current_content = '' args['stream'] = True chunk_idx = 0 - self.is_content = False + is_content = False + is_think = False tool_calls_map: dict[str, llm_entities.ToolCall] = {} async for chunk in self._req_stream(args, extra_body=extra_args): + if hasattr(chunk, 'choices'): + # 完整响应模式 + choice = chunk.choices[0] + delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump() + else: + # 流式chunk模式 + delta = chunk.delta.model_dump() if hasattr(chunk, 'delta') else {} + print(delta) + if remove_think: + reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None + if reasoning_content != '': + continue + # 处理流式消息 + delta_message, is_content, is_think = await self._make_msg_chunk(delta, + chunk_idx, + is_content, + is_think) # 处理流式消息 - delta_message = await self._make_msg_chunk(remove_think, chunk, chunk_idx) if delta_message.content: current_content += delta_message.content delta_message.content = current_content diff --git a/pkg/provider/modelmgr/requesters/ppiochatcmpl.py b/pkg/provider/modelmgr/requesters/ppiochatcmpl.py index ca49df10..68acae81 100644 --- a/pkg/provider/modelmgr/requesters/ppiochatcmpl.py +++ b/pkg/provider/modelmgr/requesters/ppiochatcmpl.py @@ -54,20 +54,12 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions): return message async def _make_msg_chunk( - self, - remove_think: bool, - chat_completion: chat_completion.ChatCompletion, - idx: int, + self, + delta: dict[str, typing.Any], + idx: int, ) -> llm_entities.MessageChunk: # 处理流式chunk和完整响应的差异 # print(chat_completion.choices[0]) - if hasattr(chat_completion, 'choices'): - # 完整响应模式 - choice = chat_completion.choices[0] - delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump() - else: - # 流式chunk模式 - delta = chat_completion.delta.model_dump() if hasattr(chat_completion, 'delta') else {} # 确保 role 字段存在且不为 None if 'role' not in delta or delta['role'] is None: @@ -79,20 +71,9 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions): # print(reasoning_content) # deepseek的reasoner模型 - if remove_think: - if '' in delta['content']: - self.is_think = True - delta['content'] = '' - if r'' in delta['content']: - self.is_think = False - delta['content'] = '' - if not self.is_think: - delta['content'] = delta['content'] - else: - delta['content'] = '' - else: - if reasoning_content is not None: - delta['content'] += reasoning_content + + if reasoning_content is not None: + delta['content'] += reasoning_content message = llm_entities.MessageChunk(**delta) @@ -135,11 +116,33 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions): current_content = '' args['stream'] = True chunk_idx = 0 - self.is_content = False + is_think = False tool_calls_map: dict[str, llm_entities.ToolCall] = {} async for chunk in self._req_stream(args, extra_body=extra_args): # 处理流式消息 - delta_message = await self._make_msg_chunk(remove_think, chunk, chunk_idx) + if hasattr(chunk, 'choices'): + # 完整响应模式 + if chunk.choices: + choice = chunk.choices[0] + delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump() + else: + continue + else: + # 流式chunk模式 + delta = chunk.delta.model_dump() if hasattr(chunk, 'delta') else {} + if remove_think: + if delta['content'] is not None: + if '' in delta['content']: + is_think = True + continue + elif delta['content'] == r'': + is_think = False + continue + elif is_think or delta['content'] == '\n\n': + continue + + delta_message = await self._make_msg_chunk(delta, chunk_idx) + # 处理流式消息 if delta_message.content: current_content += delta_message.content delta_message.content = current_content @@ -164,5 +167,4 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions): delta_message.is_final = True delta_message.content = current_content - if chunk_idx % 64 == 0 or delta_message.is_final: - yield delta_message + yield delta_message diff --git a/pkg/provider/runners/localagent.py b/pkg/provider/runners/localagent.py index 918ec42c..1f17fafd 100644 --- a/pkg/provider/runners/localagent.py +++ b/pkg/provider/runners/localagent.py @@ -122,10 +122,11 @@ class LocalAgentRunner(runner.RequestRunner): remove_think=remove_think, ): msg_idx = msg_idx + 1 + tool_msg = msg if msg_idx % 8 == 0 or msg.is_final: yield msg - if msg.tool_calls: - for tool_call in msg.tool_calls: + if tool_msg.tool_calls: + for tool_call in tool_msg.tool_calls: if tool_call.id not in tool_calls_map: tool_calls_map[tool_call.id] = llm_entities.ToolCall( id=tool_call.id, @@ -137,9 +138,9 @@ class LocalAgentRunner(runner.RequestRunner): if tool_call.function and tool_call.function.arguments: # 流式处理中,工具调用参数可能分多个chunk返回,需要追加而不是覆盖 tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments - final_msg = llm_entities.Message( - role=msg.role, - content=msg.all_content, + final_msg = llm_entities.MessageChunk( + role="tool", + content='', tool_calls=list(tool_calls_map.values()), ) @@ -176,6 +177,7 @@ class LocalAgentRunner(runner.RequestRunner): if is_stream: tool_calls_map = {} + msg_idx = 0 async for msg in await query.use_llm_model.requester.invoke_llm_stream( query, query.use_llm_model, @@ -184,9 +186,12 @@ class LocalAgentRunner(runner.RequestRunner): extra_args=query.use_llm_model.model_entity.extra_args, remove_think=remove_think, ): - yield msg - if msg.tool_calls: - for tool_call in msg.tool_calls: + msg_idx += 1 + tool_msg = msg + if msg_idx % 8 == 0 or msg.is_final: + yield msg + if tool_msg.tool_calls: + for tool_call in tool_msg.tool_calls: if tool_call.id not in tool_calls_map: tool_calls_map[tool_call.id] = llm_entities.ToolCall( id=tool_call.id, @@ -198,9 +203,9 @@ class LocalAgentRunner(runner.RequestRunner): if tool_call.function and tool_call.function.arguments: # 流式处理中,工具调用参数可能分多个chunk返回,需要追加而不是覆盖 tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments - final_msg = llm_entities.Message( - role=msg.role, - content=msg.all_content, + final_msg = llm_entities.MessageChunk( + role="tool", + content='', tool_calls=list(tool_calls_map.values()), ) else: