refactor(box): clean up sandbox subsystem code quality and efficiency

- Fix O(n²) stderr trimming in runtime.py with running length tracker
  - Remove dead code: RESERVED_CONTAINER_PATHS, _subprocess_wait_task,
    unused config_hash computation, unused imports
  - Deduplicate connection callback in BoxRuntimeConnector, parse URL once
  - Use enum comparison instead of stringly-typed spec.network.value check
  - Replace manual _result_to_dict/_session_to_dict with model_dump()
  - Cache NativeToolLoader tool definition and sandbox system guidance
  - Extract _is_path_under() helper to eliminate duplicated path checks
  - Import SANDBOX_EXEC_TOOL_NAME from native.py instead of redefining
  - Add JSON startswith guard in logging_utils to skip futile json.loads
  - Fix ruff lint errors (F401 unused imports, F841 unused variables)
This commit is contained in:
youhuanghe
2026-03-22 02:28:25 +00:00
committed by WangCham
parent fbe6e145ec
commit 76fbd08680
10 changed files with 101 additions and 149 deletions

View File

@@ -4,17 +4,14 @@ import abc
import asyncio
import dataclasses
import datetime as dt
import hashlib
import json
import logging
import re
import shlex
import shutil
import typing
import uuid
from .errors import BoxError
from .models import DEFAULT_BOX_MOUNT_PATH, BoxExecutionResult, BoxExecutionStatus, BoxHostMountMode, BoxSessionInfo, BoxSpec
from .models import DEFAULT_BOX_MOUNT_PATH, BoxExecutionResult, BoxExecutionStatus, BoxHostMountMode, BoxNetworkMode, BoxSessionInfo, BoxSpec
from .security import validate_sandbox_security
# Hard cap on raw subprocess output to prevent unbounded memory usage.
@@ -102,20 +99,7 @@ class CLISandboxBackend(BaseSandboxBackend):
f'langbot.box.instance_id={self.instance_id}',
]
# Config hash label for identifying configuration drift
config_hash = hashlib.sha256(json.dumps({
'image': spec.image,
'network': spec.network.value,
'host_path': spec.host_path,
'host_path_mode': spec.host_path_mode.value,
'cpus': spec.cpus,
'memory_mb': spec.memory_mb,
'pids_limit': spec.pids_limit,
'read_only_rootfs': spec.read_only_rootfs,
}, sort_keys=True).encode()).hexdigest()[:16]
args.extend(['--label', f'langbot.box.config_hash={config_hash}'])
if spec.network.value == 'off':
if spec.network == BoxNetworkMode.OFF:
args.extend(['--network', 'none'])
# Resource limits
@@ -353,7 +337,7 @@ class CLISandboxBackend(BaseSandboxBackend):
@staticmethod
async def _read_stream(
stream: typing.Optional[asyncio.StreamReader],
stream: asyncio.StreamReader | None,
limit: int = _MAX_RAW_OUTPUT_BYTES,
) -> tuple[bytes, int]:
if stream is None:

View File

