diff --git a/src/langbot/pkg/box/__init__.py b/src/langbot/pkg/box/__init__.py new file mode 100644 index 00000000..c1ea6e13 --- /dev/null +++ b/src/langbot/pkg/box/__init__.py @@ -0,0 +1 @@ +"""LangBot Box runtime package.""" diff --git a/src/langbot/pkg/box/backend.py b/src/langbot/pkg/box/backend.py new file mode 100644 index 00000000..3c6672de --- /dev/null +++ b/src/langbot/pkg/box/backend.py @@ -0,0 +1,207 @@ +from __future__ import annotations + +import abc +import asyncio +import dataclasses +import datetime as dt +import logging +import re +import shlex +import shutil +import uuid + +from .errors import BoxError +from .models import BoxExecutionResult, BoxExecutionStatus, BoxSessionInfo, BoxSpec + + +@dataclasses.dataclass(slots=True) +class _CommandResult: + return_code: int + stdout: str + stderr: str + timed_out: bool = False + + +class BaseSandboxBackend(abc.ABC): + name: str + + def __init__(self, logger: logging.Logger): + self.logger = logger + + async def initialize(self): + return None + + @abc.abstractmethod + async def is_available(self) -> bool: + pass + + @abc.abstractmethod + async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: + pass + + @abc.abstractmethod + async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult: + pass + + @abc.abstractmethod + async def stop_session(self, session: BoxSessionInfo): + pass + + +class CLISandboxBackend(BaseSandboxBackend): + command: str + + def __init__(self, logger: logging.Logger, command: str, backend_name: str): + super().__init__(logger) + self.command = command + self.name = backend_name + + async def is_available(self) -> bool: + if shutil.which(self.command) is None: + return False + + result = await self._run_command([self.command, 'info'], timeout_sec=5, check=False) + return result.return_code == 0 and not result.timed_out + + async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: + now = dt.datetime.now(dt.UTC) + container_name = 
self._build_container_name(spec.session_id) + + args = [ + self.command, + 'run', + '-d', + '--rm', + '--name', + container_name, + '--label', + 'langbot.box=true', + '--label', + f'langbot.session_id={spec.session_id}', + ] + + if spec.network.value == 'off': + args.extend(['--network', 'none']) + + args.extend([spec.image, 'sh', '-lc', 'while true; do sleep 3600; done']) + + await self._run_command(args, timeout_sec=30, check=True) + + return BoxSessionInfo( + session_id=spec.session_id, + backend_name=self.name, + backend_session_id=container_name, + image=spec.image, + network=spec.network, + created_at=now, + last_used_at=now, + ) + + async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult: + start = dt.datetime.now(dt.UTC) + args = [self.command, 'exec'] + + for key, value in spec.env.items(): + args.extend(['-e', f'{key}={value}']) + + args.extend( + [ + session.backend_session_id, + 'sh', + '-lc', + self._build_exec_command(spec.workdir, spec.cmd), + ] + ) + + result = await self._run_command(args, timeout_sec=spec.timeout_sec, check=False) + duration_ms = int((dt.datetime.now(dt.UTC) - start).total_seconds() * 1000) + + if result.timed_out: + return BoxExecutionResult( + session_id=session.session_id, + backend_name=self.name, + status=BoxExecutionStatus.TIMED_OUT, + exit_code=None, + stdout=result.stdout, + stderr=result.stderr or f'Command timed out after {spec.timeout_sec} seconds.', + duration_ms=duration_ms, + ) + + return BoxExecutionResult( + session_id=session.session_id, + backend_name=self.name, + status=BoxExecutionStatus.COMPLETED, + exit_code=result.return_code, + stdout=result.stdout, + stderr=result.stderr, + duration_ms=duration_ms, + ) + + async def stop_session(self, session: BoxSessionInfo): + await self._run_command( + [self.command, 'rm', '-f', session.backend_session_id], + timeout_sec=20, + check=False, + ) + + def _build_container_name(self, session_id: str) -> str: + normalized = 
re.sub(r'[^a-zA-Z0-9_.-]+', '-', session_id).strip('-').lower() or 'session' + suffix = uuid.uuid4().hex[:8] + return f'langbot-box-{normalized[:32]}-{suffix}' + + def _build_exec_command(self, workdir: str, cmd: str) -> str: + quoted_workdir = shlex.quote(workdir) + return f'mkdir -p {quoted_workdir} && cd {quoted_workdir} && {cmd}' + + async def _run_command( + self, + args: list[str], + timeout_sec: int, + check: bool, + ) -> _CommandResult: + process = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + try: + stdout_bytes, stderr_bytes = await asyncio.wait_for(process.communicate(), timeout=timeout_sec) + except asyncio.TimeoutError: + process.kill() + stdout_bytes, stderr_bytes = await process.communicate() + return _CommandResult( + return_code=-1, + stdout=stdout_bytes.decode('utf-8', errors='replace').strip(), + stderr=stderr_bytes.decode('utf-8', errors='replace').strip(), + timed_out=True, + ) + + stdout = stdout_bytes.decode('utf-8', errors='replace').strip() + stderr = stderr_bytes.decode('utf-8', errors='replace').strip() + + if check and process.returncode != 0: + raise BoxError(self._format_cli_error(stderr or stdout or 'unknown backend error')) + + return _CommandResult( + return_code=process.returncode, + stdout=stdout, + stderr=stderr, + timed_out=False, + ) + + def _format_cli_error(self, message: str) -> str: + message = ' '.join(message.split()) + if len(message) > 300: + message = f'{message[:297]}...' 
+ return f'{self.name} backend error: {message}' + + +class PodmanBackend(CLISandboxBackend): + def __init__(self, logger: logging.Logger): + super().__init__(logger=logger, command='podman', backend_name='podman') + + +class DockerBackend(CLISandboxBackend): + def __init__(self, logger: logging.Logger): + super().__init__(logger=logger, command='docker', backend_name='docker') diff --git a/src/langbot/pkg/box/errors.py b/src/langbot/pkg/box/errors.py new file mode 100644 index 00000000..7790945d --- /dev/null +++ b/src/langbot/pkg/box/errors.py @@ -0,0 +1,17 @@ +from __future__ import annotations + + +class BoxError(RuntimeError): + """Base error for LangBot Box failures.""" + + +class BoxValidationError(BoxError): + """Raised when sandbox_exec arguments are invalid.""" + + +class BoxBackendUnavailableError(BoxError): + """Raised when no supported container backend is available.""" + + +class BoxSessionConflictError(BoxError): + """Raised when an existing session cannot satisfy a new request.""" diff --git a/src/langbot/pkg/box/models.py b/src/langbot/pkg/box/models.py new file mode 100644 index 00000000..8c9d4a23 --- /dev/null +++ b/src/langbot/pkg/box/models.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +import datetime as dt +import enum + +import pydantic + + +DEFAULT_BOX_IMAGE = 'python:3.11-slim' + + +class BoxNetworkMode(str, enum.Enum): + OFF = 'off' + ON = 'on' + + +class BoxExecutionStatus(str, enum.Enum): + COMPLETED = 'completed' + TIMED_OUT = 'timed_out' + + +class BoxSpec(pydantic.BaseModel): + cmd: str + workdir: str = '/workspace' + timeout_sec: int = 30 + network: BoxNetworkMode = BoxNetworkMode.OFF + session_id: str + env: dict[str, str] = pydantic.Field(default_factory=dict) + image: str = DEFAULT_BOX_IMAGE + + @pydantic.field_validator('cmd') + @classmethod + def validate_cmd(cls, value: str) -> str: + value = value.strip() + if not value: + raise ValueError('cmd must not be empty') + return value + + 
# ---- src/langbot/pkg/box/models.py (request / session / result models) ----


