diff --git a/src/langbot/pkg/box/backend.py b/src/langbot/pkg/box/backend.py index ea74a090..fda0846f 100644 --- a/src/langbot/pkg/box/backend.py +++ b/src/langbot/pkg/box/backend.py @@ -4,6 +4,8 @@ import abc import asyncio import dataclasses import datetime as dt +import hashlib +import json import logging import re import shlex @@ -12,7 +14,8 @@ import typing import uuid from .errors import BoxError -from .models import DEFAULT_BOX_MOUNT_PATH, BoxExecutionResult, BoxExecutionStatus, BoxSessionInfo, BoxSpec +from .models import DEFAULT_BOX_MOUNT_PATH, BoxExecutionResult, BoxExecutionStatus, BoxHostMountMode, BoxSessionInfo, BoxSpec +from .security import validate_sandbox_security # Hard cap on raw subprocess output to prevent unbounded memory usage. # Container timeout already bounds duration, but fast commands can still @@ -54,6 +57,13 @@ class BaseSandboxBackend(abc.ABC): async def stop_session(self, session: BoxSessionInfo): pass + async def start_managed_process(self, session: BoxSessionInfo, spec): + raise BoxError(f'{self.name} backend does not support managed processes') + + async def cleanup_orphaned_containers(self): + """Remove lingering containers from previous runs. No-op by default.""" + pass + class CLISandboxBackend(BaseSandboxBackend): command: str @@ -71,6 +81,8 @@ class CLISandboxBackend(BaseSandboxBackend): return result.return_code == 0 and not result.timed_out async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: + validate_sandbox_security(spec) + now = dt.datetime.now(dt.UTC) container_name = self._build_container_name(spec.session_id) @@ -87,6 +99,19 @@ class CLISandboxBackend(BaseSandboxBackend): f'langbot.session_id={spec.session_id}', ] + # Config hash label for identifying configuration drift + config_hash = hashlib.sha256(json.dumps({ + 'image': spec.image, + 'network': spec.network.value, + 'host_path': spec.host_path, + 'host_path_mode': spec.host_path_mode.value, + 'cpus': spec.cpus, + 'memory_mb': spec.memory_mb, + 'pids_limit': spec.pids_limit, + 'read_only_rootfs': spec.read_only_rootfs, + }, sort_keys=True).encode()).hexdigest()[:16] + args.extend(['--label', f'langbot.box.config_hash={config_hash}']) + if spec.network.value == 'off': args.extend(['--network', 'none']) @@ -99,7 +124,7 @@ class CLISandboxBackend(BaseSandboxBackend): args.append('--read-only') args.extend(['--tmpfs', '/tmp:size=64m']) - if spec.host_path is not None: + if spec.host_path is not None and spec.host_path_mode != BoxHostMountMode.NONE: mount_spec = f'{spec.host_path}:{DEFAULT_BOX_MOUNT_PATH}:{spec.host_path_mode.value}' args.extend(['-v', mount_spec]) @@ -193,6 +218,54 @@ class CLISandboxBackend(BaseSandboxBackend): check=False, ) + async def cleanup_orphaned_containers(self): + """Remove any lingering langbot.box containers from previous runs.""" + result = await self._run_command( + [self.command, 'ps', '-a', '--filter', 'label=langbot.box=true', '-q'], + timeout_sec=10, + check=False, + ) + if result.return_code != 0 or not result.stdout.strip(): + return + container_ids = [cid.strip() for cid in result.stdout.strip().split('\n') if cid.strip()] + if not container_ids: + return + for cid in container_ids: + self.logger.info(f'Cleaning up orphaned Box container: {cid}') + await self._run_command( + [self.command, 'rm', '-f', *container_ids], + timeout_sec=30, + check=False, + ) + + async def start_managed_process(self, session: BoxSessionInfo, spec) -> asyncio.subprocess.Process: + args = [self.command, 'exec', '-i'] + + for key, value in spec.env.items(): + args.extend(['-e', f'{key}={value}']) + + args.extend( + [ + session.backend_session_id, + 'sh', + '-lc', + self._build_spawn_command(spec.cwd, spec.command, spec.args), + ] + ) + + self.logger.info( + f'LangBot Box backend start_managed_process: backend={self.name} ' + f'session_id={session.session_id} container_name={session.backend_session_id} ' + f'cwd={spec.cwd} env_keys={sorted(spec.env.keys())} command={spec.command} args={spec.args}' + ) + + return await asyncio.create_subprocess_exec( + *args, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + def _build_container_name(self, session_id: str) -> str: normalized = re.sub(r'[^a-zA-Z0-9_.-]+', '-', session_id).strip('-').lower() or 'session' suffix = uuid.uuid4().hex[:8] @@ -202,6 +275,11 @@ class CLISandboxBackend(BaseSandboxBackend): quoted_workdir = shlex.quote(workdir) return f'mkdir -p {quoted_workdir} && cd {quoted_workdir} && {cmd}' + def _build_spawn_command(self, cwd: str, command: str, args: list[str]) -> str: + quoted_cwd = shlex.quote(cwd) + command_parts = [shlex.quote(command), *[shlex.quote(arg) for arg in args]] + return f'mkdir -p {quoted_cwd} && cd {quoted_cwd} && exec {" ".join(command_parts)}' + async def _run_command( self, args: list[str], diff --git a/src/langbot/pkg/box/client.py b/src/langbot/pkg/box/client.py index 3e9808ca..cb83bf84 100644 --- a/src/langbot/pkg/box/client.py +++ b/src/langbot/pkg/box/client.py @@ -11,12 +11,21 @@ import aiohttp from .errors import ( BoxBackendUnavailableError, BoxError, + BoxManagedProcessConflictError, + BoxManagedProcessNotFoundError, BoxRuntimeUnavailableError, BoxSessionConflictError, BoxSessionNotFoundError, BoxValidationError, ) -from .models import BoxExecutionResult, BoxExecutionStatus, BoxSpec, get_box_config +from .models import ( + BoxExecutionResult, + BoxExecutionStatus, + BoxManagedProcessInfo, + BoxManagedProcessSpec, + BoxSpec, + get_box_config, +) from ..utils import platform if TYPE_CHECKING: @@ -26,6 +35,8 @@ _ERROR_CODE_MAP: dict[str, type[BoxError]] = { 'validation_error': BoxValidationError, 'session_not_found': BoxSessionNotFoundError, 'session_conflict': BoxSessionConflictError, + 'managed_process_not_found': BoxManagedProcessNotFoundError, + 'managed_process_conflict': BoxManagedProcessConflictError, 'backend_unavailable': BoxBackendUnavailableError, 'runtime_unavailable': BoxRuntimeUnavailableError, 'internal_error': BoxError, @@ -69,6 +80,12 @@ class BoxRuntimeClient(abc.ABC): @abc.abstractmethod async def create_session(self, spec: BoxSpec) -> dict: ... + @abc.abstractmethod + async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo: ... + + @abc.abstractmethod + async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo: ... + class RemoteBoxRuntimeClient(BoxRuntimeClient): """HTTP client that talks to a standalone Box Runtime service.""" @@ -182,3 +199,41 @@ class RemoteBoxRuntimeClient(BoxRuntimeClient): return await resp.json() except aiohttp.ClientError as exc: raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + + async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo: + session = self._get_session() + payload = spec.model_dump(mode='json') + try: + async with session.post( + f'{self._base_url}/v1/sessions/{session_id}/managed-process', + json=payload, + ) as resp: + await self._check_response(resp) + data = await resp.json() + except aiohttp.ClientError as exc: + raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + return BoxManagedProcessInfo.model_validate(data) + + async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo: + session = self._get_session() + try: + async with session.get( + f'{self._base_url}/v1/sessions/{session_id}/managed-process', + ) as resp: + await self._check_response(resp) + data = await resp.json() + except aiohttp.ClientError as exc: + raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + return BoxManagedProcessInfo.model_validate(data) + + def get_managed_process_websocket_url(self, session_id: str) -> str: + if self._base_url.startswith('https://'): + scheme = 'wss://' + suffix = self._base_url[len('https://'):] + elif self._base_url.startswith('http://'): + scheme = 'ws://' + suffix = self._base_url[len('http://'):] + else: + scheme = 'ws://' + suffix = self._base_url + return f'{scheme}{suffix}/v1/sessions/{session_id}/managed-process/ws' diff --git a/src/langbot/pkg/box/errors.py b/src/langbot/pkg/box/errors.py index 8ef8d2ec..f6a8e864 100644 --- a/src/langbot/pkg/box/errors.py +++ b/src/langbot/pkg/box/errors.py @@ -23,3 +23,11 @@ class BoxSessionConflictError(BoxError): class BoxSessionNotFoundError(BoxError): """Raised when a referenced session does not exist.""" + + +class BoxManagedProcessConflictError(BoxError): + """Raised when a session already has an active managed process.""" + + +class BoxManagedProcessNotFoundError(BoxError): + """Raised when a referenced managed process does not exist.""" diff --git a/src/langbot/pkg/box/models.py b/src/langbot/pkg/box/models.py index 64f71f4a..3d1b2a16 100644 --- a/src/langbot/pkg/box/models.py +++ b/src/langbot/pkg/box/models.py @@ -28,10 +28,16 @@ class BoxExecutionStatus(str, enum.Enum): class BoxHostMountMode(str, enum.Enum): + NONE = 'none' READ_ONLY = 'ro' READ_WRITE = 'rw' +class BoxManagedProcessStatus(str, enum.Enum): + RUNNING = 'running' + EXITED = 'exited' + + class BoxSpec(pydantic.BaseModel): cmd: str = '' workdir: str = '/workspace' @@ -116,6 +122,8 @@ class BoxSpec(pydantic.BaseModel): def validate_host_mount_consistency(self) -> 'BoxSpec': if self.host_path is None: return self + if self.host_path_mode == BoxHostMountMode.NONE: + return self if not self.workdir.startswith(DEFAULT_BOX_MOUNT_PATH): raise ValueError('workdir must stay under /workspace when host_path is provided') return self @@ -205,6 +213,53 @@ class BoxSessionInfo(pydantic.BaseModel): last_used_at: dt.datetime +class BoxManagedProcessSpec(pydantic.BaseModel): + command: str + args: list[str] = pydantic.Field(default_factory=list) + env: dict[str, str] = pydantic.Field(default_factory=dict) + cwd: str = '/workspace' + + @pydantic.field_validator('command') + @classmethod + def validate_command(cls, value: str) -> str: + value = value.strip() + if not value: + raise ValueError('command must not be empty') + return value + + @pydantic.field_validator('args') + @classmethod + def validate_args(cls, value: list[str]) -> list[str]: + return [str(item) for item in value] + + @pydantic.field_validator('env') + @classmethod + def validate_env(cls, value: dict[str, str]) -> dict[str, str]: + return {str(k): str(v) for k, v in value.items()} + + @pydantic.field_validator('cwd') + @classmethod + def validate_cwd(cls, value: str) -> str: + value = value.strip() + if not value.startswith('/'): + raise ValueError('cwd must be an absolute path inside the sandbox') + return value + + +class BoxManagedProcessInfo(pydantic.BaseModel): + session_id: str + status: BoxManagedProcessStatus + command: str + args: list[str] + cwd: str + env_keys: list[str] + attached: bool = False + started_at: dt.datetime + exited_at: dt.datetime | None = None + exit_code: int | None = None + stderr_preview: str = '' + + class BoxExecutionResult(pydantic.BaseModel): session_id: str backend_name: str diff --git a/src/langbot/pkg/box/runtime.py b/src/langbot/pkg/box/runtime.py index 89ad8c0b..93078b71 100644 --- a/src/langbot/pkg/box/runtime.py +++ b/src/langbot/pkg/box/runtime.py @@ -1,21 +1,54 @@ from __future__ import annotations import asyncio +import collections import dataclasses import datetime as dt import logging from .backend import BaseSandboxBackend, DockerBackend, PodmanBackend -from .errors import BoxBackendUnavailableError, BoxSessionConflictError, BoxSessionNotFoundError, BoxValidationError -from .models import BoxExecutionResult, BoxExecutionStatus, BoxSessionInfo, BoxSpec +from .errors import ( + BoxBackendUnavailableError, + BoxManagedProcessConflictError, + BoxManagedProcessNotFoundError, + BoxSessionConflictError, + BoxSessionNotFoundError, + BoxValidationError, +) +from .models import ( + BoxExecutionResult, + BoxExecutionStatus, + BoxManagedProcessInfo, + BoxManagedProcessSpec, + BoxManagedProcessStatus, + BoxSessionInfo, + BoxSpec, +) _UTC = dt.timezone.utc +_MANAGED_PROCESS_STDERR_PREVIEW_LIMIT = 4000 + + +@dataclasses.dataclass(slots=True) +class _ManagedProcess: + spec: BoxManagedProcessSpec + process: asyncio.subprocess.Process + started_at: dt.datetime + attach_lock: asyncio.Lock + stderr_chunks: collections.deque[str] + exit_code: int | None = None + exited_at: dt.datetime | None = None + + @property + def is_running(self) -> bool: + return self.exit_code is None and self.process.returncode is None @dataclasses.dataclass(slots=True) class _RuntimeSession: info: BoxSessionInfo lock: asyncio.Lock + managed_process: _ManagedProcess | None = None class BoxRuntime: @@ -34,6 +67,11 @@ class BoxRuntime: async def initialize(self): self._backend = await self._select_backend() + if self._backend is not None: + try: + await self._backend.cleanup_orphaned_containers() + except Exception as exc: + self.logger.warning(f'LangBot Box orphan container cleanup failed: {exc}') async def execute(self, spec: BoxSpec) -> BoxExecutionResult: if not spec.cmd: @@ -77,6 +115,40 @@ class BoxRuntime: raise BoxSessionNotFoundError(f'session {session_id} not found') await self._drop_session_locked(session_id) + async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> dict: + async with self._lock: + runtime_session = self._sessions.get(session_id) + if runtime_session is None: + raise BoxSessionNotFoundError(f'session {session_id} not found') + + async with runtime_session.lock: + existing = runtime_session.managed_process + if existing is not None and existing.is_running: + raise BoxManagedProcessConflictError(f'session {session_id} already has a managed process') + + backend = await self._get_backend() + process = await backend.start_managed_process(runtime_session.info, spec) + managed_process = _ManagedProcess( + spec=spec, + process=process, + started_at=dt.datetime.now(_UTC), + attach_lock=asyncio.Lock(), + stderr_chunks=collections.deque(), + ) + runtime_session.managed_process = managed_process + runtime_session.info.last_used_at = dt.datetime.now(_UTC) + asyncio.create_task(self._drain_managed_process_stderr(runtime_session.info.session_id, managed_process)) + asyncio.create_task(self._watch_managed_process(runtime_session.info.session_id, managed_process)) + return self._managed_process_to_dict(runtime_session.info.session_id, managed_process) + + def get_managed_process(self, session_id: str) -> dict: + runtime_session = self._sessions.get(session_id) + if runtime_session is None: + raise BoxSessionNotFoundError(f'session {session_id} not found') + if runtime_session.managed_process is None: + raise BoxManagedProcessNotFoundError(f'session {session_id} has no managed process') + return self._managed_process_to_dict(session_id, runtime_session.managed_process) + # ── Observability ───────────────────────────────────────────────── async def get_backend_info(self) -> dict: @@ -97,6 +169,11 @@ class BoxRuntime: return { 'backend': backend_info, 'active_sessions': len(self._sessions), + 'managed_processes': sum( + 1 + for runtime_session in self._sessions.values() + if runtime_session.managed_process is not None and runtime_session.managed_process.is_running + ), 'session_ttl_sec': self.session_ttl_sec, } @@ -163,6 +240,7 @@ class BoxRuntime: session_id for session_id, session in self._sessions.items() if session.info.last_used_at < deadline + and not (session.managed_process is not None and session.managed_process.is_running) ] for session_id in expired_session_ids: @@ -173,6 +251,8 @@ class BoxRuntime: if runtime_session is None or self._backend is None: return + await self._terminate_managed_process(runtime_session) + try: self.logger.info( 'LangBot Box session cleanup: ' @@ -198,6 +278,90 @@ class BoxRuntime: f'sandbox_exec session {spec.session_id} already exists with {field}={display}' ) + async def _drain_managed_process_stderr(self, session_id: str, managed_process: _ManagedProcess) -> None: + stream = managed_process.process.stderr + if stream is None: + return + + try: + while True: + chunk = await stream.readline() + if not chunk: + break + text = chunk.decode('utf-8', errors='replace').rstrip() + if not text: + continue + managed_process.stderr_chunks.append(text) + preview = '\n'.join(managed_process.stderr_chunks) + while len(preview) > _MANAGED_PROCESS_STDERR_PREVIEW_LIMIT and managed_process.stderr_chunks: + managed_process.stderr_chunks.popleft() + preview = '\n'.join(managed_process.stderr_chunks) + self.logger.info(f'LangBot Box managed process stderr: session_id={session_id} {text}') + except Exception as exc: + self.logger.warning(f'Failed to drain managed process stderr for {session_id}: {exc}') + + async def _watch_managed_process(self, session_id: str, managed_process: _ManagedProcess) -> None: + return_code = await managed_process.process.wait() + managed_process.exit_code = return_code + managed_process.exited_at = dt.datetime.now(_UTC) + runtime_session = self._sessions.get(session_id) + if runtime_session is not None: + runtime_session.info.last_used_at = managed_process.exited_at + self.logger.info( + 'LangBot Box managed process exited: ' + f'session_id={session_id} return_code={return_code}' + ) + + async def _terminate_managed_process(self, runtime_session: _RuntimeSession) -> None: + managed_process = runtime_session.managed_process + if managed_process is None or not managed_process.is_running: + return + + process = managed_process.process + try: + if process.stdin is not None: + process.stdin.close() + except Exception: + pass + + try: + await asyncio.wait_for(asyncio.shield(process.wait()), timeout=5) + except asyncio.TimeoutError: + if process.returncode is None: + try: + process.terminate() + except ProcessLookupError: + pass + try: + await asyncio.wait_for(asyncio.shield(process.wait()), timeout=5) + except asyncio.TimeoutError: + if process.returncode is None: + try: + process.kill() + except ProcessLookupError: + pass + await process.wait() + finally: + managed_process.exit_code = process.returncode + managed_process.exited_at = dt.datetime.now(_UTC) + + def _managed_process_to_dict(self, session_id: str, managed_process: _ManagedProcess) -> dict: + stderr_preview = '\n'.join(managed_process.stderr_chunks) + status = BoxManagedProcessStatus.RUNNING if managed_process.is_running else BoxManagedProcessStatus.EXITED + return BoxManagedProcessInfo( + session_id=session_id, + status=status, + command=managed_process.spec.command, + args=managed_process.spec.args, + cwd=managed_process.spec.cwd, + env_keys=sorted(managed_process.spec.env.keys()), + attached=managed_process.attach_lock.locked(), + started_at=managed_process.started_at, + exited_at=managed_process.exited_at, + exit_code=managed_process.exit_code, + stderr_preview=stderr_preview, + ).model_dump(mode='json') + @staticmethod def _session_to_dict(info: BoxSessionInfo) -> dict: return { diff --git a/src/langbot/pkg/box/security.py b/src/langbot/pkg/box/security.py new file mode 100644 index 00000000..5627510a --- /dev/null +++ b/src/langbot/pkg/box/security.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import os + +from .errors import BoxValidationError +from .models import BoxSpec + +BLOCKED_HOST_PATHS = frozenset({ + '/etc', + '/proc', + '/sys', + '/dev', + '/root', + '/boot', + '/run', + '/var/run', + '/run/docker.sock', + '/var/run/docker.sock', + '/run/podman', + '/var/run/podman', +}) + +RESERVED_CONTAINER_PATHS = frozenset({ + '/workspace', + '/tmp', + '/var/tmp', + '/run', +}) + + +def validate_sandbox_security(spec: BoxSpec) -> None: + """Validate that a BoxSpec does not request dangerous container config. + + Raises BoxValidationError when the spec contains a blocked host_path. + """ + if spec.host_path: + real = os.path.realpath(spec.host_path) + for blocked in BLOCKED_HOST_PATHS: + if real == blocked or real.startswith(blocked + '/'): + raise BoxValidationError( + f'host_path {spec.host_path} is blocked for security' + ) diff --git a/src/langbot/pkg/box/server.py b/src/langbot/pkg/box/server.py index 52907c8e..0b764787 100644 --- a/src/langbot/pkg/box/server.py +++ b/src/langbot/pkg/box/server.py @@ -7,6 +7,8 @@ Usage: from __future__ import annotations import argparse +import asyncio +import datetime as dt import logging import pydantic @@ -15,11 +17,13 @@ from aiohttp import web from .errors import ( BoxBackendUnavailableError, BoxError, + BoxManagedProcessConflictError, + BoxManagedProcessNotFoundError, BoxSessionConflictError, BoxSessionNotFoundError, BoxValidationError, ) -from .models import BoxExecutionResult, BoxSpec +from .models import BoxExecutionResult, BoxManagedProcessSpec, BoxSpec from .runtime import BoxRuntime logger = logging.getLogger('langbot.box.server') @@ -28,6 +32,8 @@ _ERROR_MAP: dict[type, tuple[int, str]] = { BoxValidationError: (400, 'validation_error'), BoxSessionNotFoundError: (404, 'session_not_found'), BoxSessionConflictError: (409, 'session_conflict'), + BoxManagedProcessNotFoundError: (404, 'managed_process_not_found'), + BoxManagedProcessConflictError: (409, 'managed_process_conflict'), BoxBackendUnavailableError: (503, 'backend_unavailable'), } @@ -129,6 +135,91 @@ async def handle_health(request: web.Request) -> web.Response: return _error_response(exc) +async def handle_start_managed_process(request: web.Request) -> web.Response: + runtime: BoxRuntime = request.app['runtime'] + session_id = request.match_info['session_id'] + try: + body = await request.json() + spec = BoxManagedProcessSpec.model_validate(body) + process_info = await runtime.start_managed_process(session_id, spec) + return web.json_response(process_info, status=201) + except pydantic.ValidationError as exc: + return web.json_response( + {'error': {'code': 'validation_error', 'message': str(exc)}}, + status=400, + ) + except BoxError as exc: + return _error_response(exc) + + +async def handle_get_managed_process(request: web.Request) -> web.Response: + runtime: BoxRuntime = request.app['runtime'] + session_id = request.match_info['session_id'] + try: + return web.json_response(runtime.get_managed_process(session_id)) + except BoxError as exc: + return _error_response(exc) + + +async def handle_managed_process_ws(request: web.Request) -> web.StreamResponse: + runtime: BoxRuntime = request.app['runtime'] + session_id = request.match_info['session_id'] + + runtime_session = runtime._sessions.get(session_id) + if runtime_session is None: + return _error_response(BoxSessionNotFoundError(f'session {session_id} not found')) + + managed_process = runtime_session.managed_process + if managed_process is None: + return _error_response(BoxManagedProcessNotFoundError(f'session {session_id} has no managed process')) + if not managed_process.is_running: + return _error_response(BoxManagedProcessConflictError(f'managed process in session {session_id} is not running')) + + ws = web.WebSocketResponse(protocols=('mcp',)) + await ws.prepare(request) + + async with managed_process.attach_lock: + process = managed_process.process + stdout = process.stdout + stdin = process.stdin + if stdout is None or stdin is None: + await ws.close(message=b'managed process stdio unavailable') + return ws + + async def _stdout_to_ws() -> None: + while True: + line = await stdout.readline() + if not line: + break + await ws.send_str(line.decode('utf-8', errors='replace').rstrip('\n')) + runtime_session.info.last_used_at = dt.datetime.now(dt.timezone.utc) + + async def _ws_to_stdin() -> None: + async for msg in ws: + if msg.type == web.WSMsgType.TEXT: + stdin.write((msg.data + '\n').encode('utf-8')) + await stdin.drain() + runtime_session.info.last_used_at = dt.datetime.now(dt.timezone.utc) + elif msg.type in (web.WSMsgType.CLOSE, web.WSMsgType.CLOSING, web.WSMsgType.CLOSED, web.WSMsgType.ERROR): + break + + stdout_task = asyncio.create_task(_stdout_to_ws()) + stdin_task = asyncio.create_task(_ws_to_stdin()) + try: + done, pending = await asyncio.wait( + [stdout_task, stdin_task], + return_when=asyncio.FIRST_COMPLETED, + ) + for task in pending: + task.cancel() + for task in done: + task.result() + finally: + await ws.close() + + return ws + + def create_app(runtime: BoxRuntime | None = None) -> web.Application: """Create the aiohttp Application with all routes. @@ -145,6 +236,9 @@ def create_app(runtime: BoxRuntime | None = None) -> web.Application: app.router.add_post('/v1/sessions/{session_id}', handle_create_session) app.router.add_get('/v1/sessions', handle_get_sessions) app.router.add_delete('/v1/sessions/{session_id}', handle_delete_session) + app.router.add_post('/v1/sessions/{session_id}/managed-process', handle_start_managed_process) + app.router.add_get('/v1/sessions/{session_id}/managed-process', handle_get_managed_process) + app.router.add_get('/v1/sessions/{session_id}/managed-process/ws', handle_managed_process_ws) app.router.add_get('/v1/status', handle_status) app.router.add_get('/v1/health', handle_health) diff --git a/src/langbot/pkg/box/service.py b/src/langbot/pkg/box/service.py index 4224521c..26bb72a7 100644 --- a/src/langbot/pkg/box/service.py +++ b/src/langbot/pkg/box/service.py @@ -12,7 +12,15 @@ import pydantic from .client import BoxRuntimeClient from .connector import BoxRuntimeConnector from .errors import BoxError, BoxValidationError -from .models import BUILTIN_PROFILES, BoxExecutionResult, BoxProfile, BoxSpec, get_box_config +from .models import ( + BUILTIN_PROFILES, + BoxExecutionResult, + BoxManagedProcessInfo, + BoxManagedProcessSpec, + BoxProfile, + BoxSpec, + get_box_config, +) _INT_ADAPTER = pydantic.TypeAdapter(int) _UTC = _dt.timezone.utc @@ -42,32 +50,36 @@ class BoxService: self.profile = self._load_profile() self._recent_errors: collections.deque[dict] = collections.deque(maxlen=_MAX_RECENT_ERRORS) self._shutdown_task = None + self._available = False async def initialize(self): self._ensure_default_host_workspace() - if self._runtime_connector is not None: - await self._runtime_connector.initialize() - return - await self.client.initialize() + try: + if self._runtime_connector is not None: + await self._runtime_connector.initialize() + else: + await self.client.initialize() + self._available = True + except Exception as exc: + self.ap.logger.warning( + f'LangBot Box runtime unavailable, sandbox features disabled: {exc}' + ) + self._available = False + + @property + def available(self) -> bool: + return self._available async def execute_sandbox_tool(self, parameters: dict, query: 'pipeline_query.Query') -> dict: + if not self._available: + raise BoxError('Box runtime is not available. Install and start Podman or Docker to use sandbox features.') spec_payload = dict(parameters) spec_payload.setdefault('session_id', str(query.query_id)) - spec_payload.setdefault('env', {}) - if spec_payload.get('host_path') in (None, '') and self.default_host_workspace is not None: - spec_payload['host_path'] = self.default_host_workspace - - self._apply_profile(spec_payload) - try: - spec = BoxSpec.model_validate(spec_payload) - except pydantic.ValidationError as exc: - first_error = exc.errors()[0] - err = BoxValidationError(first_error.get('msg', 'invalid sandbox_exec arguments')) - self._record_error(err, query) - raise err from exc - - self._validate_host_mount(spec) + spec = self.build_spec(spec_payload) + except BoxError as exc: + self._record_error(exc, query) + raise self.ap.logger.info( 'LangBot Box request: ' f'query_id={query.query_id} ' @@ -102,6 +114,41 @@ class BoxService: async def get_sessions(self) -> list[dict]: return await self.client.get_sessions() + def build_spec(self, spec_payload: dict, skip_host_mount_validation: bool = False) -> BoxSpec: + spec_payload = dict(spec_payload) + spec_payload.setdefault('env', {}) + if spec_payload.get('host_path') in (None, '') and self.default_host_workspace is not None: + spec_payload['host_path'] = self.default_host_workspace + + self._apply_profile(spec_payload) + + try: + spec = BoxSpec.model_validate(spec_payload) + except pydantic.ValidationError as exc: + first_error = exc.errors()[0] + raise BoxValidationError(first_error.get('msg', 'invalid box arguments')) from exc + + if not skip_host_mount_validation: + self._validate_host_mount(spec) + return spec + + async def create_session(self, spec_payload: dict, *, skip_host_mount_validation: bool = False) -> dict: + spec = self.build_spec(spec_payload, skip_host_mount_validation=skip_host_mount_validation) + return await self.client.create_session(spec) + + async def start_managed_process(self, session_id: str, process_payload: dict) -> BoxManagedProcessInfo: + process_spec = BoxManagedProcessSpec.model_validate(process_payload) + return await self.client.start_managed_process(session_id, process_spec) + + async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo: + return await self.client.get_managed_process(session_id) + + def get_managed_process_websocket_url(self, session_id: str) -> str: + getter = getattr(self.client, 'get_managed_process_websocket_url', None) + if getter is None: + raise BoxValidationError('box runtime client does not support managed process websocket attach') + return getter(session_id) + def _serialize_result(self, result: BoxExecutionResult) -> dict: stdout, stdout_truncated = self._truncate(result.stdout) stderr, stderr_truncated = self._truncate(result.stderr) @@ -296,9 +343,16 @@ class BoxService: return list(self._recent_errors) async def get_status(self) -> dict: + if not self._available: + return { + 'available': False, + 'profile': self.profile.name, + 'recent_error_count': len(self._recent_errors), + } runtime_status = await self.client.get_status() return { **runtime_status, + 'available': True, 'profile': self.profile.name, 'recent_error_count': len(self._recent_errors), } diff --git a/src/langbot/pkg/provider/tools/loaders/mcp.py b/src/langbot/pkg/provider/tools/loaders/mcp.py index 46d63b84..05cd3e6b 100644 --- a/src/langbot/pkg/provider/tools/loaders/mcp.py +++ b/src/langbot/pkg/provider/tools/loaders/mcp.py @@ -1,6 +1,7 @@ from __future__ import annotations import enum +import os import typing from contextlib import AsyncExitStack import traceback @@ -9,11 +10,13 @@ import sqlalchemy import asyncio import httpx +import pydantic import uuid as uuid_module from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from mcp.client.sse import sse_client from mcp.client.streamable_http import streamable_http_client +from mcp.client.websocket import websocket_client from .. import loader from ....core import app @@ -28,6 +31,27 @@ class MCPSessionStatus(enum.Enum): ERROR = 'error' +_VENV_DIRS = frozenset({'.venv', 'venv', 'env', '.env'}) +_VENV_BIN_DIRS = frozenset({'bin', 'Scripts'}) + + +class MCPServerBoxConfig(pydantic.BaseModel): + """Structured configuration for running an MCP server inside a Box container.""" + + image: str | None = None + network: str = 'on' # MCP servers need network for dependency installation + host_path: str | None = None + host_path_mode: str = 'ro' # MCP servers default to read-only mount + env: dict[str, str] = pydantic.Field(default_factory=dict) + startup_timeout_sec: int = 120 # Longer default to allow pip install + cpus: float | None = None + memory_mb: int | None = None + pids_limit: int | None = None + read_only_rootfs: bool | None = None + + model_config = pydantic.ConfigDict(extra='ignore') + + class RuntimeMCPSession: """运行时 MCP 会话""" @@ -75,7 +99,16 @@ class RuntimeMCPSession: self._shutdown_event = asyncio.Event() self._ready_event = asyncio.Event() + # Parse box config once + self.box_config = MCPServerBoxConfig.model_validate( + server_config.get('box', {}) + ) + async def _init_stdio_python_server(self): + if self._uses_box_stdio(): + await self._init_box_stdio_server() + return + server_params = StdioServerParameters( command=self.server_config['command'], args=self.server_config['args'], @@ -90,6 +123,52 @@ class RuntimeMCPSession: await self.session.initialize() + async def _init_box_stdio_server(self): + box_service = self.ap.box_service + session_id = self._build_box_session_id() + host_path = self._resolve_host_path() + session_payload = self._build_box_session_payload(session_id, host_path) + + # MCP server paths are admin-configured, skip host_mount_roots validation + await box_service.create_session( + session_payload, + skip_host_mount_validation=True, + ) + + # Install dependencies inside the container before starting the MCP server + if host_path: + install_cmd = self._detect_install_command(host_path) + if install_cmd: + self.ap.logger.info( + f'MCP server {self.server_name}: installing dependencies in Box ' + f'with: {install_cmd}' + ) + # Build an exec spec that matches the existing session config + # to pass the compatibility check. + exec_payload = dict(session_payload) + exec_payload['cmd'] = install_cmd + exec_payload['timeout_sec'] = self.box_config.startup_timeout_sec or 120 + result = await box_service.client.execute( + box_service.build_spec(exec_payload, skip_host_mount_validation=True) + ) + if not result.ok: + stderr_preview = (result.stderr or '')[:500] + raise Exception( + f'Dependency install failed (exit code {result.exit_code}): ' + f'{stderr_preview}' + ) + + await box_service.start_managed_process( + session_id, + self._build_box_process_payload(host_path), + ) + + websocket_url = box_service.get_managed_process_websocket_url(session_id) + transport = await self.exit_stack.enter_async_context(websocket_client(websocket_url)) + read_stream, write_stream = transport + self.session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream)) + await self.session.initialize() + async def _init_sse_server(self): sse_transport = await self.exit_stack.enter_async_context( sse_client( @@ -124,8 +203,11 @@ class RuntimeMCPSession: await self.session.initialize() + _MAX_RETRIES = 3 + _RETRY_DELAYS = [2, 4, 8] + async def _lifecycle_loop(self): - """在后台任务中管理整个MCP会话的生命周期""" + """Manage the full MCP session lifecycle in a background task.""" try: if self.server_config['mode'] == 'stdio': await self._init_stdio_python_server() @@ -134,49 +216,111 @@ class RuntimeMCPSession: elif self.server_config['mode'] == 'http': await self._init_streamable_http_server() else: - raise ValueError(f'无法识别 MCP 服务器类型: {self.server_name}: {self.server_config}') + raise ValueError(f'Unknown MCP server mode: {self.server_name}: {self.server_config}') await self.refresh() self.status = MCPSessionStatus.CONNECTED - # 通知start()方法连接已建立 + # Notify start() that connection is established self._ready_event.set() - # 等待shutdown信号 - await self._shutdown_event.wait() + # Wait for shutdown signal, with optional health monitoring for Box stdio + if self._uses_box_stdio(): + monitor_task = asyncio.create_task(self._monitor_box_process_health()) + shutdown_task = asyncio.create_task(self._shutdown_event.wait()) + done, pending = await asyncio.wait( + [shutdown_task, monitor_task], + return_when=asyncio.FIRST_COMPLETED, + ) + for task in pending: + task.cancel() + for task in done: + if task is monitor_task and not self._shutdown_event.is_set(): + raise Exception('Box managed process exited unexpectedly') + else: + await self._shutdown_event.wait() except Exception as e: self.status = MCPSessionStatus.ERROR self.error_message = str(e) self.ap.logger.error(f'Error in MCP session lifecycle {self.server_name}: {e}\n{traceback.format_exc()}') - # 即使出错也要设置ready事件,让start()方法知道初始化已完成 - self._ready_event.set() + # Do NOT set _ready_event here — let _lifecycle_loop_with_retry + # handle retries first. It will set the event when all retries + # are exhausted or on success. + raise # Re-raise so _lifecycle_loop_with_retry can catch it finally: - # 在同一个任务中清理所有资源 + # Clean up all resources in the same task try: if self.exit_stack: await self.exit_stack.aclose() + self.exit_stack = AsyncExitStack() self.functions.clear() self.session = None except Exception as e: self.ap.logger.error(f'Error cleaning up MCP session {self.server_name}: {e}\n{traceback.format_exc()}') + finally: + await self._cleanup_box_stdio_session() + + async def _lifecycle_loop_with_retry(self): + """Wrap _lifecycle_loop with retry and exponential backoff.""" + for attempt in range(self._MAX_RETRIES + 1): + try: + await self._lifecycle_loop() + return # Normal shutdown, don't retry + except Exception as e: + if self._shutdown_event.is_set(): + return # Shutdown requested, don't retry + if attempt >= self._MAX_RETRIES: + self.status = MCPSessionStatus.ERROR + self.error_message = f'Failed after {self._MAX_RETRIES + 1} attempts: {e}' + self._ready_event.set() + return + delay = self._RETRY_DELAYS[attempt] + self.ap.logger.warning( + f'MCP session {self.server_name} failed (attempt {attempt + 1}), ' + f'retrying in {delay}s: {e}' + ) + await self._cleanup_box_stdio_session() + # Reset status for retry + self.status = MCPSessionStatus.CONNECTING + self.error_message = None + await asyncio.sleep(delay) + + async def _monitor_box_process_health(self): + """Poll managed process status; return when process exits.""" + from ...box.models import BoxManagedProcessStatus + + session_id = self._build_box_session_id() + while not self._shutdown_event.is_set(): + try: + info = await self.ap.box_service.client.get_managed_process(session_id) + if isinstance(info, dict): + status = info.get('status', '') + else: + status = getattr(info, 'status', '') + if status == BoxManagedProcessStatus.EXITED.value or status == BoxManagedProcessStatus.EXITED: + return + except Exception: + return # Process or session gone + await asyncio.sleep(5) async def start(self): if not self.enable: return - # 创建后台任务来管理生命周期 - self._lifecycle_task = asyncio.create_task(self._lifecycle_loop()) + # Create background task for lifecycle management with retry + self._lifecycle_task = asyncio.create_task(self._lifecycle_loop_with_retry()) - # 等待连接建立或失败(带超时) + # Wait for connection or failure (with timeout) + startup_timeout = self.box_config.startup_timeout_sec if self._uses_box_stdio() else 30.0 try: - await asyncio.wait_for(self._ready_event.wait(), timeout=30.0) + await asyncio.wait_for(self._ready_event.wait(), timeout=startup_timeout) except asyncio.TimeoutError: self.status = MCPSessionStatus.ERROR - raise Exception('Connection timeout after 30 seconds') + raise Exception(f'Connection timeout after {startup_timeout} seconds') - # 检查是否有错误 + # Check for errors if self.status == MCPSessionStatus.ERROR: raise Exception('Connection failed, please check URL') @@ -232,7 +376,7 @@ class RuntimeMCPSession: return self.functions def get_runtime_info_dict(self) -> dict: - return { + info = { 'status': self.status.value, 'error_message': self.error_message, 'tool_count': len(self.get_tools()), @@ -244,6 +388,10 @@ class RuntimeMCPSession: for tool in self.get_tools() ], } + if self._uses_box_stdio(): + info['box_session_id'] = self._build_box_session_id() + info['box_enabled'] = True + return info async def shutdown(self): """关闭会话并清理资源""" @@ -267,6 +415,177 @@ class RuntimeMCPSession: except Exception as e: self.ap.logger.error(f'Error shutting down MCP session {self.server_name}: {e}\n{traceback.format_exc()}') + def _uses_box_stdio(self) -> bool: + """Check whether this stdio MCP server should run inside a Box container. + + Returns True when mode is stdio AND the Box runtime is available. + An explicit ``box`` key in server_config is NOT required — if the + runtime is reachable, stdio servers default to Box isolation. + """ + if self.server_config.get('mode') != 'stdio': + return False + try: + return getattr(self.ap.box_service, 'available', False) + except Exception: + return False + + def _build_box_session_id(self) -> str: + return f'mcp-{self.server_uuid}' + + def _rewrite_path(self, path: str, host_path: str | None) -> str: + """Rewrite host path prefix to container /workspace prefix.""" + if not host_path or not path: + return path + normalized_host = os.path.realpath(host_path) + if path.startswith(normalized_host + '/'): + return '/workspace' + path[len(normalized_host):] + if path == normalized_host: + return '/workspace' + return path + + def _infer_host_path(self) -> str | None: + """Try to infer host_path from command and args absolute paths. + + Detects virtualenv patterns (e.g. .venv/bin/python) and walks up + to the project root rather than using the bin directory. + """ + candidates = [] + parts = [self.server_config.get('command', '')] + self.server_config.get('args', []) + for part in parts: + if not os.path.isabs(part): + continue + # Use the raw path for venv detection (before resolving symlinks) + # because .venv/bin/python is often a symlink to the system python. + if os.path.exists(part): + directory = os.path.dirname(part) + directory = self._unwrap_venv_path(directory) + candidates.append(os.path.realpath(directory)) + if not candidates: + return None + common = os.path.commonpath(candidates) + return common if common != '/' else None + + @staticmethod + def _unwrap_venv_path(directory: str) -> str: + """If directory looks like a virtualenv bin dir, return the project root. + + Recognized patterns: + /project/.venv/bin -> /project + /project/venv/bin -> /project + /project/.venv/Scripts -> /project (Windows) + /project/env/bin -> /project + """ + parts = directory.replace('\\', '/').split('/') + # Look for patterns like .../(.venv|venv|env)/(bin|Scripts) + for i in range(len(parts) - 1, 0, -1): + if parts[i] in _VENV_BIN_DIRS and i >= 1: + venv_dir = parts[i - 1] + if venv_dir in _VENV_DIRS: + # Return everything before the venv directory + project_root = '/'.join(parts[:i - 1]) + return project_root if project_root else '/' + return directory + + def _resolve_host_path(self) -> str | None: + """Resolve the effective host_path: explicit config > inference.""" + return self.box_config.host_path or self._infer_host_path() + + @staticmethod + def _detect_install_command(host_path: str) -> str | None: + """Detect how to install dependencies from the mounted project. + + Copies the project to a writable temp directory before installing, + because /workspace may be mounted read-only and pip needs to write + build artifacts in the source tree. + """ + _COPY_AND_INSTALL = ( + 'cp -r /workspace /tmp/_mcp_src' + ' && pip install --no-cache-dir /tmp/_mcp_src' + ' && rm -rf /tmp/_mcp_src' + ) + _INSTALL_REQUIREMENTS = 'pip install --no-cache-dir -r /workspace/requirements.txt' + + if os.path.isfile(os.path.join(host_path, 'pyproject.toml')): + return _COPY_AND_INSTALL + if os.path.isfile(os.path.join(host_path, 'setup.py')): + return _COPY_AND_INSTALL + if os.path.isfile(os.path.join(host_path, 'requirements.txt')): + return _INSTALL_REQUIREMENTS + return None + + def _build_box_session_payload(self, session_id: str, host_path: str | None = None) -> dict: + bc = self.box_config + if host_path is None: + host_path = self._resolve_host_path() + + payload: dict[str, typing.Any] = { + 'session_id': session_id, + 'workdir': '/workspace', + 'env': bc.env, + # MCP sessions need network for dependency install and writable rootfs + 'network': bc.network, + 'read_only_rootfs': bc.read_only_rootfs if bc.read_only_rootfs is not None else False, + } + if host_path: + payload['host_path'] = host_path + payload['host_path_mode'] = bc.host_path_mode + for key in ('image', 'cpus', 'memory_mb', 'pids_limit'): + val = getattr(bc, key) + if val is not None: + payload[key] = val if not isinstance(val, enum.Enum) else val.value + return payload + + def _build_box_process_payload(self, host_path: str | None = None) -> dict: + if host_path is None: + host_path = self._resolve_host_path() + + command = self.server_config['command'] + args = self.server_config.get('args', []) + cwd = '/workspace' + + if host_path: + # When host_path is resolved, we install deps in-container rather + # than relying on the host venv. Rewrite paths so the container + # sees /workspace/... but replace venv python with plain "python". + command = self._rewrite_venv_command(command, host_path) + args = [self._rewrite_path(a, host_path) for a in args] + cwd = self._rewrite_path(cwd, host_path) + + return { + 'command': command, + 'args': args, + 'env': self.server_config.get('env', {}), + 'cwd': cwd, + } + + def _rewrite_venv_command(self, command: str, host_path: str) -> str: + """Rewrite command: if it points to a venv python, use plain 'python'.""" + if not host_path or not command: + return command + normalized_host = os.path.realpath(host_path) + if not command.startswith(normalized_host + '/'): + return command + # Check if command is a venv python interpreter + rel = command[len(normalized_host) + 1:] # e.g. ".venv/bin/python" + parts = rel.replace('\\', '/').split('/') + # Match patterns like .venv/bin/python*, venv/bin/python*, etc. + if (len(parts) >= 3 + and parts[0] in _VENV_DIRS + and parts[1] in _VENV_BIN_DIRS + and parts[2].startswith('python')): + return 'python' + # Not a venv python — do normal path rewrite + return self._rewrite_path(command, host_path) + + async def _cleanup_box_stdio_session(self) -> None: + if not self._uses_box_stdio(): + return + + try: + await self.ap.box_service.client.delete_session(self._build_box_session_id()) + except Exception as e: + self.ap.logger.warning(f'Failed to cleanup Box session for MCP server {self.server_name}: {e}') + # @loader.loader_class('mcp') class MCPLoader(loader.ToolLoader): @@ -332,7 +651,7 @@ class MCPLoader(loader.ToolLoader): Args: server_config: 服务器配置字典,必须包含: - name: 服务器名称 - - mode: 连接模式 (stdio/sse) + - mode: 连接模式 (stdio/sse/http) - enable: 是否启用 - extra_args: 额外的配置参数 (可选) """ diff --git a/tests/unit_tests/box/test_box_managed_process.py b/tests/unit_tests/box/test_box_managed_process.py new file mode 100644 index 00000000..d3e7f6cb --- /dev/null +++ b/tests/unit_tests/box/test_box_managed_process.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +import asyncio +import datetime as dt +from unittest.mock import Mock + +import pytest + +from langbot.pkg.box.backend import BaseSandboxBackend +from langbot.pkg.box.models import BoxManagedProcessSpec, BoxManagedProcessStatus, BoxSessionInfo, BoxSpec +from langbot.pkg.box.runtime import BoxRuntime + +_UTC = dt.timezone.utc + + +class FakeManagedProcessBackend(BaseSandboxBackend): + name = 'fake-managed' + + def __init__(self, logger: Mock): + super().__init__(logger) + + async def is_available(self) -> bool: + return True + + async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: + now = dt.datetime.now(_UTC) + return BoxSessionInfo( + session_id=spec.session_id, + backend_name=self.name, + backend_session_id=f'backend-{spec.session_id}', + image=spec.image, + network=spec.network, + host_path=spec.host_path, + host_path_mode=spec.host_path_mode, + cpus=spec.cpus, + memory_mb=spec.memory_mb, + pids_limit=spec.pids_limit, + read_only_rootfs=spec.read_only_rootfs, + created_at=now, + last_used_at=now, + ) + + async def exec(self, session: BoxSessionInfo, spec: BoxSpec): + raise NotImplementedError + + async def stop_session(self, session: BoxSessionInfo): + return None + + async def start_managed_process(self, session: BoxSessionInfo, spec: BoxManagedProcessSpec) -> asyncio.subprocess.Process: + return await asyncio.create_subprocess_exec( + 'sh', + '-lc', + 'cat', + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + +@pytest.mark.asyncio +async def test_runtime_start_managed_process_tracks_status(): + logger = Mock() + runtime = BoxRuntime(logger=logger, backends=[FakeManagedProcessBackend(logger)], session_ttl_sec=300) + await runtime.initialize() + + session_spec = BoxSpec.model_validate({'cmd': 'echo bootstrap', 'session_id': 'mcp-session'}) + await runtime.create_session(session_spec) + + process_info = await runtime.start_managed_process( + 'mcp-session', + BoxManagedProcessSpec(command='python', args=['-m', 'demo'], cwd='/workspace'), + ) + + assert process_info['session_id'] == 'mcp-session' + assert process_info['status'] == BoxManagedProcessStatus.RUNNING.value + assert process_info['command'] == 'python' + assert process_info['args'] == ['-m', 'demo'] + + queried = runtime.get_managed_process('mcp-session') + assert queried['status'] == BoxManagedProcessStatus.RUNNING.value + + await runtime.shutdown() + + +@pytest.mark.asyncio +async def test_runtime_does_not_reap_session_with_running_managed_process(): + logger = Mock() + runtime = BoxRuntime(logger=logger, backends=[FakeManagedProcessBackend(logger)], session_ttl_sec=1) + await runtime.initialize() + + session_spec = BoxSpec.model_validate({'cmd': 'echo bootstrap', 'session_id': 'mcp-session'}) + await runtime.create_session(session_spec) + await runtime.start_managed_process( + 'mcp-session', + BoxManagedProcessSpec(command='python', args=['-m', 'demo'], cwd='/workspace'), + ) + + runtime._sessions['mcp-session'].info.last_used_at = dt.datetime.now(_UTC) - dt.timedelta(seconds=120) + await runtime._reap_expired_sessions_locked() + + assert 'mcp-session' in runtime._sessions + + await runtime.shutdown() diff --git a/tests/unit_tests/box/test_box_security.py b/tests/unit_tests/box/test_box_security.py new file mode 100644 index 00000000..bc7cc48e --- /dev/null +++ b/tests/unit_tests/box/test_box_security.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import pytest + +from langbot.pkg.box.errors import BoxValidationError +from langbot.pkg.box.models import BoxHostMountMode, BoxNetworkMode, BoxSpec +from langbot.pkg.box.security import BLOCKED_HOST_PATHS, validate_sandbox_security + + +def _make_spec(**overrides) -> BoxSpec: + defaults = { + 'session_id': 'test-session', + 'cmd': 'echo hi', + 'image': 'python:3.11-slim', + } + defaults.update(overrides) + return BoxSpec(**defaults) + + +class TestValidateSandboxSecurity: + def test_no_host_path_passes(self): + spec = _make_spec(host_path=None) + validate_sandbox_security(spec) # should not raise + + def test_safe_host_path_passes(self): + spec = _make_spec(host_path='/home/user/my-project') + validate_sandbox_security(spec) # should not raise + + @pytest.mark.parametrize('blocked', [ + '/etc', + '/proc', + '/sys', + '/dev', + '/root', + '/boot', + '/run', + '/var/run', + '/run/docker.sock', + '/var/run/docker.sock', + '/run/podman', + '/var/run/podman', + ]) + def test_blocked_paths_rejected(self, blocked): + spec = _make_spec(host_path=blocked) + with pytest.raises(BoxValidationError, match='blocked for security'): + validate_sandbox_security(spec) + + def test_blocked_subpath_rejected(self): + spec = _make_spec(host_path='/etc/nginx') + with pytest.raises(BoxValidationError, match='blocked for security'): + validate_sandbox_security(spec) + + def test_path_starting_with_blocked_prefix_but_different_dir_passes(self): + # /etcetera is NOT /etc + spec = _make_spec(host_path='/etcetera/data') + validate_sandbox_security(spec) # should not raise + + def test_blocked_host_paths_is_frozenset(self): + assert isinstance(BLOCKED_HOST_PATHS, frozenset) diff --git a/tests/unit_tests/box/test_box_service.py b/tests/unit_tests/box/test_box_service.py index c4ce9f5c..5653d927 100644 --- a/tests/unit_tests/box/test_box_service.py +++ b/tests/unit_tests/box/test_box_service.py @@ -19,6 +19,7 @@ from langbot.pkg.box.models import ( BoxExecutionResult, BoxExecutionStatus, BoxHostMountMode, + BoxManagedProcessSpec, BoxNetworkMode, BoxProfile, BoxSessionInfo, @@ -60,6 +61,12 @@ class _InProcessBoxRuntimeClient(BoxRuntimeClient): async def create_session(self, spec): return await self._runtime.create_session(spec) + async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec): + return await self._runtime.start_managed_process(session_id, spec) + + async def get_managed_process(self, session_id: str): + return self._runtime.get_managed_process(session_id) + def _can_open_test_socket() -> bool: try: @@ -1191,3 +1198,46 @@ async def test_remote_client_exec_raises_conflict_error(): await client.shutdown() finally: await server.close() + + +# ── BoxHostMountMode.NONE tests ───────────────────────────────────── + + +class TestBoxHostMountModeNone: + def test_none_mode_is_valid_enum(self): + assert BoxHostMountMode.NONE.value == 'none' + + def test_spec_with_none_mode_skips_workdir_check(self): + """When host_path_mode is NONE, workdir validation is skipped.""" + spec = BoxSpec( + session_id='test', + cmd='echo hi', + host_path='/home/user/data', + host_path_mode=BoxHostMountMode.NONE, + workdir='/opt/custom', # Not under /workspace, should be allowed + ) + assert spec.host_path_mode == BoxHostMountMode.NONE + assert spec.workdir == '/opt/custom' + + def test_spec_with_rw_mode_requires_workspace_workdir(self): + """When host_path_mode is RW, workdir must be under /workspace.""" + with pytest.raises(Exception): + BoxSpec( + session_id='test', + cmd='echo hi', + host_path='/home/user/data', + host_path_mode=BoxHostMountMode.READ_WRITE, + workdir='/opt/custom', + ) + + def test_spec_with_ro_mode_requires_workspace_workdir(self): + """When host_path_mode is RO, workdir must be under /workspace.""" + with pytest.raises(Exception): + BoxSpec( + session_id='test', + cmd='echo hi', + host_path='/home/user/data', + host_path_mode=BoxHostMountMode.READ_ONLY, + workdir='/opt/custom', + ) + diff --git a/tests/unit_tests/provider/test_mcp_box_integration.py b/tests/unit_tests/provider/test_mcp_box_integration.py new file mode 100644 index 00000000..83617474 --- /dev/null +++ b/tests/unit_tests/provider/test_mcp_box_integration.py @@ -0,0 +1,421 @@ +"""Tests for MCP Box integration: path rewriting, host_path inference, config model, payloads. + +Uses importlib.util.spec_from_file_location to load mcp.py directly without +triggering the circular import chain through the app module. +""" +from __future__ import annotations + +import importlib +import importlib.util +import os +import sys +import tempfile +import types +from unittest.mock import Mock + +import pytest + + +# --------------------------------------------------------------------------- +# Load mcp.py directly from file path, with stub dependencies +# --------------------------------------------------------------------------- + +def _stub_module(fqn: str, attrs: dict | None = None, is_package: bool = False): + """Create or return a stub module and register it in sys.modules.""" + if fqn in sys.modules: + mod = sys.modules[fqn] + else: + mod = types.ModuleType(fqn) + mod.__spec__ = importlib.machinery.ModuleSpec(fqn, None, is_package=is_package) + if is_package: + mod.__path__ = [] + sys.modules[fqn] = mod + parts = fqn.rsplit('.', 1) + if len(parts) == 2 and parts[0] in sys.modules: + setattr(sys.modules[parts[0]], parts[1], mod) + if attrs: + for k, v in attrs.items(): + setattr(mod, k, v) + return mod + + +@pytest.fixture(scope='module', autouse=True) +def mcp_module(): + """Load mcp.py with minimal stubs to avoid circular imports.""" + saved = {} + + def _save_and_stub(name, attrs=None, is_package=False): + saved[name] = sys.modules.get(name) + # Don't overwrite modules that already exist (from other test modules) + if name in sys.modules: + return + _stub_module(name, attrs, is_package) + + # Stub entire dependency chains as packages / modules + _save_and_stub('langbot_plugin', is_package=True) + _save_and_stub('langbot_plugin.api', is_package=True) + _save_and_stub('langbot_plugin.api.entities', is_package=True) + _save_and_stub('langbot_plugin.api.entities.events', is_package=True) + _save_and_stub('langbot_plugin.api.entities.events.pipeline_query', {}) + _save_and_stub('langbot_plugin.api.entities.builtin', is_package=True) + _save_and_stub('langbot_plugin.api.entities.builtin.resource', is_package=True) + _save_and_stub('langbot_plugin.api.entities.builtin.resource.tool', { + 'LLMTool': type('LLMTool', (), {}), + }) + _save_and_stub('langbot_plugin.api.entities.builtin.provider', is_package=True) + _save_and_stub('langbot_plugin.api.entities.builtin.provider.message', {}) + _save_and_stub('sqlalchemy', {'select': Mock()}) + _save_and_stub('httpx', {'AsyncClient': Mock()}) + _save_and_stub('mcp', {'ClientSession': Mock, 'StdioServerParameters': Mock}, is_package=True) + _save_and_stub('mcp.client', is_package=True) + _save_and_stub('mcp.client.stdio', {'stdio_client': Mock()}) + _save_and_stub('mcp.client.sse', {'sse_client': Mock()}) + _save_and_stub('mcp.client.streamable_http', {'streamable_http_client': Mock()}) + _save_and_stub('mcp.client.websocket', {'websocket_client': Mock()}) + + # Stub the provider.tools.loader (source of circular import) + _save_and_stub('langbot', is_package=True) + _save_and_stub('langbot.pkg', is_package=True) + _save_and_stub('langbot.pkg.provider', is_package=True) + _save_and_stub('langbot.pkg.provider.tools', is_package=True) + _save_and_stub('langbot.pkg.provider.tools.loader', { + 'ToolLoader': type('ToolLoader', (), {'__init__': lambda self, ap: None}), + }) + _save_and_stub('langbot.pkg.provider.tools.loaders', is_package=True) + _save_and_stub('langbot.pkg.core', is_package=True) + _save_and_stub('langbot.pkg.core.app', {'Application': type('Application', (), {})}) + _save_and_stub('langbot.pkg.entity', is_package=True) + _save_and_stub('langbot.pkg.entity.persistence', is_package=True) + _save_and_stub('langbot.pkg.entity.persistence.mcp', {}) + + # box models + import enum as _enum + class _BPS(str, _enum.Enum): + RUNNING = 'running' + EXITED = 'exited' + _save_and_stub('langbot.pkg.box', is_package=True) + _save_and_stub('langbot.pkg.box.models', {'BoxManagedProcessStatus': _BPS}) + + # Now load mcp.py via spec_from_file_location + mod_fqn = 'langbot.pkg.provider.tools.loaders.mcp' + sys.modules.pop(mod_fqn, None) + mcp_path = os.path.join( + os.path.dirname(__file__), '..', '..', '..', + 'src', 'langbot', 'pkg', 'provider', 'tools', 'loaders', 'mcp.py', + ) + mcp_path = os.path.normpath(mcp_path) + spec = importlib.util.spec_from_file_location(mod_fqn, mcp_path) + mod = importlib.util.module_from_spec(spec) + sys.modules[mod_fqn] = mod + spec.loader.exec_module(mod) + + yield mod + + # Cleanup + sys.modules.pop(mod_fqn, None) + for name in reversed(list(saved)): + if saved[name] is None: + sys.modules.pop(name, None) + else: + sys.modules[name] = saved[name] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_ap(): + ap = Mock() + ap.logger = Mock() + ap.box_service = Mock() + return ap + + +def _make_session(mcp_module, server_config: dict, ap=None): + if ap is None: + ap = _make_ap() + return mcp_module.RuntimeMCPSession( + server_name=server_config.get('name', 'test-server'), + server_config=server_config, + enable=True, + ap=ap, + ) + + +# ── MCPServerBoxConfig ────────────────────────────────────────────── + + +class TestMCPServerBoxConfig: + def test_default_values(self, mcp_module): + cfg = mcp_module.MCPServerBoxConfig.model_validate({}) + assert cfg.image is None + assert cfg.network == 'on' + assert cfg.host_path is None + assert cfg.host_path_mode == 'ro' + assert cfg.env == {} + assert cfg.startup_timeout_sec == 120 + assert cfg.cpus is None + assert cfg.memory_mb is None + assert cfg.pids_limit is None + assert cfg.read_only_rootfs is None + + def test_custom_values(self, mcp_module): + cfg = mcp_module.MCPServerBoxConfig.model_validate({ + 'image': 'node:20', + 'network': 'on', + 'host_path': '/home/user/mcp', + 'host_path_mode': 'rw', + 'env': {'FOO': 'bar'}, + 'startup_timeout_sec': 60, + 'cpus': 2.0, + 'memory_mb': 1024, + 'pids_limit': 256, + 'read_only_rootfs': False, + }) + assert cfg.image == 'node:20' + assert cfg.network == 'on' + assert cfg.cpus == 2.0 + assert cfg.memory_mb == 1024 + + def test_extra_fields_ignored(self, mcp_module): + cfg = mcp_module.MCPServerBoxConfig.model_validate({ + 'image': 'node:20', + 'unknown_field': 'whatever', + }) + assert cfg.image == 'node:20' + assert not hasattr(cfg, 'unknown_field') + + +# ── Path Rewriting ────────────────────────────────────────────────── + + +class TestRewritePath: + def test_no_host_path_returns_unchanged(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + }) + assert s._rewrite_path('/some/path', None) == '/some/path' + + def test_empty_path_returns_empty(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + }) + assert s._rewrite_path('', '/home/user/mcp') == '' + + def test_prefix_match_rewrites(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + }) + result = s._rewrite_path('/home/user/mcp/server.py', '/home/user/mcp') + assert result == '/workspace/server.py' + + def test_exact_match_rewrites_to_workspace(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + }) + result = s._rewrite_path('/home/user/mcp', '/home/user/mcp') + assert result == '/workspace' + + def test_non_matching_path_unchanged(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + }) + result = s._rewrite_path('/opt/other/server.py', '/home/user/mcp') + assert result == '/opt/other/server.py' + + def test_similar_prefix_not_rewritten(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + }) + result = s._rewrite_path('/home/user/mcp-other/file.py', '/home/user/mcp') + assert result == '/home/user/mcp-other/file.py' + + def test_nested_subpath_rewrites(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + }) + result = s._rewrite_path('/home/user/mcp/src/lib/main.py', '/home/user/mcp') + assert result == '/workspace/src/lib/main.py' + + +# ── host_path Inference ───────────────────────────────────────────── + + +class TestInferHostPath: + def test_no_absolute_paths_returns_none(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': ['server.py'], + }) + assert s._infer_host_path() is None + + def test_nonexistent_path_returns_none(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': '/nonexistent/path/to/python', 'args': [], + }) + assert s._infer_host_path() is None + + def test_existing_absolute_path_infers_directory(self, mcp_module): + with tempfile.NamedTemporaryFile(suffix='.py') as f: + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [f.name], + }) + result = s._infer_host_path() + assert result is not None + assert result == os.path.dirname(os.path.realpath(f.name)) + + +# ── Build Box Session Payload ─────────────────────────────────────── + + +class TestBuildBoxSessionPayload: + def test_minimal_config(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + }) + payload = s._build_box_session_payload('session-123') + assert payload['session_id'] == 'session-123' + assert payload['workdir'] == '/workspace' + assert payload['env'] == {} + assert 'host_path' not in payload + + def test_with_host_path(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + 'box': {'host_path': '/home/user/mcp', 'host_path_mode': 'ro'}, + }) + payload = s._build_box_session_payload('session-123') + assert payload['host_path'] == '/home/user/mcp' + assert payload['host_path_mode'] == 'ro' + + def test_optional_fields_included_when_set(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + 'box': {'image': 'node:20', 'cpus': 2.0, 'memory_mb': 1024, 'pids_limit': 256}, + }) + payload = s._build_box_session_payload('session-123') + assert payload['image'] == 'node:20' + assert payload['cpus'] == 2.0 + assert payload['memory_mb'] == 1024 + assert payload['pids_limit'] == 256 + + def test_none_fields_excluded(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + }) + payload = s._build_box_session_payload('session-123') + assert 'image' not in payload + assert 'cpus' not in payload + + +# ── Build Box Process Payload ─────────────────────────────────────── + + +class TestBuildBoxProcessPayload: + def test_basic_payload(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': ['server.py'], 'env': {'KEY': 'val'}, + }) + payload = s._build_box_process_payload() + assert payload['command'] == 'python' + assert payload['args'] == ['server.py'] + assert payload['env'] == {'KEY': 'val'} + assert payload['cwd'] == '/workspace' + + def test_path_rewriting_applied(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': '/home/user/mcp/venv/bin/python', + 'args': ['/home/user/mcp/server.py', '--config', '/home/user/mcp/config.json'], + 'env': {}, + 'box': {'host_path': '/home/user/mcp'}, + }) + payload = s._build_box_process_payload() + # venv python is replaced with plain 'python' (deps installed in-container) + assert payload['command'] == 'python' + assert payload['args'] == ['/workspace/server.py', '--config', '/workspace/config.json'] + + def test_non_matching_args_not_rewritten(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', + 'args': ['/opt/other/server.py', '--flag'], + 'env': {}, + 'box': {'host_path': '/home/user/mcp'}, + }) + payload = s._build_box_process_payload() + assert payload['command'] == 'python' + assert payload['args'] == ['/opt/other/server.py', '--flag'] + + +# ── get_runtime_info_dict ─────────────────────────────────────────── + + +class TestGetRuntimeInfoDict: + def test_non_stdio_session(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'test-uuid', 'mode': 'sse', + 'command': 'python', 'args': [], + }) + info = s.get_runtime_info_dict() + assert info['status'] == 'connecting' + assert 'box_session_id' not in info + + def test_stdio_session_includes_box_info(self, mcp_module): + ap = _make_ap() + ap.box_service.available = True + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'test-uuid', 'mode': 'stdio', + 'command': 'python', 'args': [], + }, ap=ap) + info = s.get_runtime_info_dict() + assert info['box_session_id'] == 'mcp-test-uuid' + assert info['box_enabled'] is True + + def test_stdio_session_without_box_runtime(self, mcp_module): + ap = _make_ap() + ap.box_service.available = False + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'test-uuid', 'mode': 'stdio', + 'command': 'python', 'args': [], + }, ap=ap) + info = s.get_runtime_info_dict() + assert 'box_session_id' not in info + + +# ── Box config parsing ────────────────────────────────────────────── + + +class TestBoxConfigParsing: + def test_box_config_parsed_from_server_config(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + 'box': {'image': 'node:20', 'host_path': '/home/user/mcp'}, + }) + assert isinstance(s.box_config, mcp_module.MCPServerBoxConfig) + assert s.box_config.image == 'node:20' + assert s.box_config.host_path == '/home/user/mcp' + + def test_missing_box_key_uses_defaults(self, mcp_module): + s = _make_session(mcp_module, { + 'name': 'test', 'uuid': 'u1', 'mode': 'sse', + 'command': 'python', 'args': [], + }) + assert isinstance(s.box_config, mcp_module.MCPServerBoxConfig) + assert s.box_config.image is None + assert s.box_config.host_path_mode == 'ro'