feat(box): add sandbox_exec tool loop for local-agent calculations

This commit is contained in:
youhuanghe
2026-03-19 12:28:10 +00:00
committed by WangCham
parent 3b3deec080
commit ba7a45713d
17 changed files with 952 additions and 10 deletions

View File

@@ -0,0 +1 @@
"""LangBot Box runtime package."""

View File

@@ -0,0 +1,207 @@
from __future__ import annotations
import abc
import asyncio
import dataclasses
import datetime as dt
import logging
import re
import shlex
import shutil
import uuid
from .errors import BoxError
from .models import BoxExecutionResult, BoxExecutionStatus, BoxSessionInfo, BoxSpec
@dataclasses.dataclass(slots=True)
class _CommandResult:
return_code: int
stdout: str
stderr: str
timed_out: bool = False
class BaseSandboxBackend(abc.ABC):
name: str
def __init__(self, logger: logging.Logger):
self.logger = logger
async def initialize(self):
return None
@abc.abstractmethod
async def is_available(self) -> bool:
pass
@abc.abstractmethod
async def start_session(self, spec: BoxSpec) -> BoxSessionInfo:
pass
@abc.abstractmethod
async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult:
pass
@abc.abstractmethod
async def stop_session(self, session: BoxSessionInfo):
pass
class CLISandboxBackend(BaseSandboxBackend):
command: str
def __init__(self, logger: logging.Logger, command: str, backend_name: str):
super().__init__(logger)
self.command = command
self.name = backend_name
async def is_available(self) -> bool:
if shutil.which(self.command) is None:
return False
result = await self._run_command([self.command, 'info'], timeout_sec=5, check=False)
return result.return_code == 0 and not result.timed_out
async def start_session(self, spec: BoxSpec) -> BoxSessionInfo:
now = dt.datetime.now(dt.UTC)
container_name = self._build_container_name(spec.session_id)
args = [
self.command,
'run',
'-d',
'--rm',
'--name',
container_name,
'--label',
'langbot.box=true',
'--label',
f'langbot.session_id={spec.session_id}',
]
if spec.network.value == 'off':
args.extend(['--network', 'none'])
args.extend([spec.image, 'sh', '-lc', 'while true; do sleep 3600; done'])
await self._run_command(args, timeout_sec=30, check=True)
return BoxSessionInfo(
session_id=spec.session_id,
backend_name=self.name,
backend_session_id=container_name,
image=spec.image,
network=spec.network,
created_at=now,
last_used_at=now,
)
async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult:
start = dt.datetime.now(dt.UTC)
args = [self.command, 'exec']
for key, value in spec.env.items():
args.extend(['-e', f'{key}={value}'])
args.extend(
[
session.backend_session_id,
'sh',
'-lc',
self._build_exec_command(spec.workdir, spec.cmd),
]
)
result = await self._run_command(args, timeout_sec=spec.timeout_sec, check=False)
duration_ms = int((dt.datetime.now(dt.UTC) - start).total_seconds() * 1000)
if result.timed_out:
return BoxExecutionResult(
session_id=session.session_id,
backend_name=self.name,
status=BoxExecutionStatus.TIMED_OUT,
exit_code=None,
stdout=result.stdout,
stderr=result.stderr or f'Command timed out after {spec.timeout_sec} seconds.',
duration_ms=duration_ms,
)
return BoxExecutionResult(
session_id=session.session_id,
backend_name=self.name,
status=BoxExecutionStatus.COMPLETED,
exit_code=result.return_code,
stdout=result.stdout,
stderr=result.stderr,
duration_ms=duration_ms,
)
async def stop_session(self, session: BoxSessionInfo):
await self._run_command(
[self.command, 'rm', '-f', session.backend_session_id],
timeout_sec=20,
check=False,
)
def _build_container_name(self, session_id: str) -> str:
normalized = re.sub(r'[^a-zA-Z0-9_.-]+', '-', session_id).strip('-').lower() or 'session'
suffix = uuid.uuid4().hex[:8]
return f'langbot-box-{normalized[:32]}-{suffix}'
def _build_exec_command(self, workdir: str, cmd: str) -> str:
quoted_workdir = shlex.quote(workdir)
return f'mkdir -p {quoted_workdir} && cd {quoted_workdir} && {cmd}'
async def _run_command(
self,
args: list[str],
timeout_sec: int,
check: bool,
) -> _CommandResult:
process = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(process.communicate(), timeout=timeout_sec)
except asyncio.TimeoutError:
process.kill()
stdout_bytes, stderr_bytes = await process.communicate()
return _CommandResult(
return_code=-1,
stdout=stdout_bytes.decode('utf-8', errors='replace').strip(),
stderr=stderr_bytes.decode('utf-8', errors='replace').strip(),
timed_out=True,
)
stdout = stdout_bytes.decode('utf-8', errors='replace').strip()
stderr = stderr_bytes.decode('utf-8', errors='replace').strip()
if check and process.returncode != 0:
raise BoxError(self._format_cli_error(stderr or stdout or 'unknown backend error'))
return _CommandResult(
return_code=process.returncode,
stdout=stdout,
stderr=stderr,
timed_out=False,
)
def _format_cli_error(self, message: str) -> str:
message = ' '.join(message.split())
if len(message) > 300:
message = f'{message[:297]}...'
return f'{self.name} backend error: {message}'
class PodmanBackend(CLISandboxBackend):
def __init__(self, logger: logging.Logger):
super().__init__(logger=logger, command='podman', backend_name='podman')
class DockerBackend(CLISandboxBackend):
def __init__(self, logger: logging.Logger):
super().__init__(logger=logger, command='docker', backend_name='docker')