# Validated arguments of a single sandbox_exec request.
class BoxSpec(pydantic.BaseModel):
    cmd: str
    workdir: str = '/workspace'
    timeout_sec: int = 30
    network: BoxNetworkMode = BoxNetworkMode.OFF
    session_id: str
    env: dict[str, str] = pydantic.Field(default_factory=dict)
    image: str = DEFAULT_BOX_IMAGE

    @pydantic.field_validator('cmd')
    @classmethod
    def validate_cmd(cls, value: str) -> str:
        # Surrounding whitespace is dropped; a blank command is rejected.
        stripped = value.strip()
        if stripped:
            return stripped
        raise ValueError('cmd must not be empty')

    @pydantic.field_validator('workdir')
    @classmethod
    def validate_workdir(cls, value: str) -> str:
        # Only absolute paths are accepted.
        stripped = value.strip()
        if stripped.startswith('/'):
            return stripped
        raise ValueError('workdir must be an absolute path inside the sandbox')

    @pydantic.field_validator('timeout_sec')
    @classmethod
    def validate_timeout_sec(cls, value: int) -> int:
        if value > 0:
            return value
        raise ValueError('timeout_sec must be greater than 0')

    @pydantic.field_validator('session_id')
    @classmethod
    def validate_session_id(cls, value: str) -> str:
        stripped = value.strip()
        if stripped:
            return stripped
        raise ValueError('session_id must not be empty')

    @pydantic.field_validator('env')
    @classmethod
    def validate_env(cls, value: dict[str, str]) -> dict[str, str]:
        # Coerce every key and value to str.
        return dict((str(name), str(setting)) for name, setting in value.items())


# Description of a started sandbox session as reported by a backend.
class BoxSessionInfo(pydantic.BaseModel):
    session_id: str
    backend_name: str
    backend_session_id: str
    image: str
    network: BoxNetworkMode
    created_at: dt.datetime
    last_used_at: dt.datetime