@@ -4,6 +4,7 @@ import asyncio
import os
import sys
from typing import TYPE_CHECKING
from urllib.parse import urlparse
from langbot_plugin.entities.io.actions.enums import CommonAction
from langbot_plugin.runtime.io.handler import Handler
@@ -32,7 +33,11 @@ class BoxRuntimeConnector:
self._handler_task: asyncio.Task | None = None
self._ctrl_task: asyncio.Task | None = None
self._subprocess: asyncio.subprocess.Process | None = None
self._subprocess_wait_task: asyncio.Task | None = None
# Parse the relay URL once for reuse
parsed = urlparse(self.ws_relay_base_url)
self._relay_host = parsed.hostname or '127.0.0.1'
self._relay_port = parsed.port or 5410
async def initialize(self) -> None:
if self.manages_local_runtime:
@@ -40,6 +45,26 @@ class BoxRuntimeConnector:
else:
await self._connect_remote_ws()
def _make_connection_callback(
self, transport_name: str, connected: asyncio.Event, connect_error: list[Exception],
):
async def new_connection_callback(connection: Connection) -> None:
handler = Handler(connection)
self._handler = handler
self.client.set_handler(handler)
self._handler_task = asyncio.create_task(handler.run())
try:
await handler.call_action(CommonAction.PING, {})
self.ap.logger.info(f'Connected to Box runtime via {transport_name}.')
connected.set()
await self._handler_task
except Exception as exc:
if not connected.is_set():
connect_error.append(exc)
connected.set()
return new_connection_callback
async def _start_local_stdio(self) -> None:
"""Launch box server as subprocess and connect via stdio."""
from langbot_plugin.runtime.io.controllers.stdio.client import StdioClientController
@@ -50,29 +75,15 @@ class BoxRuntimeConnector:
connected = asyncio.Event()
connect_error: list[Exception] = []
async def new_connection_callback(connection: Connection) -> None:
handler = Handler.__new__(Handler)
Handler.__init__(handler, connection)
self._handler = handler
self.client.set_handler(handler)
self._handler_task = asyncio.create_task(handler.run())
try:
await handler.call_action(CommonAction.PING, {})
self.ap.logger.info('Connected to Box runtime via stdio.')
connected.set()
await self._handler_task
except Exception as exc:
if not connected.is_set():
connect_error.append(exc)
connected.set()
ctrl = StdioClientController(
command=python_path,
args=['-m', 'langbot.pkg.box.server', '--port', str(self._get_ws_relay_port())],
args=['-m', 'langbot.pkg.box.server', '--port', str(self._relay_port)],
env=env,
)
self._subprocess = None # StdioClientController manages the subprocess
self._ctrl_task = asyncio.create_task(ctrl.run(new_connection_callback))
self._ctrl_task = asyncio.create_task(
ctrl.run(self._make_connection_callback('stdio', connected, connect_error))
)
# Wait for connection or failure
try:
@@ -90,33 +101,19 @@ class BoxRuntimeConnector:
"""Connect to a remote box server via WebSocket."""
from langbot_plugin.runtime.io.controllers.ws.client import WebSocketClientController
ws_url = self._get_rpc_ws_url()
ws_url = f'ws://{self._relay_host}:{self._relay_port + 1}'
connected = asyncio.Event()
connect_error: list[Exception] = []
async def new_connection_callback(connection: Connection) -> None:
handler = Handler.__new__(Handler)
Handler.__init__(handler, connection)
self._handler = handler
self.client.set_handler(handler)
self._handler_task = asyncio.create_task(handler.run())
try:
await handler.call_action(CommonAction.PING, {})
self.ap.logger.info('Connected to Box runtime via WebSocket.')
connected.set()
await self._handler_task
except Exception as exc:
if not connected.is_set():
connect_error.append(exc)
connected.set()
async def on_connect_failed(ctrl, exc):
connect_error.append(exc or BoxRuntimeUnavailableError('ws connection failed'))
connected.set()
ctrl = WebSocketClientController(ws_url=ws_url, make_connection_failed_callback=on_connect_failed)
self._ctrl_task = asyncio.create_task(ctrl.run(new_connection_callback))
self._ctrl_task = asyncio.create_task(
ctrl.run(self._make_connection_callback('WebSocket', connected, connect_error))
)
try:
await asyncio.wait_for(connected.wait(), timeout=30.0)
@@ -139,29 +136,8 @@ class BoxRuntimeConnector:
self.ap.logger.info('Terminating managed box runtime process...')
self._subprocess.terminate()
if self._subprocess_wait_task is not None:
self._subprocess_wait_task.cancel()
self._subprocess_wait_task = None
def _load_configured_runtime_url(self) -> str:
return str(get_box_config(self.ap).get('runtime_url', '')).strip()
def _should_manage_local_runtime(self) -> bool:
return not self.configured_runtime_url and platform.get_platform() != 'docker'
def _get_ws_relay_port(self) -> int:
"""Extract the port for ws relay from ws_relay_base_url."""
from urllib.parse import urlparse
parsed = urlparse(self.ws_relay_base_url)
return parsed.port or 5410
def _get_rpc_ws_url(self) -> str:
"""Derive the action RPC ws URL from the configured runtime URL.
The RPC endpoint is on port+1 relative to the ws relay port.
"""
from urllib.parse import urlparse
parsed = urlparse(self.ws_relay_base_url)
host = parsed.hostname or '127.0.0.1'
port = (parsed.port or 5410) + 1
return f'ws://{host}:{port}'

View File