View File

@@ -0,0 +1,17 @@
from __future__ import annotations
class BoxError(RuntimeError):
"""Base error for LangBot Box failures."""
class BoxValidationError(BoxError):
"""Raised when sandbox_exec arguments are invalid."""
class BoxBackendUnavailableError(BoxError):
"""Raised when no supported container backend is available."""
class BoxSessionConflictError(BoxError):
"""Raised when an existing session cannot satisfy a new request."""

View File

@@ -0,0 +1,89 @@
from __future__ import annotations
import datetime as dt
import enum
import pydantic
DEFAULT_BOX_IMAGE = 'python:3.11-slim'
class BoxNetworkMode(str, enum.Enum):
OFF = 'off'
ON = 'on'
class BoxExecutionStatus(str, enum.Enum):
COMPLETED = 'completed'
TIMED_OUT = 'timed_out'
class BoxSpec(pydantic.BaseModel):
cmd: str
workdir: str = '/workspace'
timeout_sec: int = 30
network: BoxNetworkMode = BoxNetworkMode.OFF
session_id: str
env: dict[str, str] = pydantic.Field(default_factory=dict)
image: str = DEFAULT_BOX_IMAGE
@pydantic.field_validator('cmd')
@classmethod
def validate_cmd(cls, value: str) -> str:
value = value.strip()
if not value:
raise ValueError('cmd must not be empty')
return value
@pydantic.field_validator('workdir')
@classmethod
def validate_workdir(cls, value: str) -> str:
value = value.strip()
if not value.startswith('/'):
raise ValueError('workdir must be an absolute path inside the sandbox')
return value
@pydantic.field_validator('timeout_sec')
@classmethod
def validate_timeout_sec(cls, value: int) -> int:
if value <= 0:
raise ValueError('timeout_sec must be greater than 0')
return value
@pydantic.field_validator('session_id')
@classmethod
def validate_session_id(cls, value: str) -> str:
value = value.strip()
if not value:
raise ValueError('session_id must not be empty')
return value
@pydantic.field_validator('env')
@classmethod
def validate_env(cls, value: dict[str, str]) -> dict[str, str]:
return {str(k): str(v) for k, v in value.items()}
class BoxSessionInfo(pydantic.BaseModel):
session_id: str
backend_name: str
backend_session_id: str
image: str
network: BoxNetworkMode
created_at: dt.datetime
last_used_at: dt.datetime
class BoxExecutionResult(pydantic.BaseModel):
session_id: str
backend_name: str
status: BoxExecutionStatus
exit_code: int | None
stdout: str = ''
stderr: str = ''
duration_ms: int
@property
def ok(self) -> bool:
return self.status == BoxExecutionStatus.COMPLETED and self.exit_code == 0