# Result of one command execution inside a session.
class BoxExecutionResult(pydantic.BaseModel):
    session_id: str
    backend_name: str
    status: BoxExecutionStatus
    exit_code: int | None
    stdout: str = ''
    stderr: str = ''
    duration_ms: int

    @property
    def ok(self) -> bool:
        # True only for a completed command that exited with code 0.
        finished = self.status == BoxExecutionStatus.COMPLETED
        return finished and self.exit_code == 0


# ---- src/langbot/pkg/box/runtime.py (session record) ----


@dataclasses.dataclass(slots=True)
class _RuntimeSession:
    """A live session paired with the lock that serializes commands in it."""

    info: BoxSessionInfo
    lock: asyncio.Lock
class BoxRuntime:
    """Owns sandbox sessions: picks a backend, creates/reuses sessions, reaps idle ones."""

    def __init__(
        self,
        logger: logging.Logger,
        backends: list[BaseSandboxBackend] | None = None,
        session_ttl_sec: int = 300,
    ):
        """Store configuration; no backend is probed until ``initialize`` or first use.

        Args:
            logger: Logger for backend selection and cleanup messages.
            backends: Candidate backends in preference order; defaults to
                Podman then Docker.
            session_ttl_sec: Idle seconds after which a session is reaped;
                a value <= 0 disables reaping.
        """
        self.logger = logger
        self.backends = backends or [PodmanBackend(logger), DockerBackend(logger)]
        self.session_ttl_sec = session_ttl_sec
        self._backend: BaseSandboxBackend | None = None
        self._sessions: dict[str, _RuntimeSession] = {}
        # Guards self._sessions; methods suffixed `_locked` expect it to be held.
        self._lock = asyncio.Lock()

    async def initialize(self):
        """Probe the candidate backends and remember the first available one."""
        self._backend = await self._select_backend()

    async def execute(self, spec: BoxSpec) -> BoxExecutionResult:
        """Run ``spec.cmd`` in the session named by ``spec.session_id``.

        The session is created on first use. A session whose command timed out
        is dropped afterwards.

        Raises:
            BoxBackendUnavailableError: if no backend is available.
            BoxSessionConflictError: if the existing session has a different
                network mode or image than ``spec``.
        """
        session = await self._get_or_create_session(spec)

        # One command at a time per session.
        async with session.lock:
            result = await (await self._get_backend()).exec(session.info, spec)

        async with self._lock:
            # dt.timezone.utc is the object dt.UTC aliases; it also exists before Python 3.11.
            now = dt.datetime.now(dt.timezone.utc)
            if spec.session_id in self._sessions:
                self._sessions[spec.session_id].info.last_used_at = now

            if result.status == BoxExecutionStatus.TIMED_OUT:
                await self._drop_session_locked(spec.session_id)

        return result

    async def shutdown(self):
        """Stop and forget every tracked session."""
        async with self._lock:
            session_ids = list(self._sessions.keys())
            for session_id in session_ids:
                await self._drop_session_locked(session_id)

    async def _get_or_create_session(self, spec: BoxSpec) -> _RuntimeSession:
        """Return the session for ``spec.session_id``, starting one if none exists."""
        async with self._lock:
            await self._reap_expired_sessions_locked()

            existing = self._sessions.get(spec.session_id)
            if existing is not None:
                self._assert_session_compatible(existing.info, spec)
                existing.info.last_used_at = dt.datetime.now(dt.timezone.utc)
                return existing

            backend = await self._get_backend()
            info = await backend.start_session(spec)
            runtime_session = _RuntimeSession(info=info, lock=asyncio.Lock())
            self._sessions[spec.session_id] = runtime_session
            return runtime_session

    async def _get_backend(self) -> BaseSandboxBackend:
        """Return the selected backend, probing again if none was selected yet.

        Raises:
            BoxBackendUnavailableError: if no candidate backend is available.
        """
        if self._backend is None:
            self._backend = await self._select_backend()
        if self._backend is None:
            raise BoxBackendUnavailableError(
                'LangBot Box backend unavailable. Install and start Podman or Docker before using sandbox_exec.'
            )
        return self._backend

    async def _select_backend(self) -> BaseSandboxBackend | None:
        """Return the first backend that initializes and reports itself available, else None."""
        for backend in self.backends:
            try:
                await backend.initialize()
                if await backend.is_available():
                    self.logger.info(f'LangBot Box using backend: {backend.name}')
                    return backend
            except Exception as exc:
                # A failing probe must not stop the remaining candidates from being tried.
                self.logger.warning(f'LangBot Box backend {backend.name} probe failed: {exc}')

        self.logger.warning('LangBot Box backend unavailable: neither Podman nor Docker is ready')
        return None

    async def _reap_expired_sessions_locked(self):
        """Drop sessions idle for longer than the TTL; caller holds ``self._lock``.

        A session whose own lock is currently held has a command in flight and
        is never reaped, even if its ``last_used_at`` is older than the TTL.
        """
        if self.session_ttl_sec <= 0:
            return

        deadline = dt.datetime.now(dt.timezone.utc) - dt.timedelta(seconds=self.session_ttl_sec)
        expired_session_ids = [
            session_id
            for session_id, session in self._sessions.items()
            if session.info.last_used_at < deadline and not session.lock.locked()
        ]

        for session_id in expired_session_ids:
            await self._drop_session_locked(session_id)

    async def _drop_session_locked(self, session_id: str):
        """Forget ``session_id`` and stop it on the backend; caller holds ``self._lock``."""
        runtime_session = self._sessions.pop(session_id, None)
        if runtime_session is None or self._backend is None:
            return

        try:
            await self._backend.stop_session(runtime_session.info)
        except Exception as exc:
            # Cleanup is best-effort: log and carry on.
            self.logger.warning(f'Failed to clean up box session {session_id}: {exc}')

    def _assert_session_compatible(self, session: BoxSessionInfo, spec: BoxSpec):
        """Raise BoxSessionConflictError if ``spec`` asks for a different network mode or image."""
        if session.network != spec.network:
            raise BoxSessionConflictError(
                f'sandbox_exec session {spec.session_id} already exists with network={session.network.value}'
            )
        if session.image != spec.image:
            raise BoxSessionConflictError(
                f'sandbox_exec session {spec.session_id} already exists with image={session.image}'
            )