@@ -37,6 +37,7 @@ class _ManagedProcess:
started_at: dt.datetime
attach_lock: asyncio.Lock
stderr_chunks: collections.deque[str]
stderr_total_len: int = 0
exit_code: int | None = None
exited_at: dt.datetime | None = None
@@ -306,10 +307,10 @@ class BoxRuntime:
if not text:
continue
managed_process.stderr_chunks.append(text)
preview = '\n'.join(managed_process.stderr_chunks)
while len(preview) > _MANAGED_PROCESS_STDERR_PREVIEW_LIMIT and managed_process.stderr_chunks:
managed_process.stderr_chunks.popleft()
preview = '\n'.join(managed_process.stderr_chunks)
managed_process.stderr_total_len += len(text) + 1 # +1 for '\n' separator
while managed_process.stderr_total_len > _MANAGED_PROCESS_STDERR_PREVIEW_LIMIT and managed_process.stderr_chunks:
removed = managed_process.stderr_chunks.popleft()
managed_process.stderr_total_len -= len(removed) + 1
self.logger.info(f'LangBot Box managed process stderr: session_id={session_id} {text}')
except Exception as exc:
self.logger.warning(f'Failed to drain managed process stderr for {session_id}: {exc}')
@@ -378,18 +379,4 @@ class BoxRuntime:
@staticmethod
def _session_to_dict(info: BoxSessionInfo) -> dict:
return {
'session_id': info.session_id,
'backend_name': info.backend_name,
'backend_session_id': info.backend_session_id,
'image': info.image,
'network': info.network.value,
'host_path': info.host_path,
'host_path_mode': info.host_path_mode.value,
'cpus': info.cpus,
'memory_mb': info.memory_mb,
'pids_limit': info.pids_limit,
'read_only_rootfs': info.read_only_rootfs,
'created_at': info.created_at.isoformat(),
'last_used_at': info.last_used_at.isoformat(),
}
return info.model_dump(mode='json')

View File

@@ -20,13 +20,6 @@ BLOCKED_HOST_PATHS = frozenset({
'/var/run/podman',
})
RESERVED_CONTAINER_PATHS = frozenset({
'/workspace',
'/tmp',
'/var/tmp',
'/run',
})
def validate_sandbox_security(spec: BoxSpec) -> None:
"""Validate that a BoxSpec does not request dangerous container config.

View File

@@ -26,7 +26,6 @@ from langbot_plugin.runtime.io.handler import Handler
from .actions import LangBotToBoxAction
from .errors import (
BoxError,
BoxManagedProcessConflictError,
BoxManagedProcessNotFoundError,
BoxSessionNotFoundError,
@@ -38,15 +37,7 @@ logger = logging.getLogger('langbot.box.server')
def _result_to_dict(result: BoxExecutionResult) -> dict:
return {
'session_id': result.session_id,
'backend_name': result.backend_name,
'status': result.status.value,
'exit_code': result.exit_code,
'stdout': result.stdout,
'stderr': result.stderr,
'duration_ms': result.duration_ms,
}
return result.model_dump(mode='json')
class BoxServerHandler(Handler):

View File

@@ -26,6 +26,11 @@ _INT_ADAPTER = pydantic.TypeAdapter(int)
_UTC = _dt.timezone.utc
_MAX_RECENT_ERRORS = 50
def _is_path_under(path: str, root: str) -> bool:
"""Check whether *path* equals *root* or is a child of *root*."""
return path == root or path.startswith(f'{root}{os.sep}')
if TYPE_CHECKING:
from ..core import app as core_app
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
@@ -274,7 +279,7 @@ class BoxService:
)
for allowed_root in self.allowed_host_mount_roots:
if self.default_host_workspace == allowed_root or self.default_host_workspace.startswith(f'{allowed_root}{os.sep}'):
if _is_path_under(self.default_host_workspace, allowed_root):
os.makedirs(self.default_host_workspace, exist_ok=True)
return
@@ -293,7 +298,7 @@ class BoxService:
raise BoxValidationError('host_path mounting is disabled because no allowed_host_mount_roots are configured')
for allowed_root in self.allowed_host_mount_roots:
if host_path == allowed_root or host_path.startswith(f'{allowed_root}{os.sep}'):
if _is_path_under(host_path, allowed_root):
return
allowed_roots = ', '.join(self.allowed_host_mount_roots)

View File

@@ -25,6 +25,7 @@ def format_result_log(
if content.startswith('err:'):
return f'tool error: {cut_str(content)}'
if content.startswith('{'):
try:
payload = json.loads(content)
except json.JSONDecodeError:

View File

@@ -5,6 +5,7 @@ import copy
import typing
from .. import runner
from ..modelmgr import requester as modelmgr_requester
from ..tools.loaders.native import SANDBOX_EXEC_TOOL_NAME
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.rag.context as rag_context
@@ -24,7 +25,6 @@ Respond in the same language as the user's input.
</user_message>
"""
SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec'
SANDBOX_EXEC_SYSTEM_GUIDANCE = (
'When sandbox_exec is available, use it for exact calculations, statistics, structured data parsing, '
'and code execution instead of estimating mentally. If the user provides numbers, tables, CSV-like text, '
@@ -43,13 +43,19 @@ SANDBOX_EXEC_WORKSPACE_GUIDANCE = (
class LocalAgentRunner(runner.RequestRunner):
"""Local agent request runner"""
_cached_sandbox_guidance: str | None = None
def _build_sandbox_system_guidance(self) -> str:
if self._cached_sandbox_guidance is not None:
return self._cached_sandbox_guidance
from langbot.pkg.box.models import get_box_config
guidance = SANDBOX_EXEC_SYSTEM_GUIDANCE
default_host_workspace = str(
getattr(getattr(self.ap, 'instance_config', None), 'data', {}).get('box', {}).get('default_host_workspace', '')
).strip()
default_host_workspace = str(get_box_config(self.ap).get('default_host_workspace', '')).strip()
if default_host_workspace:
guidance = f'{guidance} {SANDBOX_EXEC_WORKSPACE_GUIDANCE}'
self._cached_sandbox_guidance = guidance
return guidance
def _build_request_messages(

View File

@@ -150,7 +150,7 @@ class RuntimeMCPSession:
session_payload,
skip_host_mount_validation=True,
)
except Exception as e:
except Exception:
self.error_phase = MCPSessionErrorPhase.SESSION_CREATE
raise
@@ -169,7 +169,7 @@ class RuntimeMCPSession:
result = await box_service.client.execute(
box_service.build_spec(exec_payload, skip_host_mount_validation=True)
)
except Exception as e:
except Exception:
self.error_phase = MCPSessionErrorPhase.DEP_INSTALL
raise
if not result.ok:
@@ -186,7 +186,7 @@ class RuntimeMCPSession:
session_id,
self._build_box_process_payload(host_path),
)
except Exception as e:
except Exception:
self.error_phase = MCPSessionErrorPhase.PROCESS_START
raise
@@ -196,14 +196,14 @@ class RuntimeMCPSession:
transport = await self.exit_stack.enter_async_context(websocket_client(websocket_url))
read_stream, write_stream = transport
self.session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream))
except Exception as e:
except Exception:
self.error_phase = MCPSessionErrorPhase.RELAY_CONNECT
raise
# Phase: MCP protocol initialization
try:
await self.session.initialize()
except Exception as e:
except Exception:
self.error_phase = MCPSessionErrorPhase.MCP_INIT
raise
@@ -813,12 +813,13 @@ class MCPLoader(loader.ToolLoader):
"""获取所有服务器的信息"""
info = {}
for server_name, session in self.sessions.items():
tools = session.get_tools()
info[server_name] = {
'name': server_name,
'mode': session.server_config.get('mode'),
'enable': session.enable,
'tools_count': len(session.get_tools()),
'tool_names': [f.name for f in session.get_tools()],
'tools_count': len(tools),
'tool_names': [f.name for f in tools],
}
return info

