diff --git a/pkg/pipeline/respback/respback.py b/pkg/pipeline/respback/respback.py
index c7824856..bc91dffe 100644
--- a/pkg/pipeline/respback/respback.py
+++ b/pkg/pipeline/respback/respback.py
@@ -38,6 +38,7 @@ class SendResponseBackStage(stage.PipelineStage):
quote_origin = query.pipeline_config['output']['misc']['quote-origin']
# has_chunks = any(isinstance(msg, llm_entities.MessageChunk) for msg in query.resp_messages)
+ # TODO 命令与流式的兼容性问题
if await query.adapter.is_stream_output_supported():
is_final = [msg.is_final for msg in query.resp_messages][0]
await query.adapter.reply_message_chunk(
diff --git a/pkg/provider/modelmgr/requesters/chatcmpl.py b/pkg/provider/modelmgr/requesters/chatcmpl.py
index dfe2ed71..f8ea8593 100644
--- a/pkg/provider/modelmgr/requesters/chatcmpl.py
+++ b/pkg/provider/modelmgr/requesters/chatcmpl.py
@@ -42,7 +42,7 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
self,
args: dict,
extra_body: dict = {},
- ) -> chat_completion.ChatCompletion:
+ ):
async for chunk in await self.client.chat.completions.create(**args, extra_body=extra_body):
yield chunk
@@ -52,60 +52,73 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
remove_think: bool = False,
) -> llm_entities.Message:
chatcmpl_message = chat_completion.choices[0].message.model_dump()
- # print(chatcmpl_message.keys(),chatcmpl_message.values())
+
# 确保 role 字段存在且不为 None
if 'role' not in chatcmpl_message or chatcmpl_message['role'] is None:
chatcmpl_message['role'] = 'assistant'
- reasoning_content = chatcmpl_message['reasoning_content'] if 'reasoning_content' in chatcmpl_message else None
+ # 处理思维链
+ content = chatcmpl_message.get('content', '')
+ reasoning_content = chatcmpl_message.get('reasoning_content', None)
- # deepseek的reasoner模型
- if remove_think:
- pass
- else:
- if reasoning_content is not None:
- chatcmpl_message['content'] = (
- '\n' + reasoning_content + '\n\n' + chatcmpl_message['content']
- )
+ processed_content, _ = await self._process_thinking_content(
+ content=content, reasoning_content=reasoning_content, remove_think=remove_think
+ )
+
+ chatcmpl_message['content'] = processed_content
+
+ # 移除 reasoning_content 字段,避免传递给 Message
+ if 'reasoning_content' in chatcmpl_message:
+ del chatcmpl_message['reasoning_content']
message = llm_entities.Message(**chatcmpl_message)
-
return message
- async def _make_msg_chunk(
+ async def _process_thinking_content(
self,
- delta: dict[str, typing.Any],
- idx: int,
- is_content: bool,
- is_think: bool,
- ) -> llm_entities.MessageChunk:
- # 处理流式chunk和完整响应的差异
- # print(chat_completion.choices[0])
+ content: str,
+ reasoning_content: str = None,
+ remove_think: bool = False,
+ ) -> tuple[str, str]:
+ """处理思维链内容
- if 'role' not in delta or delta['role'] is None:
- delta['role'] = 'assistant'
+ Args:
+ content: 原始内容
+ reasoning_content: reasoning_content 字段内容
+ remove_think: 是否移除思维链
- reasoning_content = delta['reasoning_content']
+ Returns:
+ (处理后的内容, 提取的思维链内容)
+ """
+ thinking_content = ''
- delta['content'] = '' if delta['content'] is None else delta['content']
+ # 1. 从 reasoning_content 提取思维链
+ if reasoning_content:
+ thinking_content = reasoning_content
- # deepseek的reasoner模型
- if reasoning_content is not None and idx == 0:
- delta['content'] += f'\n{reasoning_content}'
- is_think = True
- elif reasoning_content is None and idx != 0:
- if is_content:
- delta['content'] = delta['content']
- elif is_think:
- delta['content'] = f'\n\n\n{delta["content"]}'
- is_content = True
- is_think = False
- elif reasoning_content is not None and reasoning_content != '':
- delta['content'] = reasoning_content
+ # 2. 从 content 中提取 标签内容
+ if content and '' in content and '' in content:
+ import re
- message = llm_entities.MessageChunk(**delta)
+ think_pattern = r'(.*?)'
+ think_matches = re.findall(think_pattern, content, re.DOTALL)
+ if think_matches:
+ # 如果已有 reasoning_content,则追加
+ if thinking_content:
+ thinking_content += '\n' + '\n'.join(think_matches)
+ else:
+ thinking_content = '\n'.join(think_matches)
+ # 移除 content 中的 标签
+ content = re.sub(think_pattern, '', content, flags=re.DOTALL).strip()
- return message,is_content, is_think
+ # 3. 根据 remove_think 参数决定是否保留思维链
+ if remove_think:
+ return content, ''
+ else:
+ # 如果有思维链内容,将其以 格式添加到 content 开头
+ if thinking_content:
+ content = f'\n{thinking_content}\n\n{content}'.strip()
+ return content, thinking_content
async def _closure_stream(
self,
@@ -123,7 +136,6 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
-
if tools:
args['tools'] = tools
@@ -140,62 +152,105 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
del me['image_base64']
args['messages'] = messages
-
- current_content = ''
args['stream'] = True
- chunk_idx = 0
- is_content = False
- is_think = False
+
+ # 流式处理状态
tool_calls_map: dict[str, llm_entities.ToolCall] = {}
+ chunk_idx = 0
+ thinking_started = False
+ thinking_ended = False
+ role = 'assistant' # 默认角色
+ accumulated_reasoning = '' # 仅用于判断何时结束思维链
+
async for chunk in self._req_stream(args, extra_body=extra_args):
- if hasattr(chunk, 'choices'):
- # 完整响应模式
+ # 解析 chunk 数据
+ if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
- delta = choice.delta.model_dump() if hasattr(choice, 'delta') else choice.message.model_dump()
+ delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
+ finish_reason = getattr(choice, 'finish_reason', None)
else:
- # 流式chunk模式
- delta = chunk.delta.model_dump() if hasattr(chunk, 'delta') else {}
- print(delta)
- reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None
- delta['reasoning_content'] = reasoning_content
- if remove_think:
- if delta['reasoning_content'] is not None:
+ delta = {}
+ finish_reason = None
+
+ # 从第一个 chunk 获取 role,后续使用这个 role
+ if 'role' in delta and delta['role']:
+ role = delta['role']
+
+ # 获取增量内容
+ delta_content = delta.get('content', '')
+ reasoning_content = delta.get('reasoning_content', '')
+
+ # 处理 reasoning_content
+ if reasoning_content:
+ accumulated_reasoning += reasoning_content
+ # 如果设置了 remove_think,跳过 reasoning_content
+ if remove_think:
+ chunk_idx += 1
continue
- if ((delta['content'] == '' or delta.get('content',None) is None) and
- (delta.get('reasoning_content',None) is None or delta['reasoning_content'] == '') and
- chunk_idx == 0): # 此处将第一条空消息排除,大部分模型第一条消息携带的是role,但是在role直接处理为ass
+
+ # 第一次出现 reasoning_content,添加 开始标签
+ if not thinking_started:
+ thinking_started = True
+ delta_content = '\n' + reasoning_content
+ else:
+ # 继续输出 reasoning_content
+ delta_content = reasoning_content
+ elif thinking_started and not thinking_ended and delta_content:
+ # reasoning_content 结束,normal content 开始,添加 结束标签
+ thinking_ended = True
+ delta_content = '\n\n' + delta_content
+
+ # 处理 content 中已有的 标签(如果需要移除)
+ if delta_content and remove_think and '' in delta_content:
+ import re
+
+ # 移除 标签及其内容
+ delta_content = re.sub(r'.*?', '', delta_content, flags=re.DOTALL)
+
+ # 处理工具调用增量
+ delta_tool_calls = None
+ if delta.get('tool_calls'):
+ delta_tool_calls = []
+ for tool_call in delta['tool_calls']:
+ tc_id = tool_call.get('id')
+ if tc_id:
+ if tc_id not in tool_calls_map:
+ # 新的工具调用
+ tool_calls_map[tc_id] = llm_entities.ToolCall(
+ id=tc_id,
+ type=tool_call.get('type', 'function'),
+ function=llm_entities.FunctionCall(
+ name=tool_call.get('function', {}).get('name', ''),
+ arguments=tool_call.get('function', {}).get('arguments', ''),
+ ),
+ )
+ delta_tool_calls.append(tool_calls_map[tc_id])
+ else:
+ # 追加函数参数
+ func_args = tool_call.get('function', {}).get('arguments', '')
+ if func_args:
+ tool_calls_map[tc_id].function.arguments += func_args
+ # 返回更新后的完整工具调用
+ delta_tool_calls.append(tool_calls_map[tc_id])
+
+ # 跳过空的第一个 chunk(只有 role 没有内容)
+ if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'):
+ chunk_idx += 1
continue
- # 处理流式消息
- delta_message,is_content,is_think = await self._make_msg_chunk(delta,
- chunk_idx,
- is_content,
- is_think)
- if delta_message.content:
- current_content += delta_message.content
- delta_message.content = current_content
- # delta_message.all_content = current_content
- if delta_message.tool_calls:
- for tool_call in delta_message.tool_calls:
- if tool_call.id not in tool_calls_map:
- tool_calls_map[tool_call.id] = llm_entities.ToolCall(
- id=tool_call.id,
- type=tool_call.type,
- function=llm_entities.FunctionCall(
- name=tool_call.function.name if tool_call.function else '', arguments=''
- ),
- )
- if tool_call.function and tool_call.function.arguments:
- # 流式处理中,工具调用参数可能分多个chunk返回,需要追加而不是覆盖
- tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
+ # 构建 MessageChunk - 只包含增量内容
+ chunk_data = {
+ 'role': role,
+ 'content': delta_content if delta_content else None,
+ 'tool_calls': delta_tool_calls if delta_tool_calls else None,
+ 'is_final': bool(finish_reason),
+ }
+
+ # 移除 None 值
+ chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
+
+ yield llm_entities.MessageChunk(**chunk_data)
chunk_idx += 1
- chunk_choices = getattr(chunk, 'choices', None)
- if chunk_choices and getattr(chunk_choices[0], 'finish_reason', None):
- delta_message.is_final = True
- delta_message.content = current_content
-
- yield delta_message
- # return
async def _closure(
self,
diff --git a/pkg/provider/runners/localagent.py b/pkg/provider/runners/localagent.py
index 1f17fafd..754082ea 100644
--- a/pkg/provider/runners/localagent.py
+++ b/pkg/provider/runners/localagent.py
@@ -113,6 +113,9 @@ class LocalAgentRunner(runner.RequestRunner):
# 流式输出,需要处理工具调用
tool_calls_map: dict[str, llm_entities.ToolCall] = {}
msg_idx = 0
+ accumulated_content = '' # 从开始累积的所有内容
+ last_role = 'assistant'
+
async for msg in query.use_llm_model.requester.invoke_llm_stream(
query,
query.use_llm_model,
@@ -122,11 +125,18 @@ class LocalAgentRunner(runner.RequestRunner):
remove_think=remove_think,
):
msg_idx = msg_idx + 1
- tool_msg = msg
- if msg_idx % 8 == 0 or msg.is_final:
- yield msg
- if tool_msg.tool_calls:
- for tool_call in tool_msg.tool_calls:
+
+ # 记录角色
+ if msg.role:
+ last_role = msg.role
+
+ # 累积内容
+ if msg.content:
+ accumulated_content += msg.content
+
+ # 处理工具调用
+ if msg.tool_calls:
+ for tool_call in msg.tool_calls:
if tool_call.id not in tool_calls_map:
tool_calls_map[tool_call.id] = llm_entities.ToolCall(
id=tool_call.id,
@@ -138,10 +148,21 @@ class LocalAgentRunner(runner.RequestRunner):
if tool_call.function and tool_call.function.arguments:
# 流式处理中,工具调用参数可能分多个chunk返回,需要追加而不是覆盖
tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
+
+ # 每8个chunk或最后一个chunk时,输出所有累积的内容
+ if msg_idx % 8 == 0 or msg.is_final:
+ yield llm_entities.MessageChunk(
+ role=last_role,
+ content=accumulated_content, # 输出所有累积内容
+ tool_calls=list(tool_calls_map.values()) if (tool_calls_map and msg.is_final) else None,
+ is_final=msg.is_final,
+ )
+
+ # 创建最终消息用于后续处理
final_msg = llm_entities.MessageChunk(
- role="tool",
- content='',
- tool_calls=list(tool_calls_map.values()),
+ role=last_role,
+ content=accumulated_content,
+ tool_calls=list(tool_calls_map.values()) if tool_calls_map else None,
)
pending_tool_calls = final_msg.tool_calls
@@ -178,7 +199,10 @@ class LocalAgentRunner(runner.RequestRunner):
if is_stream:
tool_calls_map = {}
msg_idx = 0
- async for msg in await query.use_llm_model.requester.invoke_llm_stream(
+ accumulated_content = '' # 从开始累积的所有内容
+ last_role = 'assistant'
+
+ async for msg in query.use_llm_model.requester.invoke_llm_stream(
query,
query.use_llm_model,
req_messages,
@@ -187,11 +211,18 @@ class LocalAgentRunner(runner.RequestRunner):
remove_think=remove_think,
):
msg_idx += 1
- tool_msg = msg
- if msg_idx % 8 == 0 or msg.is_final:
- yield msg
- if tool_msg.tool_calls:
- for tool_call in tool_msg.tool_calls:
+
+ # 记录角色
+ if msg.role:
+ last_role = msg.role
+
+ # 累积内容
+ if msg.content:
+ accumulated_content += msg.content
+
+ # 处理工具调用
+ if msg.tool_calls:
+ for tool_call in msg.tool_calls:
if tool_call.id not in tool_calls_map:
tool_calls_map[tool_call.id] = llm_entities.ToolCall(
id=tool_call.id,
@@ -203,10 +234,20 @@ class LocalAgentRunner(runner.RequestRunner):
if tool_call.function and tool_call.function.arguments:
# 流式处理中,工具调用参数可能分多个chunk返回,需要追加而不是覆盖
tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
+
+ # 每8个chunk或最后一个chunk时,输出所有累积的内容
+ if msg_idx % 8 == 0 or msg.is_final:
+ yield llm_entities.MessageChunk(
+ role=last_role,
+ content=accumulated_content, # 输出所有累积内容
+ tool_calls=list(tool_calls_map.values()) if (tool_calls_map and msg.is_final) else None,
+ is_final=msg.is_final,
+ )
+
final_msg = llm_entities.MessageChunk(
- role="tool",
- content='',
- tool_calls=list(tool_calls_map.values()),
+ role=last_role,
+ content=accumulated_content,
+ tool_calls=list(tool_calls_map.values()) if tool_calls_map else None,
)
else:
# 处理完所有调用,再次请求