class BoxService:
    """Application-facing entry point that turns tool-call arguments into sandbox runs."""

    def __init__(
        self,
        ap: 'core_app.Application',
        runtime: BoxRuntime | None = None,
        output_limit_chars: int = 4000,
    ):
        """Bind the application and runtime.

        Args:
            ap: Application object; its ``logger`` is used for the default runtime.
            runtime: Runtime to use; a ``BoxRuntime`` is created when omitted.
            output_limit_chars: Maximum characters of stdout/stderr returned to the caller.
        """
        self.ap = ap
        self.runtime = runtime or BoxRuntime(logger=ap.logger)
        self.output_limit_chars = output_limit_chars

    async def initialize(self):
        """Initialize the underlying runtime."""
        await self.runtime.initialize()

    async def execute_sandbox_tool(self, parameters: dict, query: 'pipeline_query.Query') -> dict:
        """Validate ``parameters``, run the command, and return a serialized result.

        ``session_id`` defaults to ``str(query.query_id)`` and ``env`` to ``{}``.
        A ``None`` ``parameters`` is treated as an empty mapping so it fails
        validation instead of crashing.

        Raises:
            BoxValidationError: if the arguments do not form a valid ``BoxSpec``;
                the message names the offending field when it is known.
        """
        spec_payload = dict(parameters or {})
        spec_payload.setdefault('session_id', str(query.query_id))
        spec_payload.setdefault('env', {})

        try:
            spec = BoxSpec.model_validate(spec_payload)
        except pydantic.ValidationError as exc:
            raise BoxValidationError(self._describe_validation_error(exc.errors()[0])) from exc

        result = await self.runtime.execute(spec)
        return self._serialize_result(result)

    async def shutdown(self):
        """Shut down the underlying runtime."""
        await self.runtime.shutdown()

    @staticmethod
    def _describe_validation_error(error: dict) -> str:
        """Render one validation error entry as ``'<field path>: <message>'``.

        Without the location, a message such as "Input should be a valid
        integer" does not tell the caller which argument to correct.
        """
        message = error.get('msg', 'invalid sandbox_exec arguments')
        location = '.'.join(str(part) for part in error.get('loc', ()))
        return f'{location}: {message}' if location else message

    def _serialize_result(self, result: BoxExecutionResult) -> dict:
        """Convert ``result`` to a plain dict, truncating stdout and stderr."""
        stdout, stdout_truncated = self._truncate(result.stdout)
        stderr, stderr_truncated = self._truncate(result.stderr)

        return {
            'session_id': result.session_id,
            'backend': result.backend_name,
            'status': result.status.value,
            'ok': result.ok,
            'exit_code': result.exit_code,
            'stdout': stdout,
            'stderr': stderr,
            'stdout_truncated': stdout_truncated,
            'stderr_truncated': stderr_truncated,
            'duration_ms': result.duration_ms,
        }

    def _truncate(self, text: str) -> tuple[str, bool]:
        """Return ``(text, False)`` if within the limit, else the cut text plus ``'...'`` and True."""
        if len(text) <= self.output_limit_chars:
            return text, False
        return f'{text[: self.output_limit_chars]}...', True