View File

@@ -5,20 +5,28 @@ import json
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from langbot_plugin.api.entities.events import pipeline_query
from langbot.pkg.box.models import BoxNetworkMode
from .. import loader
SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec'
class NativeToolLoader(loader.ToolLoader):
SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec'
def __init__(self, ap):
super().__init__(ap)
self._sandbox_exec_tool: resource_tool.LLMTool | None = None
async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
return [self._build_sandbox_exec_tool()]
if self._sandbox_exec_tool is None:
self._sandbox_exec_tool = self._build_sandbox_exec_tool()
return [self._sandbox_exec_tool]
async def has_tool(self, name: str) -> bool:
return name == self.SANDBOX_EXEC_TOOL_NAME
return name == SANDBOX_EXEC_TOOL_NAME
async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query):
if name != self.SANDBOX_EXEC_TOOL_NAME:
if name != SANDBOX_EXEC_TOOL_NAME:
raise ValueError(f'未找到工具: {name}')
self.ap.logger.info(
'sandbox_exec tool invoked: '
@@ -32,7 +40,7 @@ class NativeToolLoader(loader.ToolLoader):
def _build_sandbox_exec_tool(self) -> resource_tool.LLMTool:
return resource_tool.LLMTool(
name=self.SANDBOX_EXEC_TOOL_NAME,
name=SANDBOX_EXEC_TOOL_NAME,
human_desc='Execute a command inside the LangBot Box sandbox',
description=(
'Run shell commands only inside the isolated LangBot Box sandbox. '
@@ -60,7 +68,7 @@ class NativeToolLoader(loader.ToolLoader):
'network': {
'type': 'string',
'description': 'Network policy for the sandbox session. Prefer off unless network is required.',
'enum': ['off', 'on'],
'enum': [e.value for e in BoxNetworkMode],
'default': 'off',
},
'env': {