From b64a23f9aca6ff6316975c4aad464eb8e16347e7 Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Sun, 22 Mar 2026 07:24:47 +0000 Subject: [PATCH] refactor(box): move box runtime to langbot-plugin-sdk Extract self-contained box runtime modules (actions, backend, client, errors, models, runtime, security, server) to langbot-plugin-sdk and update all imports to use `langbot_plugin.box.*`. Keep only service and connector in LangBot core as they depend on the Application context. - Update docker-compose to use `langbot_plugin.box.server` entry point - Update pyproject.toml to use local SDK via `tool.uv.sources` - Remove migrated source files and their unit/integration tests - Update remaining test imports to match new module paths --- docker/docker-compose.yaml | 2 +- src/langbot/pkg/box/actions.py | 21 - src/langbot/pkg/box/backend.py | 388 ------------------ src/langbot/pkg/box/client.py | 193 --------- src/langbot/pkg/box/connector.py | 27 +- src/langbot/pkg/box/errors.py | 33 -- src/langbot/pkg/box/models.py | 274 ------------- src/langbot/pkg/box/runtime.py | 386 ----------------- src/langbot/pkg/box/security.py | 35 -- src/langbot/pkg/box/server.py | 267 ------------ src/langbot/pkg/box/service.py | 15 +- src/langbot/pkg/provider/tools/loaders/mcp.py | 2 +- .../pkg/provider/tools/loaders/native.py | 2 +- tests/unit_tests/box/test_backend_clip.py | 38 -- tests/unit_tests/box/test_box_connector.py | 5 +- .../box/test_box_managed_process.py | 103 ----- tests/unit_tests/box/test_box_security.py | 59 --- tests/unit_tests/box/test_box_service.py | 12 +- .../provider/test_mcp_box_integration.py | 4 +- 19 files changed, 42 insertions(+), 1824 deletions(-) delete mode 100644 src/langbot/pkg/box/actions.py delete mode 100644 src/langbot/pkg/box/backend.py delete mode 100644 src/langbot/pkg/box/client.py delete mode 100644 src/langbot/pkg/box/errors.py delete mode 100644 src/langbot/pkg/box/models.py delete mode 100644 src/langbot/pkg/box/runtime.py delete mode 100644 src/langbot/pkg/box/security.py delete mode 100644 src/langbot/pkg/box/server.py delete mode 100644 tests/unit_tests/box/test_backend_clip.py delete mode 100644 tests/unit_tests/box/test_box_managed_process.py delete mode 100644 tests/unit_tests/box/test_box_security.py diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index cf44671e..85e6e455 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -7,7 +7,7 @@ services: langbot_box_runtime: image: rockchin/langbot:latest container_name: langbot_box_runtime - command: ["uv", "run", "--no-sync", "-m", "langbot.pkg.box.server"] + command: ["uv", "run", "--no-sync", "-m", "langbot_plugin.box.server"] volumes: # Mount the container runtime socket from the host. # Uncomment the one that matches your container runtime: diff --git a/src/langbot/pkg/box/actions.py b/src/langbot/pkg/box/actions.py deleted file mode 100644 index 954c606c..00000000 --- a/src/langbot/pkg/box/actions.py +++ /dev/null @@ -1,21 +0,0 @@ -"""Box-specific action types for the action RPC protocol.""" - -from __future__ import annotations - -from langbot_plugin.entities.io.actions.enums import ActionType - - -class LangBotToBoxAction(ActionType): - """Actions sent from LangBot to the Box runtime.""" - - HEALTH = 'box_health' - STATUS = 'box_status' - EXEC = 'box_exec' - CREATE_SESSION = 'box_create_session' - GET_SESSION = 'box_get_session' - GET_SESSIONS = 'box_get_sessions' - DELETE_SESSION = 'box_delete_session' - START_MANAGED_PROCESS = 'box_start_managed_process' - GET_MANAGED_PROCESS = 'box_get_managed_process' - GET_BACKEND_INFO = 'box_get_backend_info' - SHUTDOWN = 'box_shutdown' diff --git a/src/langbot/pkg/box/backend.py b/src/langbot/pkg/box/backend.py deleted file mode 100644 index e5bbe564..00000000 --- a/src/langbot/pkg/box/backend.py +++ /dev/null @@ -1,388 +0,0 @@ -from __future__ import annotations - -import abc -import asyncio -import dataclasses -import datetime as dt -import logging -import re -import shlex -import shutil -import uuid - -from .errors import BoxError -from .models import ( - DEFAULT_BOX_MOUNT_PATH, - BoxExecutionResult, - BoxExecutionStatus, - BoxHostMountMode, - BoxNetworkMode, - BoxSessionInfo, - BoxSpec, -) -from .security import validate_sandbox_security - -# Hard cap on raw subprocess output to prevent unbounded memory usage. -# Container timeout already bounds duration, but fast commands can still -# produce large output within the time limit. After this many bytes the -# remaining output is discarded before decoding. -_MAX_RAW_OUTPUT_BYTES = 1_048_576 # 1 MB per stream - - -@dataclasses.dataclass(slots=True) -class _CommandResult: - return_code: int - stdout: str - stderr: str - timed_out: bool = False - - -class BaseSandboxBackend(abc.ABC): - name: str - instance_id: str = '' - - def __init__(self, logger: logging.Logger): - self.logger = logger - - async def initialize(self): - return None - - @abc.abstractmethod - async def is_available(self) -> bool: - pass - - @abc.abstractmethod - async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: - pass - - @abc.abstractmethod - async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult: - pass - - @abc.abstractmethod - async def stop_session(self, session: BoxSessionInfo): - pass - - async def start_managed_process(self, session: BoxSessionInfo, spec): - raise BoxError(f'{self.name} backend does not support managed processes') - - async def cleanup_orphaned_containers(self, current_instance_id: str = ''): - """Remove lingering containers from previous runs. No-op by default.""" - pass - - -class CLISandboxBackend(BaseSandboxBackend): - command: str - - def __init__(self, logger: logging.Logger, command: str, backend_name: str): - super().__init__(logger) - self.command = command - self.name = backend_name - - async def is_available(self) -> bool: - if shutil.which(self.command) is None: - return False - - result = await self._run_command([self.command, 'info'], timeout_sec=5, check=False) - return result.return_code == 0 and not result.timed_out - - async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: - validate_sandbox_security(spec) - - now = dt.datetime.now(dt.UTC) - container_name = self._build_container_name(spec.session_id) - - args = [ - self.command, - 'run', - '-d', - '--rm', - '--name', - container_name, - '--label', - 'langbot.box=true', - '--label', - f'langbot.session_id={spec.session_id}', - '--label', - f'langbot.box.instance_id={self.instance_id}', - ] - - if spec.network == BoxNetworkMode.OFF: - args.extend(['--network', 'none']) - - # Resource limits - args.extend(['--cpus', str(spec.cpus)]) - args.extend(['--memory', f'{spec.memory_mb}m']) - args.extend(['--pids-limit', str(spec.pids_limit)]) - - if spec.read_only_rootfs: - args.append('--read-only') - args.extend(['--tmpfs', '/tmp:size=64m']) - - if spec.host_path is not None and spec.host_path_mode != BoxHostMountMode.NONE: - mount_spec = f'{spec.host_path}:{DEFAULT_BOX_MOUNT_PATH}:{spec.host_path_mode.value}' - args.extend(['-v', mount_spec]) - - args.extend([spec.image, 'sh', '-lc', 'while true; do sleep 3600; done']) - - self.logger.info( - f'LangBot Box backend start_session: backend={self.name} ' - f'session_id={spec.session_id} container_name={container_name} ' - f'image={spec.image} network={spec.network.value} ' - f'host_path={spec.host_path} host_path_mode={spec.host_path_mode.value} ' - f'cpus={spec.cpus} memory_mb={spec.memory_mb} pids_limit={spec.pids_limit} ' - f'read_only_rootfs={spec.read_only_rootfs}' - ) - - await self._run_command(args, timeout_sec=30, check=True) - - return BoxSessionInfo( - session_id=spec.session_id, - backend_name=self.name, - backend_session_id=container_name, - image=spec.image, - network=spec.network, - host_path=spec.host_path, - host_path_mode=spec.host_path_mode, - cpus=spec.cpus, - memory_mb=spec.memory_mb, - pids_limit=spec.pids_limit, - read_only_rootfs=spec.read_only_rootfs, - created_at=now, - last_used_at=now, - ) - - async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult: - start = dt.datetime.now(dt.UTC) - args = [self.command, 'exec'] - - for key, value in spec.env.items(): - args.extend(['-e', f'{key}={value}']) - - args.extend( - [ - session.backend_session_id, - 'sh', - '-lc', - self._build_exec_command(spec.workdir, spec.cmd), - ] - ) - - cmd_preview = spec.cmd.strip() - if len(cmd_preview) > 400: - cmd_preview = f'{cmd_preview[:397]}...' - self.logger.info( - f'LangBot Box backend exec: backend={self.name} ' - f'session_id={session.session_id} container_name={session.backend_session_id} ' - f'workdir={spec.workdir} timeout_sec={spec.timeout_sec} ' - f'env_keys={sorted(spec.env.keys())} cmd={cmd_preview}' - ) - - result = await self._run_command(args, timeout_sec=spec.timeout_sec, check=False) - duration_ms = int((dt.datetime.now(dt.UTC) - start).total_seconds() * 1000) - - if result.timed_out: - return BoxExecutionResult( - session_id=session.session_id, - backend_name=self.name, - status=BoxExecutionStatus.TIMED_OUT, - exit_code=None, - stdout=result.stdout, - stderr=result.stderr or f'Command timed out after {spec.timeout_sec} seconds.', - duration_ms=duration_ms, - ) - - return BoxExecutionResult( - session_id=session.session_id, - backend_name=self.name, - status=BoxExecutionStatus.COMPLETED, - exit_code=result.return_code, - stdout=result.stdout, - stderr=result.stderr, - duration_ms=duration_ms, - ) - - async def stop_session(self, session: BoxSessionInfo): - self.logger.info( - f'LangBot Box backend stop_session: backend={self.name} ' - f'session_id={session.session_id} container_name={session.backend_session_id}' - ) - await self._run_command( - [self.command, 'rm', '-f', session.backend_session_id], - timeout_sec=20, - check=False, - ) - - async def cleanup_orphaned_containers(self, current_instance_id: str = ''): - """Remove langbot.box containers from previous instances. - - Only removes containers whose ``langbot.box.instance_id`` label does - NOT match *current_instance_id*. Containers without the label (from - older versions) are also removed. - """ - result = await self._run_command( - [ - self.command, - 'ps', - '-a', - '--filter', - 'label=langbot.box=true', - '--format', - '{{.ID}}\t{{.Label "langbot.box.instance_id"}}', - ], - timeout_sec=10, - check=False, - ) - if result.return_code != 0 or not result.stdout.strip(): - return - orphan_ids = [] - for line in result.stdout.strip().split('\n'): - line = line.strip() - if not line: - continue - parts = line.split('\t', 1) - cid = parts[0].strip() - label_instance = parts[1].strip() if len(parts) > 1 else '' - if label_instance != current_instance_id: - orphan_ids.append(cid) - if not orphan_ids: - return - for cid in orphan_ids: - self.logger.info(f'Cleaning up orphaned Box container: {cid}') - await self._run_command( - [self.command, 'rm', '-f', *orphan_ids], - timeout_sec=30, - check=False, - ) - - async def start_managed_process(self, session: BoxSessionInfo, spec) -> asyncio.subprocess.Process: - args = [self.command, 'exec', '-i'] - - for key, value in spec.env.items(): - args.extend(['-e', f'{key}={value}']) - - args.extend( - [ - session.backend_session_id, - 'sh', - '-lc', - self._build_spawn_command(spec.cwd, spec.command, spec.args), - ] - ) - - self.logger.info( - f'LangBot Box backend start_managed_process: backend={self.name} ' - f'session_id={session.session_id} container_name={session.backend_session_id} ' - f'cwd={spec.cwd} env_keys={sorted(spec.env.keys())} command={spec.command} args={spec.args}' - ) - - return await asyncio.create_subprocess_exec( - *args, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - def _build_container_name(self, session_id: str) -> str: - normalized = re.sub(r'[^a-zA-Z0-9_.-]+', '-', session_id).strip('-').lower() or 'session' - suffix = uuid.uuid4().hex[:8] - return f'langbot-box-{normalized[:32]}-{suffix}' - - def _build_exec_command(self, workdir: str, cmd: str) -> str: - quoted_workdir = shlex.quote(workdir) - return f'mkdir -p {quoted_workdir} && cd {quoted_workdir} && {cmd}' - - def _build_spawn_command(self, cwd: str, command: str, args: list[str]) -> str: - quoted_cwd = shlex.quote(cwd) - command_parts = [shlex.quote(command), *[shlex.quote(arg) for arg in args]] - return f'mkdir -p {quoted_cwd} && cd {quoted_cwd} && exec {" ".join(command_parts)}' - - async def _run_command( - self, - args: list[str], - timeout_sec: int, - check: bool, - ) -> _CommandResult: - process = await asyncio.create_subprocess_exec( - *args, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - stdout_task = asyncio.create_task(self._read_stream(process.stdout)) - stderr_task = asyncio.create_task(self._read_stream(process.stderr)) - - timed_out = False - try: - await asyncio.wait_for(process.wait(), timeout=timeout_sec) - except asyncio.TimeoutError: - process.kill() - timed_out = True - await process.wait() - - stdout_bytes, stdout_total = await stdout_task - stderr_bytes, stderr_total = await stderr_task - - if timed_out: - return _CommandResult( - return_code=-1, - stdout=self._clip_captured_bytes(stdout_bytes, stdout_total), - stderr=self._clip_captured_bytes(stderr_bytes, stderr_total), - timed_out=True, - ) - - stdout = self._clip_captured_bytes(stdout_bytes, stdout_total) - stderr = self._clip_captured_bytes(stderr_bytes, stderr_total) - - if check and process.returncode != 0: - raise BoxError(self._format_cli_error(stderr or stdout or 'unknown backend error')) - - return _CommandResult( - return_code=process.returncode, - stdout=stdout, - stderr=stderr, - timed_out=False, - ) - - @staticmethod - def _clip_captured_bytes(data: bytes, total_size: int, limit: int = _MAX_RAW_OUTPUT_BYTES) -> str: - text = data.decode('utf-8', errors='replace').strip() - if total_size > limit: - text += f'\n... [raw output clipped at {limit} bytes, {total_size - limit} bytes discarded]' - return text - - @staticmethod - async def _read_stream( - stream: asyncio.StreamReader | None, - limit: int = _MAX_RAW_OUTPUT_BYTES, - ) -> tuple[bytes, int]: - if stream is None: - return b'', 0 - - chunks = bytearray() - total_size = 0 - while True: - chunk = await stream.read(65536) - if not chunk: - break - total_size += len(chunk) - remaining = limit - len(chunks) - if remaining > 0: - chunks.extend(chunk[:remaining]) - - return bytes(chunks), total_size - - def _format_cli_error(self, message: str) -> str: - message = ' '.join(message.split()) - if len(message) > 300: - message = f'{message[:297]}...' - return f'{self.name} backend error: {message}' - - -class PodmanBackend(CLISandboxBackend): - def __init__(self, logger: logging.Logger): - super().__init__(logger=logger, command='podman', backend_name='podman') - - -class DockerBackend(CLISandboxBackend): - def __init__(self, logger: logging.Logger): - super().__init__(logger=logger, command='docker', backend_name='docker') diff --git a/src/langbot/pkg/box/client.py b/src/langbot/pkg/box/client.py deleted file mode 100644 index b2732b37..00000000 --- a/src/langbot/pkg/box/client.py +++ /dev/null @@ -1,193 +0,0 @@ -"""BoxRuntimeClient abstraction for Box Runtime access.""" - -from __future__ import annotations - -import abc -import logging -from typing import Any, TYPE_CHECKING - -from langbot_plugin.runtime.io.handler import Handler - -from .actions import LangBotToBoxAction -from .errors import BoxError, BoxRuntimeUnavailableError -from .models import ( - BoxExecutionResult, - BoxExecutionStatus, - BoxManagedProcessInfo, - BoxManagedProcessSpec, - BoxSpec, - get_box_config, -) -from ..utils import platform - -if TYPE_CHECKING: - from ..core import app as core_app - - -def resolve_box_ws_relay_url(ap: 'core_app.Application') -> str: - """Derive the ws relay base URL used for managed-process attach.""" - runtime_url = str(get_box_config(ap).get('runtime_url', '')).strip() - if runtime_url: - return runtime_url - - if platform.get_platform() == 'docker': - return 'http://langbot_box_runtime:5410' - return 'http://127.0.0.1:5410' - - -class BoxRuntimeClient(abc.ABC): - """Abstract interface that BoxService uses to talk to a Box Runtime.""" - - @abc.abstractmethod - async def initialize(self) -> None: ... - - @abc.abstractmethod - async def execute(self, spec: BoxSpec) -> BoxExecutionResult: ... - - @abc.abstractmethod - async def shutdown(self) -> None: ... - - @abc.abstractmethod - async def get_status(self) -> dict: ... - - @abc.abstractmethod - async def get_sessions(self) -> list[dict]: ... - - @abc.abstractmethod - async def get_backend_info(self) -> dict: ... - - @abc.abstractmethod - async def delete_session(self, session_id: str) -> None: ... - - @abc.abstractmethod - async def create_session(self, spec: BoxSpec) -> dict: ... - - @abc.abstractmethod - async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo: ... - - @abc.abstractmethod - async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo: ... - - @abc.abstractmethod - async def get_session(self, session_id: str) -> dict: ... - - -def _translate_action_error(exc: Exception) -> BoxError: - """Convert an ActionCallError message back into the appropriate BoxError subclass.""" - from .errors import ( - BoxBackendUnavailableError, - BoxManagedProcessConflictError, - BoxManagedProcessNotFoundError, - BoxSessionConflictError, - BoxSessionNotFoundError, - BoxValidationError, - ) - - msg = str(exc) - _ERROR_PREFIX_MAP: list[tuple[str, type[BoxError]]] = [ - ('BoxValidationError:', BoxValidationError), - ('BoxSessionNotFoundError:', BoxSessionNotFoundError), - ('BoxSessionConflictError:', BoxSessionConflictError), - ('BoxManagedProcessNotFoundError:', BoxManagedProcessNotFoundError), - ('BoxManagedProcessConflictError:', BoxManagedProcessConflictError), - ('BoxBackendUnavailableError:', BoxBackendUnavailableError), - ] - for prefix, cls in _ERROR_PREFIX_MAP: - if prefix in msg: - return cls(msg) - return BoxError(msg) - - -class ActionRPCBoxClient(BoxRuntimeClient): - """Client that talks to BoxRuntime via the action RPC protocol.""" - - def __init__(self, logger: logging.Logger): - self._logger = logger - self._handler: Handler | None = None - - @property - def handler(self) -> Handler: - if self._handler is None: - raise BoxRuntimeUnavailableError('box runtime not connected') - return self._handler - - def set_handler(self, handler: Handler) -> None: - self._handler = handler - - async def _call(self, action: LangBotToBoxAction, data: dict[str, Any], timeout: float = 15.0) -> dict[str, Any]: - try: - return await self.handler.call_action(action, data, timeout=timeout) - except BoxRuntimeUnavailableError: - raise - except Exception as exc: - raise _translate_action_error(exc) from exc - - async def initialize(self) -> None: - try: - await self._call(LangBotToBoxAction.HEALTH, {}) - self._logger.info('LangBot Box runtime connected via action RPC.') - except Exception as exc: - raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc - - async def execute(self, spec: BoxSpec) -> BoxExecutionResult: - data = await self._call(LangBotToBoxAction.EXEC, spec.model_dump(mode='json'), timeout=300.0) - return BoxExecutionResult( - session_id=data['session_id'], - backend_name=data['backend_name'], - status=BoxExecutionStatus(data['status']), - exit_code=data.get('exit_code'), - stdout=data.get('stdout', ''), - stderr=data.get('stderr', ''), - duration_ms=data['duration_ms'], - ) - - async def shutdown(self) -> None: - if self._handler is not None: - try: - await self._call(LangBotToBoxAction.SHUTDOWN, {}) - except Exception: - pass - self._handler = None - - async def get_status(self) -> dict: - return await self._call(LangBotToBoxAction.STATUS, {}) - - async def get_sessions(self) -> list[dict]: - data = await self._call(LangBotToBoxAction.GET_SESSIONS, {}) - return data['sessions'] - - async def get_session(self, session_id: str) -> dict: - return await self._call(LangBotToBoxAction.GET_SESSION, {'session_id': session_id}) - - async def get_backend_info(self) -> dict: - return await self._call(LangBotToBoxAction.GET_BACKEND_INFO, {}) - - async def delete_session(self, session_id: str) -> None: - await self._call(LangBotToBoxAction.DELETE_SESSION, {'session_id': session_id}) - - async def create_session(self, spec: BoxSpec) -> dict: - return await self._call(LangBotToBoxAction.CREATE_SESSION, spec.model_dump(mode='json')) - - async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo: - data = await self._call( - LangBotToBoxAction.START_MANAGED_PROCESS, - {'session_id': session_id, 'spec': spec.model_dump(mode='json')}, - ) - return BoxManagedProcessInfo.model_validate(data) - - async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo: - data = await self._call(LangBotToBoxAction.GET_MANAGED_PROCESS, {'session_id': session_id}) - return BoxManagedProcessInfo.model_validate(data) - - def get_managed_process_websocket_url(self, session_id: str, ws_relay_base_url: str) -> str: - base = ws_relay_base_url - if base.startswith('https://'): - scheme = 'wss://' - suffix = base[len('https://') :] - elif base.startswith('http://'): - scheme = 'ws://' - suffix = base[len('http://') :] - else: - scheme = 'ws://' - suffix = base - return f'{scheme}{suffix}/v1/sessions/{session_id}/managed-process/ws' diff --git a/src/langbot/pkg/box/connector.py b/src/langbot/pkg/box/connector.py index 389f56c4..04cc697a 100644 --- a/src/langbot/pkg/box/connector.py +++ b/src/langbot/pkg/box/connector.py @@ -10,15 +10,32 @@ from langbot_plugin.entities.io.actions.enums import CommonAction from langbot_plugin.runtime.io.handler import Handler from langbot_plugin.runtime.io.connection import Connection -from .client import ActionRPCBoxClient, resolve_box_ws_relay_url -from .errors import BoxRuntimeUnavailableError -from .models import get_box_config +from langbot_plugin.box.client import ActionRPCBoxClient +from langbot_plugin.box.errors import BoxRuntimeUnavailableError from ..utils import platform if TYPE_CHECKING: from ..core import app as core_app +def _get_box_config(ap) -> dict: + """Return the 'box' section from instance config, with safe fallbacks.""" + instance_config = getattr(ap, 'instance_config', None) + config_data = getattr(instance_config, 'data', {}) if instance_config is not None else {} + return config_data.get('box', {}) + + +def resolve_box_ws_relay_url(ap: 'core_app.Application') -> str: + """Derive the ws relay base URL used for managed-process attach.""" + runtime_url = str(_get_box_config(ap).get('runtime_url', '')).strip() + if runtime_url: + return runtime_url + + if platform.get_platform() == 'docker': + return 'http://langbot_box_runtime:5410' + return 'http://127.0.0.1:5410' + + class BoxRuntimeConnector: """Connect to the Box runtime via action RPC (stdio or ws).""" @@ -80,7 +97,7 @@ class BoxRuntimeConnector: ctrl = StdioClientController( command=python_path, - args=['-m', 'langbot.pkg.box.server', '--port', str(self._relay_port)], + args=['-m', 'langbot_plugin.box.server', '--port', str(self._relay_port)], env=env, ) self._subprocess = None # StdioClientController manages the subprocess @@ -140,7 +157,7 @@ class BoxRuntimeConnector: self._subprocess.terminate() def _load_configured_runtime_url(self) -> str: - return str(get_box_config(self.ap).get('runtime_url', '')).strip() + return str(_get_box_config(self.ap).get('runtime_url', '')).strip() def _should_manage_local_runtime(self) -> bool: return not self.configured_runtime_url and platform.get_platform() != 'docker' diff --git a/src/langbot/pkg/box/errors.py b/src/langbot/pkg/box/errors.py deleted file mode 100644 index f6a8e864..00000000 --- a/src/langbot/pkg/box/errors.py +++ /dev/null @@ -1,33 +0,0 @@ -from __future__ import annotations - - -class BoxError(RuntimeError): - """Base error for LangBot Box failures.""" - - -class BoxValidationError(BoxError): - """Raised when sandbox_exec arguments are invalid.""" - - -class BoxBackendUnavailableError(BoxError): - """Raised when no supported container backend is available.""" - - -class BoxRuntimeUnavailableError(BoxError): - """Raised when the standalone Box Runtime service is unavailable.""" - - -class BoxSessionConflictError(BoxError): - """Raised when an existing session cannot satisfy a new request.""" - - -class BoxSessionNotFoundError(BoxError): - """Raised when a referenced session does not exist.""" - - -class BoxManagedProcessConflictError(BoxError): - """Raised when a session already has an active managed process.""" - - -class BoxManagedProcessNotFoundError(BoxError): - """Raised when a referenced managed process does not exist.""" diff --git a/src/langbot/pkg/box/models.py b/src/langbot/pkg/box/models.py deleted file mode 100644 index 3d1b2a16..00000000 --- a/src/langbot/pkg/box/models.py +++ /dev/null @@ -1,274 +0,0 @@ -from __future__ import annotations - -import datetime as dt -import enum - -import pydantic - - -DEFAULT_BOX_IMAGE = 'python:3.11-slim' -DEFAULT_BOX_MOUNT_PATH = '/workspace' - - -def get_box_config(ap) -> dict: - """Return the 'box' section from instance config, with safe fallbacks.""" - instance_config = getattr(ap, 'instance_config', None) - config_data = getattr(instance_config, 'data', {}) if instance_config is not None else {} - return config_data.get('box', {}) - - -class BoxNetworkMode(str, enum.Enum): - OFF = 'off' - ON = 'on' - - -class BoxExecutionStatus(str, enum.Enum): - COMPLETED = 'completed' - TIMED_OUT = 'timed_out' - - -class BoxHostMountMode(str, enum.Enum): - NONE = 'none' - READ_ONLY = 'ro' - READ_WRITE = 'rw' - - -class BoxManagedProcessStatus(str, enum.Enum): - RUNNING = 'running' - EXITED = 'exited' - - -class BoxSpec(pydantic.BaseModel): - cmd: str = '' - workdir: str = '/workspace' - timeout_sec: int = 30 - network: BoxNetworkMode = BoxNetworkMode.OFF - session_id: str - env: dict[str, str] = pydantic.Field(default_factory=dict) - image: str = DEFAULT_BOX_IMAGE - host_path: str | None = None - host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE - # Resource limits - cpus: float = 1.0 - memory_mb: int = 512 - pids_limit: int = 128 - read_only_rootfs: bool = True - - @pydantic.field_validator('cmd') - @classmethod - def validate_cmd(cls, value: str) -> str: - return value.strip() - - @pydantic.field_validator('workdir') - @classmethod - def validate_workdir(cls, value: str) -> str: - value = value.strip() - if not value.startswith('/'): - raise ValueError('workdir must be an absolute path inside the sandbox') - return value - - @pydantic.field_validator('timeout_sec') - @classmethod - def validate_timeout_sec(cls, value: int) -> int: - if value <= 0: - raise ValueError('timeout_sec must be greater than 0') - return value - - @pydantic.field_validator('cpus') - @classmethod - def validate_cpus(cls, value: float) -> float: - if value <= 0: - raise ValueError('cpus must be greater than 0') - return value - - @pydantic.field_validator('memory_mb') - @classmethod - def validate_memory_mb(cls, value: int) -> int: - if value < 32: - raise ValueError('memory_mb must be at least 32') - return value - - @pydantic.field_validator('pids_limit') - @classmethod - def validate_pids_limit(cls, value: int) -> int: - if value < 1: - raise ValueError('pids_limit must be at least 1') - return value - - @pydantic.field_validator('session_id') - @classmethod - def validate_session_id(cls, value: str) -> str: - value = value.strip() - if not value: - raise ValueError('session_id must not be empty') - return value - - @pydantic.field_validator('env') - @classmethod - def validate_env(cls, value: dict[str, str]) -> dict[str, str]: - return {str(k): str(v) for k, v in value.items()} - - @pydantic.field_validator('host_path') - @classmethod - def validate_host_path(cls, value: str | None) -> str | None: - if value is None: - return None - value = value.strip() - if not value.startswith('/'): - raise ValueError('host_path must be an absolute host path') - return value - - @pydantic.model_validator(mode='after') - def validate_host_mount_consistency(self) -> 'BoxSpec': - if self.host_path is None: - return self - if self.host_path_mode == BoxHostMountMode.NONE: - return self - if not self.workdir.startswith(DEFAULT_BOX_MOUNT_PATH): - raise ValueError('workdir must stay under /workspace when host_path is provided') - return self - - -class BoxProfile(pydantic.BaseModel): - """Preset sandbox configuration. - - Provides default values for BoxSpec fields and optionally locks fields - so that tool-call parameters cannot override them. - """ - - name: str - image: str = DEFAULT_BOX_IMAGE - network: BoxNetworkMode = BoxNetworkMode.OFF - timeout_sec: int = 30 - host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE - max_timeout_sec: int = 120 - # Resource limits - cpus: float = 1.0 - memory_mb: int = 512 - pids_limit: int = 128 - read_only_rootfs: bool = True - locked: frozenset[str] = frozenset() - - model_config = pydantic.ConfigDict(frozen=True) - - -BUILTIN_PROFILES: dict[str, BoxProfile] = { - 'default': BoxProfile( - name='default', - network=BoxNetworkMode.OFF, - host_path_mode=BoxHostMountMode.READ_WRITE, - cpus=1.0, - memory_mb=512, - pids_limit=128, - read_only_rootfs=True, - max_timeout_sec=120, - ), - 'offline_readonly': BoxProfile( - name='offline_readonly', - network=BoxNetworkMode.OFF, - host_path_mode=BoxHostMountMode.READ_ONLY, - cpus=0.5, - memory_mb=256, - pids_limit=64, - read_only_rootfs=True, - max_timeout_sec=60, - locked=frozenset({'network', 'host_path_mode', 'read_only_rootfs'}), - ), - 'network_basic': BoxProfile( - name='network_basic', - network=BoxNetworkMode.ON, - host_path_mode=BoxHostMountMode.READ_WRITE, - cpus=1.0, - memory_mb=512, - pids_limit=128, - read_only_rootfs=True, - max_timeout_sec=120, - ), - 'network_extended': BoxProfile( - name='network_extended', - network=BoxNetworkMode.ON, - host_path_mode=BoxHostMountMode.READ_WRITE, - cpus=2.0, - memory_mb=1024, - pids_limit=256, - read_only_rootfs=False, - max_timeout_sec=300, - ), -} - - -class BoxSessionInfo(pydantic.BaseModel): - session_id: str - backend_name: str - backend_session_id: str - image: str - network: BoxNetworkMode - host_path: str | None = None - host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE - cpus: float = 1.0 - memory_mb: int = 512 - pids_limit: int = 128 - read_only_rootfs: bool = True - created_at: dt.datetime - last_used_at: dt.datetime - - -class BoxManagedProcessSpec(pydantic.BaseModel): - command: str - args: list[str] = pydantic.Field(default_factory=list) - env: dict[str, str] = pydantic.Field(default_factory=dict) - cwd: str = '/workspace' - - @pydantic.field_validator('command') - @classmethod - def validate_command(cls, value: str) -> str: - value = value.strip() - if not value: - raise ValueError('command must not be empty') - return value - - @pydantic.field_validator('args') - @classmethod - def validate_args(cls, value: list[str]) -> list[str]: - return [str(item) for item in value] - - @pydantic.field_validator('env') - @classmethod - def validate_env(cls, value: dict[str, str]) -> dict[str, str]: - return {str(k): str(v) for k, v in value.items()} - - @pydantic.field_validator('cwd') - @classmethod - def validate_cwd(cls, value: str) -> str: - value = value.strip() - if not value.startswith('/'): - raise ValueError('cwd must be an absolute path inside the sandbox') - return value - - -class BoxManagedProcessInfo(pydantic.BaseModel): - session_id: str - status: BoxManagedProcessStatus - command: str - args: list[str] - cwd: str - env_keys: list[str] - attached: bool = False - started_at: dt.datetime - exited_at: dt.datetime | None = None - exit_code: int | None = None - stderr_preview: str = '' - - -class BoxExecutionResult(pydantic.BaseModel): - session_id: str - backend_name: str - status: BoxExecutionStatus - exit_code: int | None - stdout: str = '' - stderr: str = '' - duration_ms: int - - @property - def ok(self) -> bool: - return self.status == BoxExecutionStatus.COMPLETED and self.exit_code == 0 diff --git a/src/langbot/pkg/box/runtime.py b/src/langbot/pkg/box/runtime.py deleted file mode 100644 index 36f8c134..00000000 --- a/src/langbot/pkg/box/runtime.py +++ /dev/null @@ -1,386 +0,0 @@ -from __future__ import annotations - -import asyncio -import collections -import dataclasses -import datetime as dt -import logging -import uuid - -from .backend import BaseSandboxBackend, DockerBackend, PodmanBackend -from .errors import ( - BoxBackendUnavailableError, - BoxManagedProcessConflictError, - BoxManagedProcessNotFoundError, - BoxSessionConflictError, - BoxSessionNotFoundError, - BoxValidationError, -) -from .models import ( - BoxExecutionResult, - BoxExecutionStatus, - BoxManagedProcessInfo, - BoxManagedProcessSpec, - BoxManagedProcessStatus, - BoxSessionInfo, - BoxSpec, -) - -_UTC = dt.timezone.utc -_MANAGED_PROCESS_STDERR_PREVIEW_LIMIT = 4000 - - -@dataclasses.dataclass(slots=True) -class _ManagedProcess: - spec: BoxManagedProcessSpec - process: asyncio.subprocess.Process - started_at: dt.datetime - attach_lock: asyncio.Lock - stderr_chunks: collections.deque[str] - stderr_total_len: int = 0 - exit_code: int | None = None - exited_at: dt.datetime | None = None - - @property - def is_running(self) -> bool: - return self.exit_code is None and self.process.returncode is None - - -@dataclasses.dataclass(slots=True) -class _RuntimeSession: - info: BoxSessionInfo - lock: asyncio.Lock - managed_process: _ManagedProcess | None = None - - -class BoxRuntime: - def __init__( - self, - logger: logging.Logger, - backends: list[BaseSandboxBackend] | None = None, - session_ttl_sec: int = 300, - ): - self.logger = logger - self.backends = backends or [PodmanBackend(logger), DockerBackend(logger)] - self.session_ttl_sec = session_ttl_sec - self._backend: BaseSandboxBackend | None = None - self._sessions: dict[str, _RuntimeSession] = {} - self._lock = asyncio.Lock() - self.instance_id = uuid.uuid4().hex[:12] - - async def initialize(self): - self._backend = await self._select_backend() - if self._backend is not None: - self._backend.instance_id = self.instance_id - try: - await self._backend.cleanup_orphaned_containers(self.instance_id) - except Exception as exc: - self.logger.warning(f'LangBot Box orphan container cleanup failed: {exc}') - - async def execute(self, spec: BoxSpec) -> BoxExecutionResult: - if not spec.cmd: - raise BoxValidationError('cmd must not be empty') - session = await self._get_or_create_session(spec) - - async with session.lock: - self.logger.info( - 'LangBot Box execute: ' - f'session_id={spec.session_id} ' - f'backend_session_id={session.info.backend_session_id} ' - f'backend={session.info.backend_name} ' - f'workdir={spec.workdir} ' - f'timeout_sec={spec.timeout_sec}' - ) - result = await (await self._get_backend()).exec(session.info, spec) - - async with self._lock: - now = dt.datetime.now(_UTC) - if spec.session_id in self._sessions: - self._sessions[spec.session_id].info.last_used_at = now - - if result.status == BoxExecutionStatus.TIMED_OUT: - await self._drop_session_locked(spec.session_id) - - return result - - async def shutdown(self): - async with self._lock: - session_ids = list(self._sessions.keys()) - for session_id in session_ids: - await self._drop_session_locked(session_id) - - async def create_session(self, spec: BoxSpec) -> dict: - session = await self._get_or_create_session(spec) - return self._session_to_dict(session.info) - - async def delete_session(self, session_id: str) -> None: - async with self._lock: - if session_id not in self._sessions: - raise BoxSessionNotFoundError(f'session {session_id} not found') - await self._drop_session_locked(session_id) - - async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> dict: - async with self._lock: - runtime_session = self._sessions.get(session_id) - if runtime_session is None: - raise BoxSessionNotFoundError(f'session {session_id} not found') - - async with runtime_session.lock: - existing = runtime_session.managed_process - if existing is not None and existing.is_running: - raise BoxManagedProcessConflictError(f'session {session_id} already has a managed process') - - backend = await self._get_backend() - process = await backend.start_managed_process(runtime_session.info, spec) - managed_process = _ManagedProcess( - spec=spec, - process=process, - started_at=dt.datetime.now(_UTC), - attach_lock=asyncio.Lock(), - stderr_chunks=collections.deque(), - ) - runtime_session.managed_process = managed_process - runtime_session.info.last_used_at = dt.datetime.now(_UTC) - asyncio.create_task(self._drain_managed_process_stderr(runtime_session.info.session_id, managed_process)) - asyncio.create_task(self._watch_managed_process(runtime_session.info.session_id, managed_process)) - return self._managed_process_to_dict(runtime_session.info.session_id, managed_process) - - def get_managed_process(self, session_id: str) -> dict: - runtime_session = self._sessions.get(session_id) - if runtime_session is None: - raise BoxSessionNotFoundError(f'session {session_id} not found') - if runtime_session.managed_process is None: - raise BoxManagedProcessNotFoundError(f'session {session_id} has no managed process') - return self._managed_process_to_dict(session_id, runtime_session.managed_process) - - # ── Observability ───────────────────────────────────────────────── - - async def get_backend_info(self) -> dict: - backend = self._backend - if backend is None: - return {'name': None, 'available': False} - try: - available = await backend.is_available() - except Exception: - available = False - return {'name': backend.name, 'available': available} - - def get_sessions(self) -> list[dict]: - return [self._session_to_dict(s.info) for s in self._sessions.values()] - - def get_session(self, session_id: str) -> dict: - runtime_session = self._sessions.get(session_id) - if runtime_session is None: - raise BoxSessionNotFoundError(f'session {session_id} not found') - result = self._session_to_dict(runtime_session.info) - if runtime_session.managed_process is not None: - result['managed_process'] = self._managed_process_to_dict(session_id, runtime_session.managed_process) - return result - - async def get_status(self) -> dict: - backend_info = await self.get_backend_info() - return { - 'backend': backend_info, - 'active_sessions': len(self._sessions), - 'managed_processes': sum( - 1 - for runtime_session in self._sessions.values() - if runtime_session.managed_process is not None and runtime_session.managed_process.is_running - ), - 'session_ttl_sec': self.session_ttl_sec, - } - - async def _get_or_create_session(self, spec: BoxSpec) -> _RuntimeSession: - async with self._lock: - await self._reap_expired_sessions_locked() - - existing = self._sessions.get(spec.session_id) - if existing is not None: - self._assert_session_compatible(existing.info, spec) - existing.info.last_used_at = dt.datetime.now(_UTC) - self.logger.info( - 'LangBot Box session reused: ' - f'session_id={spec.session_id} ' - f'backend_session_id={existing.info.backend_session_id} ' - f'backend={existing.info.backend_name}' - ) - return existing - - backend = await self._get_backend() - info = await backend.start_session(spec) - runtime_session = _RuntimeSession(info=info, lock=asyncio.Lock()) - self._sessions[spec.session_id] = runtime_session - self.logger.info( - 'LangBot Box session created: ' - f'session_id={spec.session_id} ' - f'backend_session_id={info.backend_session_id} ' - f'backend={info.backend_name} ' - f'image={info.image} ' - f'network={info.network.value} ' - f'host_path={info.host_path} ' - f'host_path_mode={info.host_path_mode.value}' - ) - return runtime_session - - async def _get_backend(self) -> BaseSandboxBackend: - if self._backend is None: - self._backend = await self._select_backend() - if self._backend is None: - raise BoxBackendUnavailableError( - 'LangBot Box backend unavailable. Install and start Podman or Docker before using sandbox_exec.' - ) - return self._backend - - async def _select_backend(self) -> BaseSandboxBackend | None: - for backend in self.backends: - try: - await backend.initialize() - if await backend.is_available(): - self.logger.info(f'LangBot Box using backend: {backend.name}') - return backend - except Exception as exc: - self.logger.warning(f'LangBot Box backend {backend.name} probe failed: {exc}') - - self.logger.warning('LangBot Box backend unavailable: neither Podman nor Docker is ready') - return None - - async def _reap_expired_sessions_locked(self): - if self.session_ttl_sec <= 0: - return - - deadline = dt.datetime.now(_UTC) - dt.timedelta(seconds=self.session_ttl_sec) - expired_session_ids = [ - session_id - for session_id, session in self._sessions.items() - if session.info.last_used_at < deadline - and not (session.managed_process is not None and session.managed_process.is_running) - ] - - for session_id in expired_session_ids: - await self._drop_session_locked(session_id) - - async def _drop_session_locked(self, session_id: str): - runtime_session = self._sessions.pop(session_id, None) - if runtime_session is None or self._backend is None: - return - - await self._terminate_managed_process(runtime_session) - - try: - self.logger.info( - 'LangBot Box session cleanup: ' - f'session_id={session_id} ' - f'backend_session_id={runtime_session.info.backend_session_id} ' - f'backend={runtime_session.info.backend_name}' - ) - await self._backend.stop_session(runtime_session.info) - except Exception as exc: - self.logger.warning(f'Failed to clean up box session {session_id}: {exc}') - - def _assert_session_compatible(self, session: BoxSessionInfo, spec: BoxSpec): - _COMPAT_FIELDS = ( - 'network', - 'image', - 'host_path', - 'host_path_mode', - 'cpus', - 'memory_mb', - 'pids_limit', - 'read_only_rootfs', - ) - for field in _COMPAT_FIELDS: - session_val = getattr(session, field) - spec_val = getattr(spec, field) - if session_val != spec_val: - display = session_val.value if hasattr(session_val, 'value') else session_val - raise BoxSessionConflictError( - f'sandbox_exec session {spec.session_id} already exists with {field}={display}' - ) - - async def _drain_managed_process_stderr(self, session_id: str, managed_process: _ManagedProcess) -> None: - stream = managed_process.process.stderr - if stream is None: - return - - try: - while True: - chunk = await stream.readline() - if not chunk: - break - text = chunk.decode('utf-8', errors='replace').rstrip() - if not text: - continue - managed_process.stderr_chunks.append(text) - managed_process.stderr_total_len += len(text) + 1 # +1 for '\n' separator - while ( - managed_process.stderr_total_len > _MANAGED_PROCESS_STDERR_PREVIEW_LIMIT - and managed_process.stderr_chunks - ): - removed = managed_process.stderr_chunks.popleft() - managed_process.stderr_total_len -= len(removed) + 1 - self.logger.info(f'LangBot Box managed process stderr: session_id={session_id} {text}') - except Exception as exc: - self.logger.warning(f'Failed to drain managed process stderr for {session_id}: {exc}') - - async def _watch_managed_process(self, session_id: str, managed_process: _ManagedProcess) -> None: - return_code = await managed_process.process.wait() - managed_process.exit_code = return_code - managed_process.exited_at = dt.datetime.now(_UTC) - runtime_session = self._sessions.get(session_id) - if runtime_session is not None: - runtime_session.info.last_used_at = managed_process.exited_at - self.logger.info(f'LangBot Box managed process exited: session_id={session_id} return_code={return_code}') - - async def _terminate_managed_process(self, runtime_session: _RuntimeSession) -> None: - managed_process = runtime_session.managed_process - if managed_process is None or not managed_process.is_running: - return - - process = managed_process.process - try: - if process.stdin is not None: - process.stdin.close() - except Exception: - pass - - try: - await asyncio.wait_for(asyncio.shield(process.wait()), timeout=5) - except asyncio.TimeoutError: - if process.returncode is None: - try: - process.terminate() - except ProcessLookupError: - pass - try: - await asyncio.wait_for(asyncio.shield(process.wait()), timeout=5) - except asyncio.TimeoutError: - if process.returncode is None: - try: - process.kill() - except ProcessLookupError: - pass - await process.wait() - finally: - managed_process.exit_code = process.returncode - managed_process.exited_at = dt.datetime.now(_UTC) - - def _managed_process_to_dict(self, session_id: str, managed_process: _ManagedProcess) -> dict: - stderr_preview = '\n'.join(managed_process.stderr_chunks) - status = BoxManagedProcessStatus.RUNNING if managed_process.is_running else BoxManagedProcessStatus.EXITED - return BoxManagedProcessInfo( - session_id=session_id, - status=status, - command=managed_process.spec.command, - args=managed_process.spec.args, - cwd=managed_process.spec.cwd, - env_keys=sorted(managed_process.spec.env.keys()), - attached=managed_process.attach_lock.locked(), - started_at=managed_process.started_at, - exited_at=managed_process.exited_at, - exit_code=managed_process.exit_code, - stderr_preview=stderr_preview, - ).model_dump(mode='json') - - @staticmethod - def _session_to_dict(info: BoxSessionInfo) -> dict: - return info.model_dump(mode='json') diff --git a/src/langbot/pkg/box/security.py b/src/langbot/pkg/box/security.py deleted file mode 100644 index d5a8c513..00000000 --- a/src/langbot/pkg/box/security.py +++ /dev/null @@ -1,35 +0,0 @@ -from __future__ import annotations - -import os - -from .errors import BoxValidationError -from .models import BoxSpec - -BLOCKED_HOST_PATHS = frozenset( - { - '/etc', - '/proc', - '/sys', - '/dev', - '/root', - '/boot', - '/run', - '/var/run', - '/run/docker.sock', - '/var/run/docker.sock', - '/run/podman', - '/var/run/podman', - } -) - - -def validate_sandbox_security(spec: BoxSpec) -> None: - """Validate that a BoxSpec does not request dangerous container config. - - Raises BoxValidationError when the spec contains a blocked host_path. - """ - if spec.host_path: - real = os.path.realpath(spec.host_path) - for blocked in BLOCKED_HOST_PATHS: - if real == blocked or real.startswith(blocked + '/'): - raise BoxValidationError(f'host_path {spec.host_path} is blocked for security') diff --git a/src/langbot/pkg/box/server.py b/src/langbot/pkg/box/server.py deleted file mode 100644 index 8640b5e9..00000000 --- a/src/langbot/pkg/box/server.py +++ /dev/null @@ -1,267 +0,0 @@ -"""Standalone Box Runtime service exposing BoxRuntime via action RPC. - -Usage (stdio, launched by LangBot as subprocess): - python -m langbot.pkg.box.server - -Usage (ws + ws relay, for remote/docker mode): - python -m langbot.pkg.box.server --port 5410 -""" - -from __future__ import annotations - -import argparse -import asyncio -import datetime as dt -import logging -import sys -from typing import Any - -import pydantic -from aiohttp import web - -from langbot_plugin.entities.io.actions.enums import CommonAction -from langbot_plugin.entities.io.resp import ActionResponse -from langbot_plugin.runtime.io.connection import Connection -from langbot_plugin.runtime.io.handler import Handler - -from .actions import LangBotToBoxAction -from .errors import ( - BoxManagedProcessConflictError, - BoxManagedProcessNotFoundError, - BoxSessionNotFoundError, -) -from .models import BoxExecutionResult, BoxManagedProcessSpec, BoxSpec -from .runtime import BoxRuntime - -logger = logging.getLogger('langbot.box.server') - - -def _result_to_dict(result: BoxExecutionResult) -> dict: - return result.model_dump(mode='json') - - -class BoxServerHandler(Handler): - """Server-side handler that registers box actions backed by BoxRuntime.""" - - name = 'BoxServerHandler' - - def __init__(self, connection: Connection, runtime: BoxRuntime): - super().__init__(connection) - self._runtime = runtime - self._register_actions() - - def _register_actions(self) -> None: - @self.action(CommonAction.PING) - async def ping(data: dict[str, Any]) -> ActionResponse: - return ActionResponse.success({}) - - @self.action(LangBotToBoxAction.HEALTH) - async def health(data: dict[str, Any]) -> ActionResponse: - info = await self._runtime.get_backend_info() - return ActionResponse.success(info) - - @self.action(LangBotToBoxAction.STATUS) - async def status(data: dict[str, Any]) -> ActionResponse: - result = await self._runtime.get_status() - return ActionResponse.success(result) - - @self.action(LangBotToBoxAction.EXEC) - async def exec_cmd(data: dict[str, Any]) -> ActionResponse: - try: - spec = BoxSpec.model_validate(data) - except pydantic.ValidationError as exc: - return ActionResponse.error(f'BoxValidationError: {exc}') - result = await self._runtime.execute(spec) - return ActionResponse.success(_result_to_dict(result)) - - @self.action(LangBotToBoxAction.CREATE_SESSION) - async def create_session(data: dict[str, Any]) -> ActionResponse: - try: - spec = BoxSpec.model_validate(data) - except pydantic.ValidationError as exc: - return ActionResponse.error(f'BoxValidationError: {exc}') - info = await self._runtime.create_session(spec) - return ActionResponse.success(info) - - @self.action(LangBotToBoxAction.GET_SESSION) - async def get_session(data: dict[str, Any]) -> ActionResponse: - return ActionResponse.success(self._runtime.get_session(data['session_id'])) - - @self.action(LangBotToBoxAction.GET_SESSIONS) - async def get_sessions(data: dict[str, Any]) -> ActionResponse: - return ActionResponse.success({'sessions': self._runtime.get_sessions()}) - - @self.action(LangBotToBoxAction.DELETE_SESSION) - async def delete_session(data: dict[str, Any]) -> ActionResponse: - await self._runtime.delete_session(data['session_id']) - return ActionResponse.success({'deleted': data['session_id']}) - - @self.action(LangBotToBoxAction.START_MANAGED_PROCESS) - async def start_managed_process(data: dict[str, Any]) -> ActionResponse: - session_id = data['session_id'] - try: - spec = BoxManagedProcessSpec.model_validate(data['spec']) - except pydantic.ValidationError as exc: - return ActionResponse.error(f'BoxValidationError: {exc}') - info = await self._runtime.start_managed_process(session_id, spec) - return ActionResponse.success(info) - - @self.action(LangBotToBoxAction.GET_MANAGED_PROCESS) - async def get_managed_process(data: dict[str, Any]) -> ActionResponse: - return ActionResponse.success(self._runtime.get_managed_process(data['session_id'])) - - @self.action(LangBotToBoxAction.GET_BACKEND_INFO) - async def get_backend_info(data: dict[str, Any]) -> ActionResponse: - info = await self._runtime.get_backend_info() - return ActionResponse.success(info) - - @self.action(LangBotToBoxAction.SHUTDOWN) - async def shutdown(data: dict[str, Any]) -> ActionResponse: - await self._runtime.shutdown() - return ActionResponse.success({}) - - -# ── Managed process WebSocket relay (aiohttp) ──────────────────────── - - -def _error_response(exc: Exception) -> web.Response: - return web.json_response( - {'error': {'code': type(exc).__name__, 'message': str(exc)}}, - status=400, - ) - - -async def handle_managed_process_ws(request: web.Request) -> web.StreamResponse: - runtime: BoxRuntime = request.app['runtime'] - session_id = request.match_info['session_id'] - - runtime_session = runtime._sessions.get(session_id) - if runtime_session is None: - return _error_response(BoxSessionNotFoundError(f'session {session_id} not found')) - - managed_process = runtime_session.managed_process - if managed_process is None: - return _error_response(BoxManagedProcessNotFoundError(f'session {session_id} has no managed process')) - if not managed_process.is_running: - return _error_response( - BoxManagedProcessConflictError(f'managed process in session {session_id} is not running') - ) - - ws = web.WebSocketResponse(protocols=('mcp',)) - await ws.prepare(request) - - async with managed_process.attach_lock: - process = managed_process.process - stdout = process.stdout - stdin = process.stdin - if stdout is None or stdin is None: - await ws.close(message=b'managed process stdio unavailable') - return ws - - async def _stdout_to_ws() -> None: - while True: - line = await stdout.readline() - if not line: - break - await ws.send_str(line.decode('utf-8', errors='replace').rstrip('\n')) - runtime_session.info.last_used_at = dt.datetime.now(dt.timezone.utc) - - async def _ws_to_stdin() -> None: - async for msg in ws: - if msg.type == web.WSMsgType.TEXT: - stdin.write((msg.data + '\n').encode('utf-8')) - await stdin.drain() - runtime_session.info.last_used_at = dt.datetime.now(dt.timezone.utc) - elif msg.type in ( - web.WSMsgType.CLOSE, - web.WSMsgType.CLOSING, - web.WSMsgType.CLOSED, - web.WSMsgType.ERROR, - ): - break - - stdout_task = asyncio.create_task(_stdout_to_ws()) - stdin_task = asyncio.create_task(_ws_to_stdin()) - try: - done, pending = await asyncio.wait( - [stdout_task, stdin_task], - return_when=asyncio.FIRST_COMPLETED, - ) - for task in pending: - task.cancel() - for task in done: - task.result() - finally: - await ws.close() - - return ws - - -def create_ws_relay_app(runtime: BoxRuntime) -> web.Application: - """Create a minimal aiohttp app that only serves the managed-process ws relay.""" - app = web.Application() - app['runtime'] = runtime - app.router.add_get('/v1/sessions/{session_id}/managed-process/ws', handle_managed_process_ws) - return app - - -# ── Entry point ────────────────────────────────────────────────────── - - -async def _run_server(host: str, port: int, mode: str) -> None: - runtime = BoxRuntime(logger=logger) - await runtime.initialize() - - # Start aiohttp for ws relay (non-fatal — managed process attach - # degrades gracefully if the port is unavailable). - runner: web.AppRunner | None = None - try: - ws_app = create_ws_relay_app(runtime) - runner = web.AppRunner(ws_app) - await runner.setup() - site = web.TCPSite(runner, host, port) - await site.start() - logger.info(f'Box ws relay listening on {host}:{port}') - except OSError as exc: - logger.warning(f'Box ws relay failed to bind {host}:{port}: {exc}') - logger.warning('Managed process WebSocket attach will be unavailable.') - - async def new_connection_callback(connection: Connection) -> None: - handler = BoxServerHandler(connection, runtime) - await handler.run() - - try: - if mode == 'stdio': - from langbot_plugin.runtime.io.controllers.stdio.server import StdioServerController - - ctrl = StdioServerController() - await ctrl.run(new_connection_callback) - else: - from langbot_plugin.runtime.io.controllers.ws.server import WebSocketServerController - - # Action RPC uses port+1 to avoid conflict with ws relay - rpc_port = port + 1 - logger.info(f'Box action RPC (ws) listening on {host}:{rpc_port}') - ctrl = WebSocketServerController(rpc_port) - await ctrl.run(new_connection_callback) - finally: - await runtime.shutdown() - if runner is not None: - await runner.cleanup() - - -def main() -> None: - parser = argparse.ArgumentParser(description='LangBot Box Runtime Service') - parser.add_argument('--host', default='0.0.0.0', help='Bind address') - parser.add_argument('--port', type=int, default=5410, help='Bind port (ws relay)') - parser.add_argument( - '--mode', choices=['stdio', 'ws'], default='stdio', help='Control channel transport (default: stdio)' - ) - args = parser.parse_args() - - logging.basicConfig(level=logging.INFO, stream=sys.stderr) - asyncio.run(_run_server(args.host, args.port, args.mode)) - - -if __name__ == '__main__': - main() diff --git a/src/langbot/pkg/box/service.py b/src/langbot/pkg/box/service.py index 294c2982..9b3e85f2 100644 --- a/src/langbot/pkg/box/service.py +++ b/src/langbot/pkg/box/service.py @@ -9,17 +9,16 @@ from typing import TYPE_CHECKING import pydantic -from .client import BoxRuntimeClient -from .connector import BoxRuntimeConnector -from .errors import BoxError, BoxValidationError -from .models import ( +from langbot_plugin.box.client import BoxRuntimeClient +from .connector import BoxRuntimeConnector, _get_box_config +from langbot_plugin.box.errors import BoxError, BoxValidationError +from langbot_plugin.box.models import ( BUILTIN_PROFILES, BoxExecutionResult, BoxManagedProcessInfo, BoxManagedProcessSpec, BoxProfile, BoxSpec, - get_box_config, ) _INT_ADAPTER = pydantic.TypeAdapter(int) @@ -241,7 +240,7 @@ class BoxService: } def _load_allowed_host_mount_roots(self) -> list[str]: - configured_roots = get_box_config(self.ap).get('allowed_host_mount_roots', []) + configured_roots = _get_box_config(self.ap).get('allowed_host_mount_roots', []) normalized_roots: list[str] = [] for root in configured_roots: @@ -253,7 +252,7 @@ class BoxService: return normalized_roots def _load_default_host_workspace(self) -> str | None: - default_host_workspace = str(get_box_config(self.ap).get('default_host_workspace', '')).strip() + default_host_workspace = str(_get_box_config(self.ap).get('default_host_workspace', '')).strip() if not default_host_workspace: return None return os.path.realpath(os.path.abspath(default_host_workspace)) @@ -302,7 +301,7 @@ class BoxService: raise BoxValidationError(f'host_path is outside allowed_host_mount_roots: {allowed_roots}') def _load_profile(self) -> BoxProfile: - profile_name = str(get_box_config(self.ap).get('profile', 'default')).strip() or 'default' + profile_name = str(_get_box_config(self.ap).get('profile', 'default')).strip() or 'default' profile = BUILTIN_PROFILES.get(profile_name) if profile is None: diff --git a/src/langbot/pkg/provider/tools/loaders/mcp.py b/src/langbot/pkg/provider/tools/loaders/mcp.py index 76ff5017..f2e16d92 100644 --- a/src/langbot/pkg/provider/tools/loaders/mcp.py +++ b/src/langbot/pkg/provider/tools/loaders/mcp.py @@ -327,7 +327,7 @@ class RuntimeMCPSession: async def _monitor_box_process_health(self): """Poll managed process status; return when process exits.""" - from langbot.pkg.box.models import BoxManagedProcessStatus + from langbot_plugin.box.models import BoxManagedProcessStatus session_id = self._build_box_session_id() consecutive_errors = 0 diff --git a/src/langbot/pkg/provider/tools/loaders/native.py b/src/langbot/pkg/provider/tools/loaders/native.py index fdf74f40..4e13a780 100644 --- a/src/langbot/pkg/provider/tools/loaders/native.py +++ b/src/langbot/pkg/provider/tools/loaders/native.py @@ -5,7 +5,7 @@ import json import langbot_plugin.api.entities.builtin.resource.tool as resource_tool from langbot_plugin.api.entities.events import pipeline_query -from langbot.pkg.box.models import BoxNetworkMode +from langbot_plugin.box.models import BoxNetworkMode from .. import loader SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec' diff --git a/tests/unit_tests/box/test_backend_clip.py b/tests/unit_tests/box/test_backend_clip.py deleted file mode 100644 index f6ea07b2..00000000 --- a/tests/unit_tests/box/test_backend_clip.py +++ /dev/null @@ -1,38 +0,0 @@ -from __future__ import annotations - -import pytest - -from langbot.pkg.box.backend import CLISandboxBackend, _MAX_RAW_OUTPUT_BYTES - - -class TestClipCapturedBytes: - def test_within_limit_unchanged(self): - data = b'hello world' - result = CLISandboxBackend._clip_captured_bytes(data, total_size=len(data), limit=1024) - assert result == 'hello world' - - def test_exceeding_limit_clips_and_appends_notice(self): - captured = b'A' * 100 - total_size = 200 - result = CLISandboxBackend._clip_captured_bytes(captured, total_size=total_size, limit=100) - assert result.startswith('A' * 100) - assert 'raw output clipped at 100 bytes' in result - assert '100 bytes discarded' in result - - def test_exact_limit_not_clipped(self): - data = b'B' * 100 - result = CLISandboxBackend._clip_captured_bytes(data, total_size=100, limit=100) - assert result == 'B' * 100 - assert 'clipped' not in result - - def test_default_limit_is_module_constant(self): - data = b'x' * 10 - result = CLISandboxBackend._clip_captured_bytes(data, total_size=10) - assert result == 'x' * 10 - assert _MAX_RAW_OUTPUT_BYTES == 1_048_576 - - def test_invalid_utf8_replaced(self): - data = b'ok\xff\xfetail' - result = CLISandboxBackend._clip_captured_bytes(data, total_size=len(data), limit=1024) - assert 'ok' in result - assert 'tail' in result diff --git a/tests/unit_tests/box/test_box_connector.py b/tests/unit_tests/box/test_box_connector.py index 0740c53b..88d18d74 100644 --- a/tests/unit_tests/box/test_box_connector.py +++ b/tests/unit_tests/box/test_box_connector.py @@ -5,9 +5,9 @@ from unittest.mock import AsyncMock, Mock, patch import pytest -from langbot.pkg.box.client import ActionRPCBoxClient +from langbot_plugin.box.client import ActionRPCBoxClient from langbot.pkg.box.connector import BoxRuntimeConnector -from langbot.pkg.box.errors import BoxRuntimeUnavailableError +from langbot_plugin.box.errors import BoxRuntimeUnavailableError def make_app(logger: Mock, runtime_url: str = ''): @@ -27,7 +27,6 @@ def make_app(logger: Mock, runtime_url: str = ''): def patch_platform(monkeypatch: pytest.MonkeyPatch, value: str): - monkeypatch.setattr('langbot.pkg.box.client.platform.get_platform', lambda: value) monkeypatch.setattr('langbot.pkg.box.connector.platform.get_platform', lambda: value) diff --git a/tests/unit_tests/box/test_box_managed_process.py b/tests/unit_tests/box/test_box_managed_process.py deleted file mode 100644 index d3e7f6cb..00000000 --- a/tests/unit_tests/box/test_box_managed_process.py +++ /dev/null @@ -1,103 +0,0 @@ -from __future__ import annotations - -import asyncio -import datetime as dt -from unittest.mock import Mock - -import pytest - -from langbot.pkg.box.backend import BaseSandboxBackend -from langbot.pkg.box.models import BoxManagedProcessSpec, BoxManagedProcessStatus, BoxSessionInfo, BoxSpec -from langbot.pkg.box.runtime import BoxRuntime - -_UTC = dt.timezone.utc - - -class FakeManagedProcessBackend(BaseSandboxBackend): - name = 'fake-managed' - - def __init__(self, logger: Mock): - super().__init__(logger) - - async def is_available(self) -> bool: - return True - - async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: - now = dt.datetime.now(_UTC) - return BoxSessionInfo( - session_id=spec.session_id, - backend_name=self.name, - backend_session_id=f'backend-{spec.session_id}', - image=spec.image, - network=spec.network, - host_path=spec.host_path, - host_path_mode=spec.host_path_mode, - cpus=spec.cpus, - memory_mb=spec.memory_mb, - pids_limit=spec.pids_limit, - read_only_rootfs=spec.read_only_rootfs, - created_at=now, - last_used_at=now, - ) - - async def exec(self, session: BoxSessionInfo, spec: BoxSpec): - raise NotImplementedError - - async def stop_session(self, session: BoxSessionInfo): - return None - - async def start_managed_process(self, session: BoxSessionInfo, spec: BoxManagedProcessSpec) -> asyncio.subprocess.Process: - return await asyncio.create_subprocess_exec( - 'sh', - '-lc', - 'cat', - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - -@pytest.mark.asyncio -async def test_runtime_start_managed_process_tracks_status(): - logger = Mock() - runtime = BoxRuntime(logger=logger, backends=[FakeManagedProcessBackend(logger)], session_ttl_sec=300) - await runtime.initialize() - - session_spec = BoxSpec.model_validate({'cmd': 'echo bootstrap', 'session_id': 'mcp-session'}) - await runtime.create_session(session_spec) - - process_info = await runtime.start_managed_process( - 'mcp-session', - BoxManagedProcessSpec(command='python', args=['-m', 'demo'], cwd='/workspace'), - ) - - assert process_info['session_id'] == 'mcp-session' - assert process_info['status'] == BoxManagedProcessStatus.RUNNING.value - assert process_info['command'] == 'python' - assert process_info['args'] == ['-m', 'demo'] - - queried = runtime.get_managed_process('mcp-session') - assert queried['status'] == BoxManagedProcessStatus.RUNNING.value - - await runtime.shutdown() - - -@pytest.mark.asyncio -async def test_runtime_does_not_reap_session_with_running_managed_process(): - logger = Mock() - runtime = BoxRuntime(logger=logger, backends=[FakeManagedProcessBackend(logger)], session_ttl_sec=1) - await runtime.initialize() - - session_spec = BoxSpec.model_validate({'cmd': 'echo bootstrap', 'session_id': 'mcp-session'}) - await runtime.create_session(session_spec) - await runtime.start_managed_process( - 'mcp-session', - BoxManagedProcessSpec(command='python', args=['-m', 'demo'], cwd='/workspace'), - ) - - runtime._sessions['mcp-session'].info.last_used_at = dt.datetime.now(_UTC) - dt.timedelta(seconds=120) - await runtime._reap_expired_sessions_locked() - - assert 'mcp-session' in runtime._sessions - - await runtime.shutdown() diff --git a/tests/unit_tests/box/test_box_security.py b/tests/unit_tests/box/test_box_security.py deleted file mode 100644 index bc7cc48e..00000000 --- a/tests/unit_tests/box/test_box_security.py +++ /dev/null @@ -1,59 +0,0 @@ -from __future__ import annotations - -import pytest - -from langbot.pkg.box.errors import BoxValidationError -from langbot.pkg.box.models import BoxHostMountMode, BoxNetworkMode, BoxSpec -from langbot.pkg.box.security import BLOCKED_HOST_PATHS, validate_sandbox_security - - -def _make_spec(**overrides) -> BoxSpec: - defaults = { - 'session_id': 'test-session', - 'cmd': 'echo hi', - 'image': 'python:3.11-slim', - } - defaults.update(overrides) - return BoxSpec(**defaults) - - -class TestValidateSandboxSecurity: - def test_no_host_path_passes(self): - spec = _make_spec(host_path=None) - validate_sandbox_security(spec) # should not raise - - def test_safe_host_path_passes(self): - spec = _make_spec(host_path='/home/user/my-project') - validate_sandbox_security(spec) # should not raise - - @pytest.mark.parametrize('blocked', [ - '/etc', - '/proc', - '/sys', - '/dev', - '/root', - '/boot', - '/run', - '/var/run', - '/run/docker.sock', - '/var/run/docker.sock', - '/run/podman', - '/var/run/podman', - ]) - def test_blocked_paths_rejected(self, blocked): - spec = _make_spec(host_path=blocked) - with pytest.raises(BoxValidationError, match='blocked for security'): - validate_sandbox_security(spec) - - def test_blocked_subpath_rejected(self): - spec = _make_spec(host_path='/etc/nginx') - with pytest.raises(BoxValidationError, match='blocked for security'): - validate_sandbox_security(spec) - - def test_path_starting_with_blocked_prefix_but_different_dir_passes(self): - # /etcetera is NOT /etc - spec = _make_spec(host_path='/etcetera/data') - validate_sandbox_security(spec) # should not raise - - def test_blocked_host_paths_is_frozenset(self): - assert isinstance(BLOCKED_HOST_PATHS, frozenset) diff --git a/tests/unit_tests/box/test_box_service.py b/tests/unit_tests/box/test_box_service.py index 62951b84..ddf9744c 100644 --- a/tests/unit_tests/box/test_box_service.py +++ b/tests/unit_tests/box/test_box_service.py @@ -10,10 +10,10 @@ import pytest import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query -from langbot.pkg.box.backend import BaseSandboxBackend -from langbot.pkg.box.client import BoxRuntimeClient, ActionRPCBoxClient -from langbot.pkg.box.errors import BoxBackendUnavailableError, BoxSessionConflictError, BoxSessionNotFoundError, BoxValidationError -from langbot.pkg.box.models import ( +from langbot_plugin.box.backend import BaseSandboxBackend +from langbot_plugin.box.client import BoxRuntimeClient, ActionRPCBoxClient +from langbot_plugin.box.errors import BoxBackendUnavailableError, BoxSessionConflictError, BoxSessionNotFoundError, BoxValidationError +from langbot_plugin.box.models import ( BUILTIN_PROFILES, BoxExecutionResult, BoxExecutionStatus, @@ -24,7 +24,7 @@ from langbot.pkg.box.models import ( BoxSessionInfo, BoxSpec, ) -from langbot.pkg.box.runtime import BoxRuntime +from langbot_plugin.box.runtime import BoxRuntime from langbot.pkg.box.service import BoxService _UTC = dt.timezone.utc @@ -803,7 +803,7 @@ def _make_queue_connection_pair(): async def _make_rpc_pair(runtime: BoxRuntime): """Create an in-process (ActionRPCBoxClient, server_task, client_task) connected via queues.""" - from langbot.pkg.box.server import BoxServerHandler + from langbot_plugin.box.server import BoxServerHandler from langbot_plugin.runtime.io.handler import Handler client_conn, server_conn = _make_queue_connection_pair() diff --git a/tests/unit_tests/provider/test_mcp_box_integration.py b/tests/unit_tests/provider/test_mcp_box_integration.py index 83617474..c49be935 100644 --- a/tests/unit_tests/provider/test_mcp_box_integration.py +++ b/tests/unit_tests/provider/test_mcp_box_integration.py @@ -93,8 +93,8 @@ def mcp_module(): class _BPS(str, _enum.Enum): RUNNING = 'running' EXITED = 'exited' - _save_and_stub('langbot.pkg.box', is_package=True) - _save_and_stub('langbot.pkg.box.models', {'BoxManagedProcessStatus': _BPS}) + _save_and_stub('langbot_plugin.box', is_package=True) + _save_and_stub('langbot_plugin.box.models', {'BoxManagedProcessStatus': _BPS}) # Now load mcp.py via spec_from_file_location mod_fqn = 'langbot.pkg.provider.tools.loaders.mcp'