Compare commits

..

1 Commits

Author SHA1 Message Date
RockChinQ
e43b76eba4 feat: parallel execution of tool calls in LocalAgentRunner
Use asyncio.gather() to execute independent tool calls concurrently
instead of sequentially. LLM returns multiple tool_calls in a single
response when they are independent, so parallel execution is safe
and significantly reduces latency.

Closes #2050
2026-03-12 03:16:20 -04:00
2 changed files with 20 additions and 60 deletions

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import json import json
import copy import copy
import asyncio
import typing import typing
from .. import runner from .. import runner
from ..modelmgr import requester as modelmgr_requester from ..modelmgr import requester as modelmgr_requester
@@ -132,12 +133,6 @@ class LocalAgentRunner(runner.RequestRunner):
"""Run request""" """Run request"""
pending_tool_calls = [] pending_tool_calls = []
# Agent loop protection config
agent_config = query.pipeline_config['ai']['local-agent']
max_tool_iterations = agent_config.get('max-tool-iterations', 16)
max_tool_result_chars = agent_config.get('max-tool-result-chars', 8000)
iteration_count = 0
# Get knowledge bases list (new field) # Get knowledge bases list (new field)
kb_uuids = query.pipeline_config['ai']['local-agent'].get('knowledge-bases', []) kb_uuids = query.pipeline_config['ai']['local-agent'].get('knowledge-bases', [])
@@ -301,44 +296,37 @@ class LocalAgentRunner(runner.RequestRunner):
# Once a model succeeds, commit to it for the tool call loop # Once a model succeeds, commit to it for the tool call loop
# (no fallback mid-conversation — different models may interpret tool results differently) # (no fallback mid-conversation — different models may interpret tool results differently)
while pending_tool_calls: while pending_tool_calls:
iteration_count += 1 # Execute all tool calls in parallel (they are independent within the same batch)
if iteration_count > max_tool_iterations: async def _execute_single_tool(tc):
self.ap.logger.warning( """Execute a single tool call and return (tool_call, content, error)."""
f'localagent: query={query.query_id} agent loop exceeded max iterations ({max_tool_iterations}), '
f'forcing termination'
)
break
for tool_call in pending_tool_calls:
try: try:
func = tool_call.function func = tc.function
parameters = json.loads(func.arguments) if func.arguments else {}
if func.arguments:
parameters = json.loads(func.arguments)
else:
parameters = {}
func_ret = await self.ap.tool_mgr.execute_func_call(func.name, parameters, query=query) func_ret = await self.ap.tool_mgr.execute_func_call(func.name, parameters, query=query)
# Handle return value content # Handle return value content
tool_content = None
if ( if (
isinstance(func_ret, list) isinstance(func_ret, list)
and len(func_ret) > 0 and len(func_ret) > 0
and isinstance(func_ret[0], provider_message.ContentElement) and isinstance(func_ret[0], provider_message.ContentElement)
): ):
tool_content = func_ret return tc, func_ret, None
else: else:
tool_content = json.dumps(func_ret, ensure_ascii=False) return tc, json.dumps(func_ret, ensure_ascii=False), None
except Exception as e:
return tc, None, e
# Truncate oversized tool results to prevent context overflow tool_results = await asyncio.gather(*[_execute_single_tool(tc) for tc in pending_tool_calls])
if isinstance(tool_content, str) and len(tool_content) > max_tool_result_chars:
self.ap.logger.warning(
f'localagent: tool {func.name} returned {len(tool_content)} chars, '
f'truncating to {max_tool_result_chars}'
)
tool_content = tool_content[:max_tool_result_chars] + '\n...[result truncated]'
# Yield results in order and append to messages
for tool_call, tool_content, tool_error in tool_results:
if tool_error is not None:
err_msg = provider_message.Message(
role='tool', content=f'err: {tool_error}', tool_call_id=tool_call.id
)
yield err_msg
req_messages.append(err_msg)
else:
if is_stream: if is_stream:
msg = provider_message.MessageChunk( msg = provider_message.MessageChunk(
role='tool', role='tool',
@@ -351,16 +339,8 @@ class LocalAgentRunner(runner.RequestRunner):
content=tool_content, content=tool_content,
tool_call_id=tool_call.id, tool_call_id=tool_call.id,
) )
yield msg yield msg
req_messages.append(msg) req_messages.append(msg)
except Exception as e:
err_msg = provider_message.Message(role='tool', content=f'err: {e}', tool_call_id=tool_call.id)
yield err_msg
req_messages.append(err_msg)
self.ap.logger.debug( self.ap.logger.debug(
f'localagent req: query={query.query_id} req_messages={req_messages} ' f'localagent req: query={query.query_id} req_messages={req_messages} '

View File

@@ -93,26 +93,6 @@ stages:
type: knowledge-base-multi-selector type: knowledge-base-multi-selector
required: false required: false
default: [] default: []
- name: max-tool-iterations
label:
en_US: Max Tool Iterations
zh_Hans: 最大工具调用轮次
description:
en_US: Maximum number of tool call iterations in a single agent loop to prevent runaway loops
zh_Hans: 单次 Agent 循环中工具调用的最大轮次,防止无限循环
type: integer
required: false
default: 16
- name: max-tool-result-chars
label:
en_US: Max Tool Result Length
zh_Hans: 工具返回最大字符数
description:
en_US: Maximum character length of a single tool call result, longer results will be truncated
zh_Hans: 单次工具调用返回结果的最大字符数,超出部分将被截断
type: integer
required: false
default: 8000
- name: tbox-app-api - name: tbox-app-api
label: label:
en_US: Tbox App API en_US: Tbox App API