Compare commits

..

1 Commits

Author SHA1 Message Date
RockChinQ
e43b76eba4 feat: parallel execution of tool calls in LocalAgentRunner
Use asyncio.gather() to execute independent tool calls concurrently
instead of sequentially. LLM returns multiple tool_calls in a single
response when they are independent, so parallel execution is safe
and significantly reduces latency.

Closes #2050
2026-03-12 03:16:20 -04:00
2 changed files with 20 additions and 60 deletions

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import json
import copy
import asyncio
import typing
from .. import runner
from ..modelmgr import requester as modelmgr_requester
@@ -132,12 +133,6 @@ class LocalAgentRunner(runner.RequestRunner):
"""Run request"""
pending_tool_calls = []
# Agent loop protection config
agent_config = query.pipeline_config['ai']['local-agent']
max_tool_iterations = agent_config.get('max-tool-iterations', 16)
max_tool_result_chars = agent_config.get('max-tool-result-chars', 8000)
iteration_count = 0
# Get knowledge bases list (new field)
kb_uuids = query.pipeline_config['ai']['local-agent'].get('knowledge-bases', [])
@@ -301,44 +296,37 @@ class LocalAgentRunner(runner.RequestRunner):
# Once a model succeeds, commit to it for the tool call loop
# (no fallback mid-conversation — different models may interpret tool results differently)
while pending_tool_calls:
iteration_count += 1
if iteration_count > max_tool_iterations:
self.ap.logger.warning(
f'localagent: query={query.query_id} agent loop exceeded max iterations ({max_tool_iterations}), '
f'forcing termination'
)
break
for tool_call in pending_tool_calls:
# Execute all tool calls in parallel (they are independent within the same batch)
async def _execute_single_tool(tc):
"""Execute a single tool call and return (tool_call, content, error)."""
try:
func = tool_call.function
if func.arguments:
parameters = json.loads(func.arguments)
else:
parameters = {}
func = tc.function
parameters = json.loads(func.arguments) if func.arguments else {}
func_ret = await self.ap.tool_mgr.execute_func_call(func.name, parameters, query=query)
# Handle return value content
tool_content = None
if (
isinstance(func_ret, list)
and len(func_ret) > 0
and isinstance(func_ret[0], provider_message.ContentElement)
):
tool_content = func_ret
return tc, func_ret, None
else:
tool_content = json.dumps(func_ret, ensure_ascii=False)
return tc, json.dumps(func_ret, ensure_ascii=False), None
except Exception as e:
return tc, None, e
# Truncate oversized tool results to prevent context overflow
if isinstance(tool_content, str) and len(tool_content) > max_tool_result_chars:
self.ap.logger.warning(
f'localagent: tool {func.name} returned {len(tool_content)} chars, '
f'truncating to {max_tool_result_chars}'
)
tool_content = tool_content[:max_tool_result_chars] + '\n...[result truncated]'
tool_results = await asyncio.gather(*[_execute_single_tool(tc) for tc in pending_tool_calls])
# Yield results in order and append to messages
for tool_call, tool_content, tool_error in tool_results:
if tool_error is not None:
err_msg = provider_message.Message(
role='tool', content=f'err: {tool_error}', tool_call_id=tool_call.id
)
yield err_msg
req_messages.append(err_msg)
else:
if is_stream:
msg = provider_message.MessageChunk(
role='tool',
@@ -351,16 +339,8 @@ class LocalAgentRunner(runner.RequestRunner):
content=tool_content,
tool_call_id=tool_call.id,
)
yield msg
req_messages.append(msg)
except Exception as e:
err_msg = provider_message.Message(role='tool', content=f'err: {e}', tool_call_id=tool_call.id)
yield err_msg
req_messages.append(err_msg)
self.ap.logger.debug(
f'localagent req: query={query.query_id} req_messages={req_messages} '

View File

@@ -93,26 +93,6 @@ stages:
type: knowledge-base-multi-selector
required: false
default: []
- name: max-tool-iterations
label:
en_US: Max Tool Iterations
zh_Hans: 最大工具调用轮次
description:
en_US: Maximum number of tool call iterations in a single agent loop to prevent runaway loops
zh_Hans: 单次 Agent 循环中工具调用的最大轮次,防止无限循环
type: integer
required: false
default: 16
- name: max-tool-result-chars
label:
en_US: Max Tool Result Length
zh_Hans: 工具返回最大字符数
description:
en_US: Maximum character length of a single tool call result, longer results will be truncated
zh_Hans: 单次工具调用返回结果的最大字符数,超出部分将被截断
type: integer
required: false
default: 8000
- name: tbox-app-api
label:
en_US: Tbox App API