From 76fbd086804d03044b43e30236b31d7f56d9c8fc Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Sun, 22 Mar 2026 02:28:25 +0000 Subject: [PATCH] refactor(box): clean up sandbox subsystem code quality and efficiency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix O(n²) stderr trimming in runtime.py with running length tracker - Remove dead code: RESERVED_CONTAINER_PATHS, _subprocess_wait_task, unused config_hash computation, unused imports - Deduplicate connection callback in BoxRuntimeConnector, parse URL once - Use enum comparison instead of stringly-typed spec.network.value check - Replace manual _result_to_dict/_session_to_dict with model_dump() - Cache NativeToolLoader tool definition and sandbox system guidance - Extract _is_path_under() helper to eliminate duplicated path checks - Import SANDBOX_EXEC_TOOL_NAME from native.py instead of redefining - Add JSON startswith guard in logging_utils to skip futile json.loads - Fix ruff lint errors (F401 unused imports, F841 unused variables) --- src/langbot/pkg/box/backend.py | 22 +---- src/langbot/pkg/box/connector.py | 92 +++++++------------ src/langbot/pkg/box/runtime.py | 25 ++--- src/langbot/pkg/box/security.py | 7 -- src/langbot/pkg/box/server.py | 11 +-- src/langbot/pkg/box/service.py | 9 +- .../pkg/pipeline/process/logging_utils.py | 35 +++---- .../pkg/provider/runners/localagent.py | 14 ++- src/langbot/pkg/provider/tools/loaders/mcp.py | 15 +-- .../pkg/provider/tools/loaders/native.py | 20 ++-- 10 files changed, 101 insertions(+), 149 deletions(-) diff --git a/src/langbot/pkg/box/backend.py b/src/langbot/pkg/box/backend.py index b8208fcc..75ea03a8 100644 --- a/src/langbot/pkg/box/backend.py +++ b/src/langbot/pkg/box/backend.py @@ -4,17 +4,14 @@ import abc import asyncio import dataclasses import datetime as dt -import hashlib -import json import logging import re import shlex import shutil -import typing import uuid from .errors import BoxError -from .models import DEFAULT_BOX_MOUNT_PATH, BoxExecutionResult, BoxExecutionStatus, BoxHostMountMode, BoxSessionInfo, BoxSpec +from .models import DEFAULT_BOX_MOUNT_PATH, BoxExecutionResult, BoxExecutionStatus, BoxHostMountMode, BoxNetworkMode, BoxSessionInfo, BoxSpec from .security import validate_sandbox_security # Hard cap on raw subprocess output to prevent unbounded memory usage. @@ -102,20 +99,7 @@ class CLISandboxBackend(BaseSandboxBackend): f'langbot.box.instance_id={self.instance_id}', ] - # Config hash label for identifying configuration drift - config_hash = hashlib.sha256(json.dumps({ - 'image': spec.image, - 'network': spec.network.value, - 'host_path': spec.host_path, - 'host_path_mode': spec.host_path_mode.value, - 'cpus': spec.cpus, - 'memory_mb': spec.memory_mb, - 'pids_limit': spec.pids_limit, - 'read_only_rootfs': spec.read_only_rootfs, - }, sort_keys=True).encode()).hexdigest()[:16] - args.extend(['--label', f'langbot.box.config_hash={config_hash}']) - - if spec.network.value == 'off': + if spec.network == BoxNetworkMode.OFF: args.extend(['--network', 'none']) # Resource limits @@ -353,7 +337,7 @@ class CLISandboxBackend(BaseSandboxBackend): @staticmethod async def _read_stream( - stream: typing.Optional[asyncio.StreamReader], + stream: asyncio.StreamReader | None, limit: int = _MAX_RAW_OUTPUT_BYTES, ) -> tuple[bytes, int]: if stream is None: diff --git a/src/langbot/pkg/box/connector.py b/src/langbot/pkg/box/connector.py index 5c39353b..c17476b4 100644 --- a/src/langbot/pkg/box/connector.py +++ b/src/langbot/pkg/box/connector.py @@ -4,6 +4,7 @@ import asyncio import os import sys from typing import TYPE_CHECKING +from urllib.parse import urlparse from langbot_plugin.entities.io.actions.enums import CommonAction from langbot_plugin.runtime.io.handler import Handler @@ -32,7 +33,11 @@ class BoxRuntimeConnector: self._handler_task: asyncio.Task | None = None self._ctrl_task: asyncio.Task | None = None self._subprocess: asyncio.subprocess.Process | None = None - self._subprocess_wait_task: asyncio.Task | None = None + + # Parse the relay URL once for reuse + parsed = urlparse(self.ws_relay_base_url) + self._relay_host = parsed.hostname or '127.0.0.1' + self._relay_port = parsed.port or 5410 async def initialize(self) -> None: if self.manages_local_runtime: @@ -40,6 +45,26 @@ class BoxRuntimeConnector: else: await self._connect_remote_ws() + def _make_connection_callback( + self, transport_name: str, connected: asyncio.Event, connect_error: list[Exception], + ): + async def new_connection_callback(connection: Connection) -> None: + handler = Handler(connection) + self._handler = handler + self.client.set_handler(handler) + self._handler_task = asyncio.create_task(handler.run()) + try: + await handler.call_action(CommonAction.PING, {}) + self.ap.logger.info(f'Connected to Box runtime via {transport_name}.') + connected.set() + await self._handler_task + except Exception as exc: + if not connected.is_set(): + connect_error.append(exc) + connected.set() + + return new_connection_callback + async def _start_local_stdio(self) -> None: """Launch box server as subprocess and connect via stdio.""" from langbot_plugin.runtime.io.controllers.stdio.client import StdioClientController @@ -50,29 +75,15 @@ class BoxRuntimeConnector: connected = asyncio.Event() connect_error: list[Exception] = [] - async def new_connection_callback(connection: Connection) -> None: - handler = Handler.__new__(Handler) - Handler.__init__(handler, connection) - self._handler = handler - self.client.set_handler(handler) - self._handler_task = asyncio.create_task(handler.run()) - try: - await handler.call_action(CommonAction.PING, {}) - self.ap.logger.info('Connected to Box runtime via stdio.') - connected.set() - await self._handler_task - except Exception as exc: - if not connected.is_set(): - connect_error.append(exc) - connected.set() - ctrl = StdioClientController( command=python_path, - args=['-m', 'langbot.pkg.box.server', '--port', str(self._get_ws_relay_port())], + args=['-m', 'langbot.pkg.box.server', '--port', str(self._relay_port)], env=env, ) self._subprocess = None # StdioClientController manages the subprocess - self._ctrl_task = asyncio.create_task(ctrl.run(new_connection_callback)) + self._ctrl_task = asyncio.create_task( + ctrl.run(self._make_connection_callback('stdio', connected, connect_error)) + ) # Wait for connection or failure try: @@ -90,33 +101,19 @@ class BoxRuntimeConnector: """Connect to a remote box server via WebSocket.""" from langbot_plugin.runtime.io.controllers.ws.client import WebSocketClientController - ws_url = self._get_rpc_ws_url() + ws_url = f'ws://{self._relay_host}:{self._relay_port + 1}' connected = asyncio.Event() connect_error: list[Exception] = [] - async def new_connection_callback(connection: Connection) -> None: - handler = Handler.__new__(Handler) - Handler.__init__(handler, connection) - self._handler = handler - self.client.set_handler(handler) - self._handler_task = asyncio.create_task(handler.run()) - try: - await handler.call_action(CommonAction.PING, {}) - self.ap.logger.info('Connected to Box runtime via WebSocket.') - connected.set() - await self._handler_task - except Exception as exc: - if not connected.is_set(): - connect_error.append(exc) - connected.set() - async def on_connect_failed(ctrl, exc): connect_error.append(exc or BoxRuntimeUnavailableError('ws connection failed')) connected.set() ctrl = WebSocketClientController(ws_url=ws_url, make_connection_failed_callback=on_connect_failed) - self._ctrl_task = asyncio.create_task(ctrl.run(new_connection_callback)) + self._ctrl_task = asyncio.create_task( + ctrl.run(self._make_connection_callback('WebSocket', connected, connect_error)) + ) try: await asyncio.wait_for(connected.wait(), timeout=30.0) @@ -139,29 +136,8 @@ class BoxRuntimeConnector: self.ap.logger.info('Terminating managed box runtime process...') self._subprocess.terminate() - if self._subprocess_wait_task is not None: - self._subprocess_wait_task.cancel() - self._subprocess_wait_task = None - def _load_configured_runtime_url(self) -> str: return str(get_box_config(self.ap).get('runtime_url', '')).strip() def _should_manage_local_runtime(self) -> bool: return not self.configured_runtime_url and platform.get_platform() != 'docker' - - def _get_ws_relay_port(self) -> int: - """Extract the port for ws relay from ws_relay_base_url.""" - from urllib.parse import urlparse - parsed = urlparse(self.ws_relay_base_url) - return parsed.port or 5410 - - def _get_rpc_ws_url(self) -> str: - """Derive the action RPC ws URL from the configured runtime URL. - - The RPC endpoint is on port+1 relative to the ws relay port. - """ - from urllib.parse import urlparse - parsed = urlparse(self.ws_relay_base_url) - host = parsed.hostname or '127.0.0.1' - port = (parsed.port or 5410) + 1 - return f'ws://{host}:{port}' diff --git a/src/langbot/pkg/box/runtime.py b/src/langbot/pkg/box/runtime.py index 4346f7a1..52164d44 100644 --- a/src/langbot/pkg/box/runtime.py +++ b/src/langbot/pkg/box/runtime.py @@ -37,6 +37,7 @@ class _ManagedProcess: started_at: dt.datetime attach_lock: asyncio.Lock stderr_chunks: collections.deque[str] + stderr_total_len: int = 0 exit_code: int | None = None exited_at: dt.datetime | None = None @@ -306,10 +307,10 @@ class BoxRuntime: if not text: continue managed_process.stderr_chunks.append(text) - preview = '\n'.join(managed_process.stderr_chunks) - while len(preview) > _MANAGED_PROCESS_STDERR_PREVIEW_LIMIT and managed_process.stderr_chunks: - managed_process.stderr_chunks.popleft() - preview = '\n'.join(managed_process.stderr_chunks) + managed_process.stderr_total_len += len(text) + 1 # +1 for '\n' separator + while managed_process.stderr_total_len > _MANAGED_PROCESS_STDERR_PREVIEW_LIMIT and managed_process.stderr_chunks: + removed = managed_process.stderr_chunks.popleft() + managed_process.stderr_total_len -= len(removed) + 1 self.logger.info(f'LangBot Box managed process stderr: session_id={session_id} {text}') except Exception as exc: self.logger.warning(f'Failed to drain managed process stderr for {session_id}: {exc}') @@ -378,18 +379,4 @@ class BoxRuntime: @staticmethod def _session_to_dict(info: BoxSessionInfo) -> dict: - return { - 'session_id': info.session_id, - 'backend_name': info.backend_name, - 'backend_session_id': info.backend_session_id, - 'image': info.image, - 'network': info.network.value, - 'host_path': info.host_path, - 'host_path_mode': info.host_path_mode.value, - 'cpus': info.cpus, - 'memory_mb': info.memory_mb, - 'pids_limit': info.pids_limit, - 'read_only_rootfs': info.read_only_rootfs, - 'created_at': info.created_at.isoformat(), - 'last_used_at': info.last_used_at.isoformat(), - } + return info.model_dump(mode='json') diff --git a/src/langbot/pkg/box/security.py b/src/langbot/pkg/box/security.py index 5627510a..1c05a039 100644 --- a/src/langbot/pkg/box/security.py +++ b/src/langbot/pkg/box/security.py @@ -20,13 +20,6 @@ BLOCKED_HOST_PATHS = frozenset({ '/var/run/podman', }) -RESERVED_CONTAINER_PATHS = frozenset({ - '/workspace', - '/tmp', - '/var/tmp', - '/run', -}) - def validate_sandbox_security(spec: BoxSpec) -> None: """Validate that a BoxSpec does not request dangerous container config. diff --git a/src/langbot/pkg/box/server.py b/src/langbot/pkg/box/server.py index c056695f..4af6de6d 100644 --- a/src/langbot/pkg/box/server.py +++ b/src/langbot/pkg/box/server.py @@ -26,7 +26,6 @@ from langbot_plugin.runtime.io.handler import Handler from .actions import LangBotToBoxAction from .errors import ( - BoxError, BoxManagedProcessConflictError, BoxManagedProcessNotFoundError, BoxSessionNotFoundError, @@ -38,15 +37,7 @@ logger = logging.getLogger('langbot.box.server') def _result_to_dict(result: BoxExecutionResult) -> dict: - return { - 'session_id': result.session_id, - 'backend_name': result.backend_name, - 'status': result.status.value, - 'exit_code': result.exit_code, - 'stdout': result.stdout, - 'stderr': result.stderr, - 'duration_ms': result.duration_ms, - } + return result.model_dump(mode='json') class BoxServerHandler(Handler): diff --git a/src/langbot/pkg/box/service.py b/src/langbot/pkg/box/service.py index 32c87292..bb8d7dbc 100644 --- a/src/langbot/pkg/box/service.py +++ b/src/langbot/pkg/box/service.py @@ -26,6 +26,11 @@ _INT_ADAPTER = pydantic.TypeAdapter(int) _UTC = _dt.timezone.utc _MAX_RECENT_ERRORS = 50 + +def _is_path_under(path: str, root: str) -> bool: + """Check whether *path* equals *root* or is a child of *root*.""" + return path == root or path.startswith(f'{root}{os.sep}') + if TYPE_CHECKING: from ..core import app as core_app import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query @@ -274,7 +279,7 @@ class BoxService: ) for allowed_root in self.allowed_host_mount_roots: - if self.default_host_workspace == allowed_root or self.default_host_workspace.startswith(f'{allowed_root}{os.sep}'): + if _is_path_under(self.default_host_workspace, allowed_root): os.makedirs(self.default_host_workspace, exist_ok=True) return @@ -293,7 +298,7 @@ class BoxService: raise BoxValidationError('host_path mounting is disabled because no allowed_host_mount_roots are configured') for allowed_root in self.allowed_host_mount_roots: - if host_path == allowed_root or host_path.startswith(f'{allowed_root}{os.sep}'): + if _is_path_under(host_path, allowed_root): return allowed_roots = ', '.join(self.allowed_host_mount_roots) diff --git a/src/langbot/pkg/pipeline/process/logging_utils.py b/src/langbot/pkg/pipeline/process/logging_utils.py index 78a289e8..9240e69d 100644 --- a/src/langbot/pkg/pipeline/process/logging_utils.py +++ b/src/langbot/pkg/pipeline/process/logging_utils.py @@ -25,24 +25,25 @@ def format_result_log( if content.startswith('err:'): return f'tool error: {cut_str(content)}' - try: - payload = json.loads(content) - except json.JSONDecodeError: - return cut_str(result.readable_str()) + if content.startswith('{'): + try: + payload = json.loads(content) + except json.JSONDecodeError: + return cut_str(result.readable_str()) - if isinstance(payload, dict): - status = payload.get('status', 'unknown') - exit_code = payload.get('exit_code') - backend = payload.get('backend', '') - stdout = str(payload.get('stdout', '')).strip() - summary = f'tool result: status={status}' - if exit_code is not None: - summary += f' exit_code={exit_code}' - if backend: - summary += f' backend={backend}' - if stdout: - summary += f' stdout={cut_str(stdout)}' - return summary + if isinstance(payload, dict): + status = payload.get('status', 'unknown') + exit_code = payload.get('exit_code') + backend = payload.get('backend', '') + stdout = str(payload.get('stdout', '')).strip() + summary = f'tool result: status={status}' + if exit_code is not None: + summary += f' exit_code={exit_code}' + if backend: + summary += f' backend={backend}' + if stdout: + summary += f' stdout={cut_str(stdout)}' + return summary return cut_str(result.readable_str()) diff --git a/src/langbot/pkg/provider/runners/localagent.py b/src/langbot/pkg/provider/runners/localagent.py index fe9e1d3a..0c45bd82 100644 --- a/src/langbot/pkg/provider/runners/localagent.py +++ b/src/langbot/pkg/provider/runners/localagent.py @@ -5,6 +5,7 @@ import copy import typing from .. import runner from ..modelmgr import requester as modelmgr_requester +from ..tools.loaders.native import SANDBOX_EXEC_TOOL_NAME import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query import langbot_plugin.api.entities.builtin.provider.message as provider_message import langbot_plugin.api.entities.builtin.rag.context as rag_context @@ -24,7 +25,6 @@ Respond in the same language as the user's input. """ -SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec' SANDBOX_EXEC_SYSTEM_GUIDANCE = ( 'When sandbox_exec is available, use it for exact calculations, statistics, structured data parsing, ' 'and code execution instead of estimating mentally. If the user provides numbers, tables, CSV-like text, ' @@ -43,13 +43,19 @@ SANDBOX_EXEC_WORKSPACE_GUIDANCE = ( class LocalAgentRunner(runner.RequestRunner): """Local agent request runner""" + _cached_sandbox_guidance: str | None = None + def _build_sandbox_system_guidance(self) -> str: + if self._cached_sandbox_guidance is not None: + return self._cached_sandbox_guidance + + from langbot.pkg.box.models import get_box_config + guidance = SANDBOX_EXEC_SYSTEM_GUIDANCE - default_host_workspace = str( - getattr(getattr(self.ap, 'instance_config', None), 'data', {}).get('box', {}).get('default_host_workspace', '') - ).strip() + default_host_workspace = str(get_box_config(self.ap).get('default_host_workspace', '')).strip() if default_host_workspace: guidance = f'{guidance} {SANDBOX_EXEC_WORKSPACE_GUIDANCE}' + self._cached_sandbox_guidance = guidance return guidance def _build_request_messages( diff --git a/src/langbot/pkg/provider/tools/loaders/mcp.py b/src/langbot/pkg/provider/tools/loaders/mcp.py index d0390257..88e76323 100644 --- a/src/langbot/pkg/provider/tools/loaders/mcp.py +++ b/src/langbot/pkg/provider/tools/loaders/mcp.py @@ -150,7 +150,7 @@ class RuntimeMCPSession: session_payload, skip_host_mount_validation=True, ) - except Exception as e: + except Exception: self.error_phase = MCPSessionErrorPhase.SESSION_CREATE raise @@ -169,7 +169,7 @@ class RuntimeMCPSession: result = await box_service.client.execute( box_service.build_spec(exec_payload, skip_host_mount_validation=True) ) - except Exception as e: + except Exception: self.error_phase = MCPSessionErrorPhase.DEP_INSTALL raise if not result.ok: @@ -186,7 +186,7 @@ class RuntimeMCPSession: session_id, self._build_box_process_payload(host_path), ) - except Exception as e: + except Exception: self.error_phase = MCPSessionErrorPhase.PROCESS_START raise @@ -196,14 +196,14 @@ class RuntimeMCPSession: transport = await self.exit_stack.enter_async_context(websocket_client(websocket_url)) read_stream, write_stream = transport self.session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream)) - except Exception as e: + except Exception: self.error_phase = MCPSessionErrorPhase.RELAY_CONNECT raise # Phase: MCP protocol initialization try: await self.session.initialize() - except Exception as e: + except Exception: self.error_phase = MCPSessionErrorPhase.MCP_INIT raise @@ -813,12 +813,13 @@ class MCPLoader(loader.ToolLoader): """获取所有服务器的信息""" info = {} for server_name, session in self.sessions.items(): + tools = session.get_tools() info[server_name] = { 'name': server_name, 'mode': session.server_config.get('mode'), 'enable': session.enable, - 'tools_count': len(session.get_tools()), - 'tool_names': [f.name for f in session.get_tools()], + 'tools_count': len(tools), + 'tool_names': [f.name for f in tools], } return info diff --git a/src/langbot/pkg/provider/tools/loaders/native.py b/src/langbot/pkg/provider/tools/loaders/native.py index 22e696d9..d13533e4 100644 --- a/src/langbot/pkg/provider/tools/loaders/native.py +++ b/src/langbot/pkg/provider/tools/loaders/native.py @@ -5,20 +5,28 @@ import json import langbot_plugin.api.entities.builtin.resource.tool as resource_tool from langbot_plugin.api.entities.events import pipeline_query +from langbot.pkg.box.models import BoxNetworkMode from .. import loader +SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec' + class NativeToolLoader(loader.ToolLoader): - SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec' + + def __init__(self, ap): + super().__init__(ap) + self._sandbox_exec_tool: resource_tool.LLMTool | None = None async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]: - return [self._build_sandbox_exec_tool()] + if self._sandbox_exec_tool is None: + self._sandbox_exec_tool = self._build_sandbox_exec_tool() + return [self._sandbox_exec_tool] async def has_tool(self, name: str) -> bool: - return name == self.SANDBOX_EXEC_TOOL_NAME + return name == SANDBOX_EXEC_TOOL_NAME async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query): - if name != self.SANDBOX_EXEC_TOOL_NAME: + if name != SANDBOX_EXEC_TOOL_NAME: raise ValueError(f'未找到工具: {name}') self.ap.logger.info( 'sandbox_exec tool invoked: ' @@ -32,7 +40,7 @@ class NativeToolLoader(loader.ToolLoader): def _build_sandbox_exec_tool(self) -> resource_tool.LLMTool: return resource_tool.LLMTool( - name=self.SANDBOX_EXEC_TOOL_NAME, + name=SANDBOX_EXEC_TOOL_NAME, human_desc='Execute a command inside the LangBot Box sandbox', description=( 'Run shell commands only inside the isolated LangBot Box sandbox. ' @@ -60,7 +68,7 @@ class NativeToolLoader(loader.ToolLoader): 'network': { 'type': 'string', 'description': 'Network policy for the sandbox session. Prefer off unless network is required.', - 'enum': ['off', 'on'], + 'enum': [e.value for e in BoxNetworkMode], 'default': 'off', }, 'env': {