View File

@@ -0,0 +1,128 @@
from __future__ import annotations
import asyncio
import dataclasses
import datetime as dt
import logging
from .backend import BaseSandboxBackend, DockerBackend, PodmanBackend
from .errors import BoxBackendUnavailableError, BoxSessionConflictError
from .models import BoxExecutionResult, BoxExecutionStatus, BoxSessionInfo, BoxSpec
@dataclasses.dataclass(slots=True)
class _RuntimeSession:
info: BoxSessionInfo
lock: asyncio.Lock
class BoxRuntime:
def __init__(
self,
logger: logging.Logger,
backends: list[BaseSandboxBackend] | None = None,
session_ttl_sec: int = 300,
):
self.logger = logger
self.backends = backends or [PodmanBackend(logger), DockerBackend(logger)]
self.session_ttl_sec = session_ttl_sec
self._backend: BaseSandboxBackend | None = None
self._sessions: dict[str, _RuntimeSession] = {}
self._lock = asyncio.Lock()
async def initialize(self):
self._backend = await self._select_backend()
async def execute(self, spec: BoxSpec) -> BoxExecutionResult:
session = await self._get_or_create_session(spec)
async with session.lock:
result = await (await self._get_backend()).exec(session.info, spec)
async with self._lock:
now = dt.datetime.now(dt.UTC)
if spec.session_id in self._sessions:
self._sessions[spec.session_id].info.last_used_at = now
if result.status == BoxExecutionStatus.TIMED_OUT:
await self._drop_session_locked(spec.session_id)
return result
async def shutdown(self):
async with self._lock:
session_ids = list(self._sessions.keys())
for session_id in session_ids:
await self._drop_session_locked(session_id)
async def _get_or_create_session(self, spec: BoxSpec) -> _RuntimeSession:
async with self._lock:
await self._reap_expired_sessions_locked()
existing = self._sessions.get(spec.session_id)
if existing is not None:
self._assert_session_compatible(existing.info, spec)
existing.info.last_used_at = dt.datetime.now(dt.UTC)
return existing
backend = await self._get_backend()
info = await backend.start_session(spec)
runtime_session = _RuntimeSession(info=info, lock=asyncio.Lock())
self._sessions[spec.session_id] = runtime_session
return runtime_session
async def _get_backend(self) -> BaseSandboxBackend:
if self._backend is None:
self._backend = await self._select_backend()
if self._backend is None:
raise BoxBackendUnavailableError(
'LangBot Box backend unavailable. Install and start Podman or Docker before using sandbox_exec.'
)
return self._backend
async def _select_backend(self) -> BaseSandboxBackend | None:
for backend in self.backends:
try:
await backend.initialize()
if await backend.is_available():
self.logger.info(f'LangBot Box using backend: {backend.name}')
return backend
except Exception as exc:
self.logger.warning(f'LangBot Box backend {backend.name} probe failed: {exc}')
self.logger.warning('LangBot Box backend unavailable: neither Podman nor Docker is ready')
return None
async def _reap_expired_sessions_locked(self):
if self.session_ttl_sec <= 0:
return
deadline = dt.datetime.now(dt.UTC) - dt.timedelta(seconds=self.session_ttl_sec)
expired_session_ids = [
session_id
for session_id, session in self._sessions.items()
if session.info.last_used_at < deadline
]
for session_id in expired_session_ids:
await self._drop_session_locked(session_id)
async def _drop_session_locked(self, session_id: str):
runtime_session = self._sessions.pop(session_id, None)
if runtime_session is None or self._backend is None:
return
try:
await self._backend.stop_session(runtime_session.info)
except Exception as exc:
self.logger.warning(f'Failed to clean up box session {session_id}: {exc}')
def _assert_session_compatible(self, session: BoxSessionInfo, spec: BoxSpec):
if session.network != spec.network:
raise BoxSessionConflictError(
f'sandbox_exec session {spec.session_id} already exists with network={session.network.value}'
)
if session.image != spec.image:
raise BoxSessionConflictError(
f'sandbox_exec session {spec.session_id} already exists with image={session.image}'
)

