feat: 实现流式消息处理支持

This commit is contained in:
fdc
2025-06-30 17:58:18 +08:00
committed by Junyan Qin
parent ba4b5255a2
commit b65670cd1a
9 changed files with 385 additions and 46 deletions

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import json
import copy
from ssl import ALERT_DESCRIPTION_BAD_CERTIFICATE_HASH_VALUE
import typing
from .. import runner
from ...core import entities as core_entities
@@ -27,7 +28,13 @@ Respond in the same language as the user's input.
class LocalAgentRunner(runner.RequestRunner):
"""本地Agent请求运行器"""
async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]:
class ToolCallTracker:
"""工具调用追踪器"""
def __init__(self):
self.active_calls: dict[str,dict] = {}
self.completed_calls: list[llm_entities.ToolCall] = []
async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message | llm_entities.MessageChunk, None]:
"""运行请求"""
pending_tool_calls = []
@@ -80,20 +87,57 @@ class LocalAgentRunner(runner.RequestRunner):
req_messages = query.prompt.messages.copy() + query.messages.copy() + [user_message]
# 首次请求
msg = await query.use_llm_model.requester.invoke_llm(
query,
query.use_llm_model,
req_messages,
query.use_funcs,
extra_args=query.use_llm_model.model_entity.extra_args,
)
is_stream = query.adapter.is_stream_output_supported()
# while True:
# pass
if not is_stream:
# 非流式输出,直接请求
msg = await query.use_llm_model.requester.invoke_llm(
query,
query.use_llm_model,
req_messages,
query.use_funcs,
extra_args=query.use_llm_model.model_entity.extra_args,
)
yield msg
final_msg = msg
else:
# 流式输出,需要处理工具调用
tool_calls_map: dict[str, llm_entities.ToolCall] = {}
async for msg in await query.use_llm_model.requester.invoke_llm(
query,
query.use_llm_model,
req_messages,
query.use_funcs,
stream=is_stream,
extra_args=query.use_llm_model.model_entity.extra_args,
):
assert isinstance(msg, llm_entities.MessageChunk)
yield msg
if msg.tool_calls:
for tool_call in msg.tool_calls:
if tool_call.id not in tool_calls_map:
tool_calls_map[tool_call.id] = llm_entities.ToolCall(
id=tool_call.id,
type=tool_call.type,
function=llm_entities.FunctionCall(
name=tool_call.function.name if tool_call.function else '',
arguments=''
),
)
if tool_call.function and tool_call.function.arguments:
# 流式处理中工具调用参数可能分多个chunk返回需要追加而不是覆盖
tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
final_msg = llm_entities.Message(
role=msg.role,
content=msg.all_content,
tool_calls=list(tool_calls_map.values()),
)
yield msg
pending_tool_calls = final_msg.tool_calls
pending_tool_calls = msg.tool_calls
req_messages.append(msg)
req_messages.append(final_msg)
# 持续请求,只要还有待处理的工具调用就继续处理调用
while pending_tool_calls:
@@ -122,17 +166,50 @@ class LocalAgentRunner(runner.RequestRunner):
req_messages.append(err_msg)
# 处理完所有调用,再次请求
msg = await query.use_llm_model.requester.invoke_llm(
query,
query.use_llm_model,
req_messages,
query.use_funcs,
extra_args=query.use_llm_model.model_entity.extra_args,
)
if is_stream:
tool_calls_map = {}
async for msg in await query.use_llm_model.requester.invoke_llm(
query,
query.use_llm_model,
req_messages,
query.use_funcs,
stream=is_stream,
extra_args=query.use_llm_model.model_entity.extra_args,
):
assert isinstance(msg, llm_entities.MessageChunk)
yield msg
if msg.tool_calls:
for tool_call in msg.tool_calls:
if tool_call.id not in tool_calls_map:
tool_calls_map[tool_call.id] = llm_entities.ToolCall(
id=tool_call.id,
type=tool_call.type,
function=llm_entities.FunctionCall(
name=tool_call.function.name if tool_call.function else '',
arguments=''
),
)
if tool_call.function and tool_call.function.arguments:
# 流式处理中工具调用参数可能分多个chunk返回需要追加而不是覆盖
tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
final_msg = llm_entities.Message(
role=msg.role,
content=msg.all_content,
tool_calls=list(tool_calls_map.values()),
)
else:
# 处理完所有调用,再次请求
msg = await query.use_llm_model.requester.invoke_llm(
query,
query.use_llm_model,
req_messages,
query.use_funcs,
extra_args=query.use_llm_model.model_entity.extra_args,
)
yield msg
yield msg
final_msg = msg
pending_tool_calls = msg.tool_calls
pending_tool_calls = final_msg.tool_calls
req_messages.append(msg)
req_messages.append(final_msg)