100644 --- a/src/langbot/pkg/core/app.py +++ b/src/langbot/pkg/core/app.py @@ -9,6 +9,7 @@ from ..platform import botmgr as im_mgr from ..platform.webhook_pusher import WebhookPusher from ..provider.session import sessionmgr as llm_session_mgr from ..provider.modelmgr import modelmgr as llm_model_mgr +from ..box import service as box_service_module from langbot.pkg.provider.tools import toolmgr as llm_tool_mgr from ..config import manager as config_mgr @@ -69,6 +70,7 @@ class Application: # TODO move to pipeline tool_mgr: llm_tool_mgr.ToolManager = None + box_service: box_service_module.BoxService = None # ======= Config manager ======= diff --git a/src/langbot/pkg/core/stages/build_app.py b/src/langbot/pkg/core/stages/build_app.py index 71ff4262..36f050d7 100644 --- a/src/langbot/pkg/core/stages/build_app.py +++ b/src/langbot/pkg/core/stages/build_app.py @@ -8,6 +8,7 @@ from ...pipeline import pool, controller, pipelinemgr from ...pipeline import aggregator as message_aggregator from ...plugin import connector as plugin_connector from ...command import cmdmgr +from ...box import service as box_service from ...provider.session import sessionmgr as llm_session_mgr from ...provider.modelmgr import modelmgr as llm_model_mgr from ...provider.tools import toolmgr as llm_tool_mgr @@ -128,6 +129,10 @@ class BuildAppStage(stage.BootingStage): await llm_session_mgr_inst.initialize() ap.sess_mgr = llm_session_mgr_inst + box_service_inst = box_service.BoxService(ap) + await box_service_inst.initialize() + ap.box_service = box_service_inst + llm_tool_mgr_inst = llm_tool_mgr.ToolManager(ap) await llm_tool_mgr_inst.initialize() ap.tool_mgr = llm_tool_mgr_inst diff --git a/src/langbot/pkg/pipeline/stage.py b/src/langbot/pkg/pipeline/stage.py index 0ff1af7e..bec31d16 100644 --- a/src/langbot/pkg/pipeline/stage.py +++ b/src/langbot/pkg/pipeline/stage.py @@ -3,7 +3,8 @@ from __future__ import annotations import abc import typing -from ..core import app +if typing.TYPE_CHECKING: + 
from ..core import app from . import entities import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query @@ -22,9 +23,9 @@ def stage_class(name: str) -> typing.Callable[[type[PipelineStage]], type[Pipeli class PipelineStage(metaclass=abc.ABCMeta): """流水线阶段""" - ap: app.Application + ap: 'app.Application' - def __init__(self, ap: app.Application): + def __init__(self, ap: 'app.Application'): self.ap = ap async def initialize(self, pipeline_config: dict): diff --git a/src/langbot/pkg/provider/runner.py b/src/langbot/pkg/provider/runner.py index f89c079d..1b519c38 100644 --- a/src/langbot/pkg/provider/runner.py +++ b/src/langbot/pkg/provider/runner.py @@ -3,7 +3,8 @@ from __future__ import annotations import abc import typing -from ..core import app +if typing.TYPE_CHECKING: + from ..core import app preregistered_runners: list[typing.Type[RequestRunner]] = [] @@ -25,11 +26,11 @@ class RequestRunner(abc.ABC): name: str = None - ap: app.Application + ap: 'app.Application' pipeline_config: dict - def __init__(self, ap: app.Application, pipeline_config: dict): + def __init__(self, ap: 'app.Application', pipeline_config: dict): self.ap = ap self.pipeline_config = pipeline_config diff --git a/src/langbot/pkg/provider/runners/localagent.py b/src/langbot/pkg/provider/runners/localagent.py index b48e9cc3..7b7088b0 100644 --- a/src/langbot/pkg/provider/runners/localagent.py +++ b/src/langbot/pkg/provider/runners/localagent.py @@ -24,11 +24,37 @@ Respond in the same language as the user's input. """ +SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec' +SANDBOX_EXEC_SYSTEM_GUIDANCE = ( + 'When sandbox_exec is available, use it for exact calculations, statistics, structured data parsing, ' + 'and code execution instead of estimating mentally. If the user provides numbers, tables, CSV-like text, ' + 'JSON, or other data and asks for a computed answer, prefer running a short Python script in sandbox_exec ' + 'and then answer from the tool result.' 
+) + @runner.runner_class('local-agent') class LocalAgentRunner(runner.RequestRunner): """Local agent request runner""" + def _build_request_messages( + self, + query: pipeline_query.Query, + user_message: provider_message.Message, + ) -> list[provider_message.Message]: + req_messages = query.prompt.messages.copy() + query.messages.copy() + + if any(getattr(tool, 'name', None) == SANDBOX_EXEC_TOOL_NAME for tool in query.use_funcs or []): + req_messages.append( + provider_message.Message( + role='system', + content=SANDBOX_EXEC_SYSTEM_GUIDANCE, + ) + ) + + req_messages.append(user_message) + return req_messages + async def _get_model_candidates( self, query: pipeline_query.Query, @@ -236,7 +262,7 @@ class LocalAgentRunner(runner.RequestRunner): ce.text = final_user_message_text break - req_messages = query.prompt.messages.copy() + query.messages.copy() + [user_message] + req_messages = self._build_request_messages(query, user_message) try: is_stream = await query.adapter.is_stream_output_supported() diff --git a/src/langbot/pkg/provider/tools/loaders/native.py b/src/langbot/pkg/provider/tools/loaders/native.py new file mode 100644 index 00000000..0fe787ee --- /dev/null +++ b/src/langbot/pkg/provider/tools/loaders/native.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +import langbot_plugin.api.entities.builtin.resource.tool as resource_tool +from langbot_plugin.api.entities.events import pipeline_query + +from .. 
class NativeToolLoader(loader.ToolLoader):
    """Tool loader that exposes the single built-in ``sandbox_exec`` tool."""

    SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec'

    async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
        """Return the tool list; ``bound_plugins`` is accepted but not used."""
        sandbox_tool = self._build_sandbox_exec_tool()
        return [sandbox_tool]

    async def has_tool(self, name: str) -> bool:
        """Return True only for the sandbox_exec tool name."""
        return name == self.SANDBOX_EXEC_TOOL_NAME

    async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query):
        """Forward a sandbox_exec call to the application's box service.

        Raises:
            ValueError: if ``name`` is not the sandbox_exec tool.
        """
        if name == self.SANDBOX_EXEC_TOOL_NAME:
            return await self.ap.box_service.execute_sandbox_tool(parameters, query)
        raise ValueError(f'未找到工具: {name}')

    async def shutdown(self):
        """Shut down the box service if the application has one."""
        if getattr(self.ap, 'box_service', None) is None:
            return
        await self.ap.box_service.shutdown()

    def _build_sandbox_exec_tool(self) -> resource_tool.LLMTool:
        """Assemble the LLM tool definition (name, descriptions, JSON-schema parameters)."""
        tool_description = (
            'Run shell commands only inside the isolated LangBot Box sandbox. '
            'Use this tool for local file edits, bash commands, Python execution, and exact calculations over '
            'user-provided data that must not touch the host.'
        )

        # Property order matches the order the schema is presented in.
        properties = {
            'cmd': {
                'type': 'string',
                'description': 'Shell command to execute inside the sandbox.',
            },
            'workdir': {
                'type': 'string',
                'description': 'Absolute working directory path inside the sandbox. Defaults to /workspace.',
                'default': '/workspace',
            },
            'timeout_sec': {
                'type': 'integer',
                'description': 'Execution timeout in seconds. Defaults to 30.',
                'default': 30,
                'minimum': 1,
            },
            'network': {
                'type': 'string',
                'description': 'Network policy for the sandbox session. Prefer off unless network is required.',
                'enum': ['off', 'on'],
                'default': 'off',
            },
            'session_id': {
                'type': 'string',
                'description': 'Optional sandbox session id. Defaults to the current request id for reuse.',
            },
            'env': {
                'type': 'object',
                'description': 'Optional environment variables to expose inside the sandbox.',
                'additionalProperties': {'type': 'string'},
                'default': {},
            },
        }

        schema = {
            'type': 'object',
            'properties': properties,
            'required': ['cmd'],
            'additionalProperties': False,
        }

        return resource_tool.LLMTool(
            name=self.SANDBOX_EXEC_TOOL_NAME,
            human_desc='Execute a command inside the LangBot Box sandbox',
            description=tool_description,
            parameters=schema,
            func=lambda parameters: parameters,
        )
name: str, parameters: dict, query: pipeline_query.Query) -> typing.Any: """执行函数调用""" - if await self.plugin_tool_loader.has_tool(name): + if await self.native_tool_loader.has_tool(name): + return await self.native_tool_loader.invoke_tool(name, parameters, query) + elif await self.plugin_tool_loader.has_tool(name): return await self.plugin_tool_loader.invoke_tool(name, parameters, query) elif await self.mcp_tool_loader.has_tool(name): return await self.mcp_tool_loader.invoke_tool(name, parameters, query) @@ -104,5 +110,6 @@ class ToolManager: async def shutdown(self): """关闭所有工具""" + await self.native_tool_loader.shutdown() await self.plugin_tool_loader.shutdown() await self.mcp_tool_loader.shutdown() diff --git a/src/langbot/templates/default-pipeline-config.json b/src/langbot/templates/default-pipeline-config.json index e40d3914..eb89053e 100644 --- a/src/langbot/templates/default-pipeline-config.json +++ b/src/langbot/templates/default-pipeline-config.json @@ -49,7 +49,7 @@ "prompt": [ { "role": "system", - "content": "You are a helpful assistant." + "content": "You are a helpful assistant. When tools are available, use them for exact calculations, data processing, and code execution instead of guessing." 
# ---- tests/unit_tests/box/test_box_service.py ----


class FakeBackend(BaseSandboxBackend):
    """In-memory backend that records every call instead of touching a container CLI."""

    def __init__(self, logger: Mock, available: bool = True):
        super().__init__(logger)
        self.name = 'fake'
        self.available = available
        self.start_calls: list[str] = []
        self.exec_calls: list[tuple[str, str]] = []
        self.stop_calls: list[str] = []

    async def is_available(self) -> bool:
        return self.available

    async def start_session(self, spec: BoxSpec) -> BoxSessionInfo:
        self.start_calls.append(spec.session_id)
        timestamp = dt.datetime.now(dt.UTC)
        return BoxSessionInfo(
            session_id=spec.session_id,
            backend_name=self.name,
            backend_session_id=f'backend-{spec.session_id}',
            image=spec.image,
            network=spec.network,
            created_at=timestamp,
            last_used_at=timestamp,
        )

    async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult:
        call = (session.session_id, spec.cmd)
        self.exec_calls.append(call)
        return BoxExecutionResult(
            session_id=session.session_id,
            backend_name=self.name,
            status=BoxExecutionStatus.COMPLETED,
            exit_code=0,
            stdout=f'executed: {spec.cmd}',
            stderr='',
            duration_ms=12,
        )

    async def stop_session(self, session: BoxSessionInfo):
        self.stop_calls.append(session.session_id)


def make_query(query_id: int = 42) -> pipeline_query.Query:
    """Build a Query carrying only ``query_id``, skipping model validation."""
    return pipeline_query.Query.model_construct(query_id=query_id)


@pytest.mark.asyncio
async def test_box_runtime_reuses_request_session():
    """Two commands with the same session_id start one session and run in order."""
    logger = Mock()
    backend = FakeBackend(logger)
    runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300)
    await runtime.initialize()

    specs = [
        BoxSpec.model_validate({'cmd': 'echo first', 'session_id': 'req-1'}),
        BoxSpec.model_validate({'cmd': 'echo second', 'session_id': 'req-1'}),
    ]
    for spec in specs:
        await runtime.execute(spec)

    assert backend.start_calls == ['req-1']
    assert backend.exec_calls == [('req-1', 'echo first'), ('req-1', 'echo second')]


@pytest.mark.asyncio
async def test_box_service_defaults_session_id_from_query():
    """Without an explicit session_id the stringified query id is used."""
    logger = Mock()
    backend = FakeBackend(logger)
    runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300)
    service = BoxService(SimpleNamespace(logger=logger), runtime=runtime)
    await service.initialize()

    parameters = {'cmd': 'pwd', 'network': BoxNetworkMode.OFF.value}
    result = await service.execute_sandbox_tool(parameters, make_query(7))

    assert result['session_id'] == '7'
    assert result['ok'] is True
    assert backend.start_calls == ['7']


@pytest.mark.asyncio
async def test_box_service_fails_closed_when_backend_unavailable():
    """With no available backend the call raises instead of running anywhere."""
    logger = Mock()
    backend = FakeBackend(logger, available=False)
    runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300)
    service = BoxService(SimpleNamespace(logger=logger), runtime=runtime)
    await service.initialize()

    with pytest.raises(BoxBackendUnavailableError):
        await service.execute_sandbox_tool({'cmd': 'echo hello'}, make_query(9))