View File

@@ -0,0 +1,67 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import pydantic
from .errors import BoxValidationError
from .models import BoxExecutionResult, BoxSpec
from .runtime import BoxRuntime
if TYPE_CHECKING:
from ..core import app as core_app
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
class BoxService:
def __init__(
self,
ap: 'core_app.Application',
runtime: BoxRuntime | None = None,
output_limit_chars: int = 4000,
):
self.ap = ap
self.runtime = runtime or BoxRuntime(logger=ap.logger)
self.output_limit_chars = output_limit_chars
async def initialize(self):
await self.runtime.initialize()
async def execute_sandbox_tool(self, parameters: dict, query: 'pipeline_query.Query') -> dict:
spec_payload = dict(parameters)
spec_payload.setdefault('session_id', str(query.query_id))
spec_payload.setdefault('env', {})
try:
spec = BoxSpec.model_validate(spec_payload)
except pydantic.ValidationError as exc:
first_error = exc.errors()[0]
raise BoxValidationError(first_error.get('msg', 'invalid sandbox_exec arguments')) from exc
result = await self.runtime.execute(spec)
return self._serialize_result(result)
async def shutdown(self):
await self.runtime.shutdown()
def _serialize_result(self, result: BoxExecutionResult) -> dict:
stdout, stdout_truncated = self._truncate(result.stdout)
stderr, stderr_truncated = self._truncate(result.stderr)
return {
'session_id': result.session_id,
'backend': result.backend_name,
'status': result.status.value,
'ok': result.ok,
'exit_code': result.exit_code,
'stdout': stdout,
'stderr': stderr,
'stdout_truncated': stdout_truncated,
'stderr_truncated': stderr_truncated,
'duration_ms': result.duration_ms,
}
def _truncate(self, text: str) -> tuple[str, bool]:
if len(text) <= self.output_limit_chars:
return text, False
return f'{text[: self.output_limit_chars]}...', True

View File

@@ -9,6 +9,7 @@ from ..platform import botmgr as im_mgr
from ..platform.webhook_pusher import WebhookPusher
from ..provider.session import sessionmgr as llm_session_mgr
from ..provider.modelmgr import modelmgr as llm_model_mgr
from ..box import service as box_service_module
from langbot.pkg.provider.tools import toolmgr as llm_tool_mgr
from ..config import manager as config_mgr
@@ -69,6 +70,7 @@ class Application:
# TODO move to pipeline
tool_mgr: llm_tool_mgr.ToolManager = None
box_service: box_service_module.BoxService = None
# ======= Config manager =======

View File

@@ -8,6 +8,7 @@ from ...pipeline import pool, controller, pipelinemgr
from ...pipeline import aggregator as message_aggregator
from ...plugin import connector as plugin_connector
from ...command import cmdmgr
from ...box import service as box_service
from ...provider.session import sessionmgr as llm_session_mgr
from ...provider.modelmgr import modelmgr as llm_model_mgr
from ...provider.tools import toolmgr as llm_tool_mgr
@@ -128,6 +129,10 @@ class BuildAppStage(stage.BootingStage):
await llm_session_mgr_inst.initialize()
ap.sess_mgr = llm_session_mgr_inst
box_service_inst = box_service.BoxService(ap)
await box_service_inst.initialize()
ap.box_service = box_service_inst
llm_tool_mgr_inst = llm_tool_mgr.ToolManager(ap)
await llm_tool_mgr_inst.initialize()
ap.tool_mgr = llm_tool_mgr_inst

View File

