Compare commits

...

1 Commits

Author SHA1 Message Date
RockChinQ
e43b76eba4 feat: parallel execution of tool calls in LocalAgentRunner
Use asyncio.gather() to execute independent tool calls concurrently
instead of sequentially. LLM returns multiple tool_calls in a single
response when they are independent, so parallel execution is safe
and significantly reduces latency.

Closes #2050
2026-03-12 03:16:20 -04:00

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import json import json
import copy import copy
import asyncio
import typing import typing
from .. import runner from .. import runner
from ..modelmgr import requester as modelmgr_requester from ..modelmgr import requester as modelmgr_requester
@@ -295,28 +296,37 @@ class LocalAgentRunner(runner.RequestRunner):
# Once a model succeeds, commit to it for the tool call loop # Once a model succeeds, commit to it for the tool call loop
# (no fallback mid-conversation — different models may interpret tool results differently) # (no fallback mid-conversation — different models may interpret tool results differently)
while pending_tool_calls: while pending_tool_calls:
for tool_call in pending_tool_calls: # Execute all tool calls in parallel (they are independent within the same batch)
async def _execute_single_tool(tc):
"""Execute a single tool call and return (tool_call, content, error)."""
try: try:
func = tool_call.function func = tc.function
parameters = json.loads(func.arguments) if func.arguments else {}
if func.arguments:
parameters = json.loads(func.arguments)
else:
parameters = {}
func_ret = await self.ap.tool_mgr.execute_func_call(func.name, parameters, query=query) func_ret = await self.ap.tool_mgr.execute_func_call(func.name, parameters, query=query)
# Handle return value content # Handle return value content
tool_content = None
if ( if (
isinstance(func_ret, list) isinstance(func_ret, list)
and len(func_ret) > 0 and len(func_ret) > 0
and isinstance(func_ret[0], provider_message.ContentElement) and isinstance(func_ret[0], provider_message.ContentElement)
): ):
tool_content = func_ret return tc, func_ret, None
else: else:
tool_content = json.dumps(func_ret, ensure_ascii=False) return tc, json.dumps(func_ret, ensure_ascii=False), None
except Exception as e:
return tc, None, e
tool_results = await asyncio.gather(*[_execute_single_tool(tc) for tc in pending_tool_calls])
# Yield results in order and append to messages
for tool_call, tool_content, tool_error in tool_results:
if tool_error is not None:
err_msg = provider_message.Message(
role='tool', content=f'err: {tool_error}', tool_call_id=tool_call.id
)
yield err_msg
req_messages.append(err_msg)
else:
if is_stream: if is_stream:
msg = provider_message.MessageChunk( msg = provider_message.MessageChunk(
role='tool', role='tool',
@@ -329,16 +339,8 @@ class LocalAgentRunner(runner.RequestRunner):
content=tool_content, content=tool_content,
tool_call_id=tool_call.id, tool_call_id=tool_call.id,
) )
yield msg yield msg
req_messages.append(msg) req_messages.append(msg)
except Exception as e:
err_msg = provider_message.Message(role='tool', content=f'err: {e}', tool_call_id=tool_call.id)
yield err_msg
req_messages.append(err_msg)
self.ap.logger.debug( self.ap.logger.debug(
f'localagent req: query={query.query_id} req_messages={req_messages} ' f'localagent req: query={query.query_id} req_messages={req_messages} '