# ---- tests/unit_tests/provider/test_localagent_sandbox_exec.py ----


class RecordingProvider:
    """Scripted LLM provider: first reply calls sandbox_exec, second reply quotes its stdout."""

    def __init__(self):
        self.requests: list[dict] = []

    async def invoke_llm(self, query, model, messages, funcs, extra_args=None, remove_think=None):
        recorded = {
            'messages': list(messages),
            'funcs': list(funcs),
            'remove_think': remove_think,
        }
        self.requests.append(recorded)

        if len(self.requests) != 1:
            # Follow-up turn: the last message carries the tool output as JSON.
            tool_result = json.loads(messages[-1].content)
            return provider_message.Message(
                role='assistant',
                content=f"The average is {tool_result['stdout']}.",
            )

        script = (
            "python - <<'PY'\n"
            "nums = [1, 2, 3, 4]\n"
            'print(sum(nums) / len(nums))\n'
            'PY'
        )
        sandbox_call = provider_message.ToolCall(
            id='call-1',
            type='function',
            function=provider_message.FunctionCall(
                name='sandbox_exec',
                arguments=json.dumps({'cmd': script}),
            ),
        )
        return provider_message.Message(
            role='assistant',
            content='Let me calculate that exactly.',
            tool_calls=[sandbox_call],
        )