@@ -3,7 +3,8 @@ from __future__ import annotations
import abc
import typing
from ..core import app
if typing.TYPE_CHECKING:
from ..core import app
from . import entities
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
@@ -22,9 +23,9 @@ def stage_class(name: str) -> typing.Callable[[type[PipelineStage]], type[Pipeli
class PipelineStage(metaclass=abc.ABCMeta):
"""流水线阶段"""
ap: app.Application
ap: 'app.Application'
def __init__(self, ap: app.Application):
def __init__(self, ap: 'app.Application'):
self.ap = ap
async def initialize(self, pipeline_config: dict):

View File

@@ -3,7 +3,8 @@ from __future__ import annotations
import abc
import typing
from ..core import app
if typing.TYPE_CHECKING:
from ..core import app
preregistered_runners: list[typing.Type[RequestRunner]] = []
@@ -25,11 +26,11 @@ class RequestRunner(abc.ABC):
name: str = None
ap: app.Application
ap: 'app.Application'
pipeline_config: dict
def __init__(self, ap: app.Application, pipeline_config: dict):
def __init__(self, ap: 'app.Application', pipeline_config: dict):
self.ap = ap
self.pipeline_config = pipeline_config

View File

@@ -24,11 +24,37 @@ Respond in the same language as the user's input.
</user_message>
"""
SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec'
SANDBOX_EXEC_SYSTEM_GUIDANCE = (
'When sandbox_exec is available, use it for exact calculations, statistics, structured data parsing, '
'and code execution instead of estimating mentally. If the user provides numbers, tables, CSV-like text, '
'JSON, or other data and asks for a computed answer, prefer running a short Python script in sandbox_exec '
'and then answer from the tool result.'
)
@runner.runner_class('local-agent')
class LocalAgentRunner(runner.RequestRunner):
"""Local agent request runner"""
def _build_request_messages(
self,
query: pipeline_query.Query,
user_message: provider_message.Message,
) -> list[provider_message.Message]:
req_messages = query.prompt.messages.copy() + query.messages.copy()
if any(getattr(tool, 'name', None) == SANDBOX_EXEC_TOOL_NAME for tool in query.use_funcs or []):
req_messages.append(
provider_message.Message(
role='system',
content=SANDBOX_EXEC_SYSTEM_GUIDANCE,
)
)
req_messages.append(user_message)
return req_messages
async def _get_model_candidates(
self,
query: pipeline_query.Query,
@@ -236,7 +262,7 @@ class LocalAgentRunner(runner.RequestRunner):
ce.text = final_user_message_text
break
req_messages = query.prompt.messages.copy() + query.messages.copy() + [user_message]
req_messages = self._build_request_messages(query, user_message)
try:
is_stream = await query.adapter.is_stream_output_supported()

View File

@@ -0,0 +1,75 @@
from __future__ import annotations
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from langbot_plugin.api.entities.events import pipeline_query
from .. import loader
class NativeToolLoader(loader.ToolLoader):
SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec'
async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
return [self._build_sandbox_exec_tool()]
async def has_tool(self, name: str) -> bool:
return name == self.SANDBOX_EXEC_TOOL_NAME
async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query):
if name != self.SANDBOX_EXEC_TOOL_NAME:
raise ValueError(f'未找到工具: {name}')
return await self.ap.box_service.execute_sandbox_tool(parameters, query)
async def shutdown(self):
if getattr(self.ap, 'box_service', None) is not None:
await self.ap.box_service.shutdown()
def _build_sandbox_exec_tool(self) -> resource_tool.LLMTool:
return resource_tool.LLMTool(
name=self.SANDBOX_EXEC_TOOL_NAME,
human_desc='Execute a command inside the LangBot Box sandbox',
description=(
'Run shell commands only inside the isolated LangBot Box sandbox. '
'Use this tool for local file edits, bash commands, Python execution, and exact calculations over '
'user-provided data that must not touch the host.'
),
parameters={
'type': 'object',
'properties': {
'cmd': {
'type': 'string',
'description': 'Shell command to execute inside the sandbox.',
},
'workdir': {
'type': 'string',
'description': 'Absolute working directory path inside the sandbox. Defaults to /workspace.',
'default': '/workspace',
},
'timeout_sec': {
'type': 'integer',
'description': 'Execution timeout in seconds. Defaults to 30.',
'default': 30,
'minimum': 1,
},
'network': {
'type': 'string',
'description': 'Network policy for the sandbox session. Prefer off unless network is required.',
'enum': ['off', 'on'],
'default': 'off',
},
'session_id': {
'type': 'string',
'description': 'Optional sandbox session id. Defaults to the current request id for reuse.',
},
'env': {
'type': 'object',
'description': 'Optional environment variables to expose inside the sandbox.',
'additionalProperties': {'type': 'string'},
'default': {},
},
},
'required': ['cmd'],
'additionalProperties': False,
},
func=lambda parameters: parameters,
)

View File

@@ -5,7 +5,7 @@ import typing
from ...core import app
from langbot.pkg.utils import importutil
from langbot.pkg.provider.tools import loaders
from langbot.pkg.provider.tools.loaders import mcp as mcp_loader, plugin as plugin_loader
from langbot.pkg.provider.tools.loaders import mcp as mcp_loader, native as native_loader, plugin as plugin_loader
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from langbot_plugin.api.entities.events import pipeline_query
@@ -17,6 +17,7 @@ class ToolManager:
ap: app.Application
native_tool_loader: native_loader.NativeToolLoader
plugin_tool_loader: plugin_loader.PluginToolLoader
mcp_tool_loader: mcp_loader.MCPLoader
@@ -24,6 +25,8 @@ class ToolManager:
self.ap = ap
async def initialize(self):
self.native_tool_loader = native_loader.NativeToolLoader(self.ap)
await self.native_tool_loader.initialize()
self.plugin_tool_loader = plugin_loader.PluginToolLoader(self.ap)
await self.plugin_tool_loader.initialize()
self.mcp_tool_loader = mcp_loader.MCPLoader(self.ap)
@@ -35,6 +38,7 @@ class ToolManager:
"""获取所有函数"""
all_functions: list[resource_tool.LLMTool] = []
all_functions.extend(await self.native_tool_loader.get_tools())
all_functions.extend(await self.plugin_tool_loader.get_tools(bound_plugins))
all_functions.extend(await self.mcp_tool_loader.get_tools(bound_mcp_servers))
@@ -95,7 +99,9 @@ class ToolManager:
async def execute_func_call(self, name: str, parameters: dict, query: pipeline_query.Query) -> typing.Any:
"""执行函数调用"""
if await self.plugin_tool_loader.has_tool(name):
if await self.native_tool_loader.has_tool(name):
return await self.native_tool_loader.invoke_tool(name, parameters, query)
elif await self.plugin_tool_loader.has_tool(name):
return await self.plugin_tool_loader.invoke_tool(name, parameters, query)
elif await self.mcp_tool_loader.has_tool(name):
return await self.mcp_tool_loader.invoke_tool(name, parameters, query)
@@ -104,5 +110,6 @@ class ToolManager:
async def shutdown(self):
"""关闭所有工具"""
await self.native_tool_loader.shutdown()
await self.plugin_tool_loader.shutdown()
await self.mcp_tool_loader.shutdown()

View File

@@ -49,7 +49,7 @@
"prompt": [
{
"role": "system",
"content": "You are a helpful assistant."
"content": "You are a helpful assistant. When tools are available, use them for exact calculations, data processing, and code execution instead of guessing."
}
],
"knowledge-bases": [],

View File

@@ -0,0 +1,104 @@
from __future__ import annotations
import datetime as dt
from types import SimpleNamespace
from unittest.mock import Mock
import pytest
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
from langbot.pkg.box.backend import BaseSandboxBackend
from langbot.pkg.box.errors import BoxBackendUnavailableError
from langbot.pkg.box.models import BoxExecutionResult, BoxExecutionStatus, BoxNetworkMode, BoxSessionInfo, BoxSpec
from langbot.pkg.box.runtime import BoxRuntime
from langbot.pkg.box.service import BoxService
class FakeBackend(BaseSandboxBackend):
def __init__(self, logger: Mock, available: bool = True):
super().__init__(logger)
self.name = 'fake'
self.available = available
self.start_calls: list[str] = []
self.exec_calls: list[tuple[str, str]] = []
self.stop_calls: list[str] = []
async def is_available(self) -> bool:
return self.available
async def start_session(self, spec: BoxSpec) -> BoxSessionInfo:
self.start_calls.append(spec.session_id)
now = dt.datetime.now(dt.UTC)
return BoxSessionInfo(
session_id=spec.session_id,
backend_name=self.name,
backend_session_id=f'backend-{spec.session_id}',
image=spec.image,
network=spec.network,
created_at=now,
last_used_at=now,
)
async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult:
self.exec_calls.append((session.session_id, spec.cmd))
return BoxExecutionResult(
session_id=session.session_id,
backend_name=self.name,
status=BoxExecutionStatus.COMPLETED,
exit_code=0,
stdout=f'executed: {spec.cmd}',
stderr='',
duration_ms=12,
)
async def stop_session(self, session: BoxSessionInfo):
self.stop_calls.append(session.session_id)
def make_query(query_id: int = 42) -> pipeline_query.Query:
return pipeline_query.Query.model_construct(query_id=query_id)
@pytest.mark.asyncio
async def test_box_runtime_reuses_request_session():
logger = Mock()
backend = FakeBackend(logger)
runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300)
await runtime.initialize()
first = BoxSpec.model_validate({'cmd': 'echo first', 'session_id': 'req-1'})
second = BoxSpec.model_validate({'cmd': 'echo second', 'session_id': 'req-1'})
await runtime.execute(first)
await runtime.execute(second)
assert backend.start_calls == ['req-1']
assert backend.exec_calls == [('req-1', 'echo first'), ('req-1', 'echo second')]
@pytest.mark.asyncio
async def test_box_service_defaults_session_id_from_query():
logger = Mock()
backend = FakeBackend(logger)
runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300)
service = BoxService(SimpleNamespace(logger=logger), runtime=runtime)
await service.initialize()
result = await service.execute_sandbox_tool({'cmd': 'pwd', 'network': BoxNetworkMode.OFF.value}, make_query(7))
assert result['session_id'] == '7'
assert result['ok'] is True
assert backend.start_calls == ['7']
@pytest.mark.asyncio
async def test_box_service_fails_closed_when_backend_unavailable():
logger = Mock()
backend = FakeBackend(logger, available=False)
runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300)
service = BoxService(SimpleNamespace(logger=logger), runtime=runtime)
await service.initialize()
with pytest.raises(BoxBackendUnavailableError):
await service.execute_sandbox_tool({'cmd': 'echo hello'}, make_query(9))

View File

@@ -0,0 +1,149 @@
from __future__ import annotations
import json
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.provider.session as provider_session
from langbot.pkg.provider.runners.localagent import LocalAgentRunner
class RecordingProvider:
def __init__(self):
self.requests: list[dict] = []
async def invoke_llm(self, query, model, messages, funcs, extra_args=None, remove_think=None):
self.requests.append(
{
'messages': list(messages),
'funcs': list(funcs),
'remove_think': remove_think,
}
)
if len(self.requests) == 1:
return provider_message.Message(
role='assistant',
content='Let me calculate that exactly.',
tool_calls=[
provider_message.ToolCall(
id='call-1',
type='function',
function=provider_message.FunctionCall(
name='sandbox_exec',
arguments=json.dumps(
{
'cmd': (
"python - <<'PY'\n"
"nums = [1, 2, 3, 4]\n"
'print(sum(nums) / len(nums))\n'
'PY'
)
}
),
),
)
],
)
tool_result = json.loads(messages[-1].content)
return provider_message.Message(
role='assistant',
content=f"The average is {tool_result['stdout']}.",
)
def make_query() -> pipeline_query.Query:
adapter = AsyncMock()
adapter.is_stream_output_supported = AsyncMock(return_value=False)
return pipeline_query.Query.model_construct(
query_id='avg-query',
launcher_type=provider_session.LauncherTypes.PERSON,
launcher_id=12345,
sender_id=12345,
message_chain=[],
message_event=None,
adapter=adapter,
pipeline_uuid='pipeline-uuid',
bot_uuid='bot-uuid',
pipeline_config={
'ai': {
'runner': {'runner': 'local-agent'},
'local-agent': {'model': {'primary': 'test-model-uuid', 'fallbacks': []}, 'prompt': 'test-prompt'},
},
'output': {'misc': {'remove-think': False}},
},
prompt=SimpleNamespace(messages=[]),
messages=[],
user_message=provider_message.Message(
role='user',
content='Please calculate the average of 1, 2, 3, and 4.',
),
use_funcs=[SimpleNamespace(name='sandbox_exec')],
use_llm_model_uuid='test-model-uuid',
variables={},
)
@pytest.mark.asyncio
async def test_localagent_uses_sandbox_exec_for_exact_calculation():
provider = RecordingProvider()
model = SimpleNamespace(
provider=provider,
model_entity=SimpleNamespace(
uuid='test-model-uuid',
name='test-model',
abilities=['func_call'],
extra_args={},
),
)
tool_manager = SimpleNamespace(
execute_func_call=AsyncMock(
return_value={
'session_id': 'avg-query',
'backend': 'podman',
'status': 'completed',
'ok': True,
'exit_code': 0,
'stdout': '2.5',
'stderr': '',
'duration_ms': 18,
}
)
)
app = SimpleNamespace(
logger=Mock(),
model_mgr=SimpleNamespace(get_model_by_uuid=AsyncMock(return_value=model)),
tool_mgr=tool_manager,
rag_mgr=SimpleNamespace(),
)
runner = LocalAgentRunner(app, pipeline_config={})
query = make_query()
results = [message async for message in runner.run(query)]
assert [message.role for message in results] == ['assistant', 'tool', 'assistant']
assert results[-1].content == 'The average is 2.5.'
tool_manager.execute_func_call.assert_awaited_once()
tool_name, tool_parameters = tool_manager.execute_func_call.await_args.args[:2]
assert tool_name == 'sandbox_exec'
assert "print(sum(nums) / len(nums))" in tool_parameters['cmd']
first_request = provider.requests[0]
assert any(
message.role == 'system'
and 'sandbox_exec' in str(message.content)
and 'exact calculations' in str(message.content)
for message in first_request['messages']
)
assert [tool.name for tool in first_request['funcs']] == ['sandbox_exec']

View File

@@ -0,0 +1,63 @@
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import Mock
import pytest
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from langbot.pkg.provider.tools.toolmgr import ToolManager
class StubLoader:
def __init__(self, tools: list[resource_tool.LLMTool] | None = None, invoke_result=None):
self._tools = tools or []
self._invoke_result = invoke_result
async def get_tools(self, *_args, **_kwargs):
return self._tools
async def has_tool(self, name: str) -> bool:
return any(tool.name == name for tool in self._tools)
async def invoke_tool(self, name: str, parameters: dict, query):
return self._invoke_result(name, parameters, query) if callable(self._invoke_result) else self._invoke_result
async def shutdown(self):
return None
def make_tool(name: str) -> resource_tool.LLMTool:
return resource_tool.LLMTool(
name=name,
human_desc=name,
description=name,
parameters={'type': 'object', 'properties': {}},
func=lambda parameters: parameters,
)
@pytest.mark.asyncio
async def test_tool_manager_lists_native_tools_first():
manager = ToolManager(SimpleNamespace())
manager.native_tool_loader = StubLoader([make_tool('sandbox_exec')])
manager.plugin_tool_loader = StubLoader([make_tool('plugin_tool')])
manager.mcp_tool_loader = StubLoader([make_tool('mcp_tool')])
tools = await manager.get_all_tools()
assert [tool.name for tool in tools] == ['sandbox_exec', 'plugin_tool', 'mcp_tool']
@pytest.mark.asyncio
async def test_tool_manager_routes_native_tool_calls():
app = SimpleNamespace()
manager = ToolManager(app)
manager.native_tool_loader = StubLoader([make_tool('sandbox_exec')], invoke_result={'backend': 'fake'})
manager.plugin_tool_loader = StubLoader([make_tool('plugin_tool')])
manager.mcp_tool_loader = StubLoader([make_tool('mcp_tool')])
result = await manager.execute_func_call('sandbox_exec', {'cmd': 'pwd'}, query=Mock())
assert result == {'backend': 'fake'}