def make_query() -> pipeline_query.Query:
    """Build a non-streaming local-agent query that has sandbox_exec among its tools."""
    adapter = AsyncMock()
    adapter.is_stream_output_supported = AsyncMock(return_value=False)

    pipeline_config = {
        'ai': {
            'runner': {'runner': 'local-agent'},
            'local-agent': {'model': {'primary': 'test-model-uuid', 'fallbacks': []}, 'prompt': 'test-prompt'},
        },
        'output': {'misc': {'remove-think': False}},
    }
    user_message = provider_message.Message(
        role='user',
        content='Please calculate the average of 1, 2, 3, and 4.',
    )

    return pipeline_query.Query.model_construct(
        query_id='avg-query',
        launcher_type=provider_session.LauncherTypes.PERSON,
        launcher_id=12345,
        sender_id=12345,
        message_chain=[],
        message_event=None,
        adapter=adapter,
        pipeline_uuid='pipeline-uuid',
        bot_uuid='bot-uuid',
        pipeline_config=pipeline_config,
        prompt=SimpleNamespace(messages=[]),
        messages=[],
        user_message=user_message,
        use_funcs=[SimpleNamespace(name='sandbox_exec')],
        use_llm_model_uuid='test-model-uuid',
        variables={},
    )


@pytest.mark.asyncio
async def test_localagent_uses_sandbox_exec_for_exact_calculation():
    """The runner offers sandbox_exec, executes the requested call, and answers from its output."""
    provider = RecordingProvider()
    model_entity = SimpleNamespace(
        uuid='test-model-uuid',
        name='test-model',
        abilities=['func_call'],
        extra_args={},
    )
    model = SimpleNamespace(provider=provider, model_entity=model_entity)

    sandbox_output = {
        'session_id': 'avg-query',
        'backend': 'podman',
        'status': 'completed',
        'ok': True,
        'exit_code': 0,
        'stdout': '2.5',
        'stderr': '',
        'duration_ms': 18,
    }
    tool_manager = SimpleNamespace(execute_func_call=AsyncMock(return_value=sandbox_output))

    app = SimpleNamespace(
        logger=Mock(),
        model_mgr=SimpleNamespace(get_model_by_uuid=AsyncMock(return_value=model)),
        tool_mgr=tool_manager,
        rag_mgr=SimpleNamespace(),
    )

    runner = LocalAgentRunner(app, pipeline_config={})
    query = make_query()

    results = [message async for message in runner.run(query)]

    assert [message.role for message in results] == ['assistant', 'tool', 'assistant']
    assert results[-1].content == 'The average is 2.5.'

    tool_manager.execute_func_call.assert_awaited_once()
    tool_name, tool_parameters = tool_manager.execute_func_call.await_args.args[:2]
    assert tool_name == 'sandbox_exec'
    assert "print(sum(nums) / len(nums))" in tool_parameters['cmd']

    first_request = provider.requests[0]
    assert any(
        message.role == 'system'
        and 'sandbox_exec' in str(message.content)
        and 'exact calculations' in str(message.content)
        for message in first_request['messages']
    )
    assert [tool.name for tool in first_request['funcs']] == ['sandbox_exec']


# ---- tests/unit_tests/provider/test_tool_manager_native.py ----


class StubLoader:
    """Minimal tool-loader stand-in backed by a fixed tool list and a canned invoke result."""

    def __init__(self, tools: list[resource_tool.LLMTool] | None = None, invoke_result=None):
        self._tools = tools or []
        self._invoke_result = invoke_result

    async def get_tools(self, *_args, **_kwargs):
        return self._tools

    async def has_tool(self, name: str) -> bool:
        return any(tool.name == name for tool in self._tools)

    async def invoke_tool(self, name: str, parameters: dict, query):
        # A callable result is invoked with the call arguments; anything else is returned as-is.
        if callable(self._invoke_result):
            return self._invoke_result(name, parameters, query)
        return self._invoke_result

    async def shutdown(self):
        return None


def make_tool(name: str) -> resource_tool.LLMTool:
    """Build an LLMTool whose descriptions equal its name and whose func echoes its input."""
    return resource_tool.LLMTool(
        name=name,
        human_desc=name,
        description=name,
        parameters={'type': 'object', 'properties': {}},
        func=lambda parameters: parameters,
    )


@pytest.mark.asyncio
async def test_tool_manager_lists_native_tools_first():
    """Native tools come before plugin tools, which come before MCP tools."""
    manager = ToolManager(SimpleNamespace())
    manager.native_tool_loader = StubLoader([make_tool('sandbox_exec')])
    manager.plugin_tool_loader = StubLoader([make_tool('plugin_tool')])
    manager.mcp_tool_loader = StubLoader([make_tool('mcp_tool')])

    tools = await manager.get_all_tools()
    listed_names = [tool.name for tool in tools]

    assert listed_names == ['sandbox_exec', 'plugin_tool', 'mcp_tool']


@pytest.mark.asyncio
async def test_tool_manager_routes_native_tool_calls():
    """A call to a native tool name is answered by the native loader."""
    app = SimpleNamespace()
    manager = ToolManager(app)
    manager.native_tool_loader = StubLoader([make_tool('sandbox_exec')], invoke_result={'backend': 'fake'})
    manager.plugin_tool_loader = StubLoader([make_tool('plugin_tool')])
    manager.mcp_tool_loader = StubLoader([make_tool('mcp_tool')])

    result = await manager.execute_func_call('sandbox_exec', {'cmd': 'pwd'}, query=Mock())

    assert result == {'backend': 'fake'}