From fd68c160563ee5cd86bead632f4c5e28773a8a17 Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <1051233107@qq.com> Date: Wed, 8 Apr 2026 16:24:55 +0800 Subject: [PATCH] feat(sandbox): add MCP box integration on top of sandbox base (#2083) --- src/langbot/pkg/provider/tools/loaders/mcp.py | 407 ++++++++++- .../box/test_box_mcp_integration.py | 361 ++++++++++ .../provider/test_mcp_box_integration.py | 635 ++++++++++++++++++ 3 files changed, 1385 insertions(+), 18 deletions(-) create mode 100644 tests/integration_tests/box/test_box_mcp_integration.py create mode 100644 tests/unit_tests/provider/test_mcp_box_integration.py diff --git a/src/langbot/pkg/provider/tools/loaders/mcp.py b/src/langbot/pkg/provider/tools/loaders/mcp.py index 46d63b84..8a3bbdc4 100644 --- a/src/langbot/pkg/provider/tools/loaders/mcp.py +++ b/src/langbot/pkg/provider/tools/loaders/mcp.py @@ -1,6 +1,7 @@ from __future__ import annotations import enum +import os import typing from contextlib import AsyncExitStack import traceback @@ -9,11 +10,13 @@ import sqlalchemy import asyncio import httpx +import pydantic import uuid as uuid_module from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from mcp.client.sse import sse_client from mcp.client.streamable_http import streamable_http_client +from mcp.client.websocket import websocket_client from .. 
import loader from ....core import app @@ -28,6 +31,39 @@ class MCPSessionStatus(enum.Enum): ERROR = 'error' +class MCPSessionErrorPhase(enum.Enum): + """Which phase of the MCP lifecycle failed.""" + + SESSION_CREATE = 'session_create' + DEP_INSTALL = 'dep_install' + PROCESS_START = 'process_start' + RELAY_CONNECT = 'relay_connect' + MCP_INIT = 'mcp_init' + RUNTIME = 'runtime' + TOOL_CALL = 'tool_call' + + +_VENV_DIRS = frozenset({'.venv', 'venv', 'env', '.env'}) +_VENV_BIN_DIRS = frozenset({'bin', 'Scripts'}) + + +class MCPServerBoxConfig(pydantic.BaseModel): + """Structured configuration for running an MCP server inside a Box container.""" + + image: str | None = None + network: str = 'on' # MCP servers need network for dependency installation + host_path: str | None = None + host_path_mode: str = 'ro' # MCP servers default to read-only mount + env: dict[str, str] = pydantic.Field(default_factory=dict) + startup_timeout_sec: int = 120 # Longer default to allow pip install + cpus: float | None = None + memory_mb: int | None = None + pids_limit: int | None = None + read_only_rootfs: bool | None = None + + model_config = pydantic.ConfigDict(extra='ignore') + + class RuntimeMCPSession: """运行时 MCP 会话""" @@ -58,6 +94,10 @@ class RuntimeMCPSession: error_message: str | None = None + error_phase: MCPSessionErrorPhase | None = None + + retry_count: int = 0 + def __init__(self, server_name: str, server_config: dict, enable: bool, ap: app.Application): self.server_name = server_name self.server_uuid = server_config.get('uuid', '') @@ -75,7 +115,14 @@ class RuntimeMCPSession: self._shutdown_event = asyncio.Event() self._ready_event = asyncio.Event() + # Parse box config once + self.box_config = MCPServerBoxConfig.model_validate(server_config.get('box', {})) + async def _init_stdio_python_server(self): + if self._uses_box_stdio(): + await self._init_box_stdio_server() + return + server_params = StdioServerParameters( command=self.server_config['command'], 
args=self.server_config['args'], @@ -90,6 +137,68 @@ class RuntimeMCPSession: await self.session.initialize() + async def _init_box_stdio_server(self): + box_service = self.ap.box_service + session_id = self._build_box_session_id() + host_path = self._resolve_host_path() + session_payload = self._build_box_session_payload(session_id, host_path) + + # Phase: session creation + try: + await box_service.create_session( + session_payload, + ) + except Exception: + self.error_phase = MCPSessionErrorPhase.SESSION_CREATE + raise + + # Phase: dependency installation + if host_path: + install_cmd = self._detect_install_command(host_path) + if install_cmd: + self.ap.logger.info( + f'MCP server {self.server_name}: installing dependencies in Box with: {install_cmd}' + ) + exec_payload = dict(session_payload) + exec_payload['cmd'] = install_cmd + exec_payload['timeout_sec'] = self.box_config.startup_timeout_sec or 120 + try: + result = await box_service.client.execute(box_service.build_spec(exec_payload)) + except Exception: + self.error_phase = MCPSessionErrorPhase.DEP_INSTALL + raise + if not result.ok: + self.error_phase = MCPSessionErrorPhase.DEP_INSTALL + stderr_preview = (result.stderr or '')[:500] + raise Exception(f'Dependency install failed (exit code {result.exit_code}): {stderr_preview}') + + # Phase: managed process start + try: + await box_service.start_managed_process( + session_id, + self._build_box_process_payload(host_path), + ) + except Exception: + self.error_phase = MCPSessionErrorPhase.PROCESS_START + raise + + # Phase: WebSocket relay connection + try: + websocket_url = box_service.get_managed_process_websocket_url(session_id) + transport = await self.exit_stack.enter_async_context(websocket_client(websocket_url)) + read_stream, write_stream = transport + self.session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream)) + except Exception: + self.error_phase = MCPSessionErrorPhase.RELAY_CONNECT + raise + + # Phase: MCP 
protocol initialization + try: + await self.session.initialize() + except Exception: + self.error_phase = MCPSessionErrorPhase.MCP_INIT + raise + async def _init_sse_server(self): sse_transport = await self.exit_stack.enter_async_context( sse_client( @@ -124,8 +233,11 @@ class RuntimeMCPSession: await self.session.initialize() + _MAX_RETRIES = 3 + _RETRY_DELAYS = [2, 4, 8] + async def _lifecycle_loop(self): - """在后台任务中管理整个MCP会话的生命周期""" + """Manage the full MCP session lifecycle in a background task.""" try: if self.server_config['mode'] == 'stdio': await self._init_stdio_python_server() @@ -134,49 +246,125 @@ class RuntimeMCPSession: elif self.server_config['mode'] == 'http': await self._init_streamable_http_server() else: - raise ValueError(f'无法识别 MCP 服务器类型: {self.server_name}: {self.server_config}') + raise ValueError(f'Unknown MCP server mode: {self.server_name}: {self.server_config}') await self.refresh() self.status = MCPSessionStatus.CONNECTED - # 通知start()方法连接已建立 + # Notify start() that connection is established self._ready_event.set() - # 等待shutdown信号 - await self._shutdown_event.wait() + # Wait for shutdown signal, with optional health monitoring for Box stdio + if self._uses_box_stdio(): + monitor_task = asyncio.create_task(self._monitor_box_process_health()) + shutdown_task = asyncio.create_task(self._shutdown_event.wait()) + done, pending = await asyncio.wait( + [shutdown_task, monitor_task], + return_when=asyncio.FIRST_COMPLETED, + ) + for task in pending: + task.cancel() + for task in done: + if task is monitor_task and not self._shutdown_event.is_set(): + self.error_phase = MCPSessionErrorPhase.RUNTIME + raise Exception('Box managed process exited unexpectedly') + else: + await self._shutdown_event.wait() except Exception as e: self.status = MCPSessionStatus.ERROR self.error_message = str(e) self.ap.logger.error(f'Error in MCP session lifecycle {self.server_name}: {e}\n{traceback.format_exc()}') - # 即使出错也要设置ready事件,让start()方法知道初始化已完成 - 
self._ready_event.set() + # Do NOT set _ready_event here — let _lifecycle_loop_with_retry + # handle retries first. It will set the event when all retries + # are exhausted or on success. + raise # Re-raise so _lifecycle_loop_with_retry can catch it finally: - # 在同一个任务中清理所有资源 + # Clean up all resources in the same task try: if self.exit_stack: await self.exit_stack.aclose() + self.exit_stack = AsyncExitStack() self.functions.clear() self.session = None except Exception as e: self.ap.logger.error(f'Error cleaning up MCP session {self.server_name}: {e}\n{traceback.format_exc()}') + finally: + await self._cleanup_box_stdio_session() + + async def _lifecycle_loop_with_retry(self): + """Wrap _lifecycle_loop with retry and exponential backoff.""" + for attempt in range(self._MAX_RETRIES + 1): + try: + await self._lifecycle_loop() + return # Normal shutdown, don't retry + except Exception as e: + self.retry_count = attempt + 1 + if self._shutdown_event.is_set(): + return # Shutdown requested, don't retry + if attempt >= self._MAX_RETRIES: + self.status = MCPSessionStatus.ERROR + self.error_message = f'Failed after {self._MAX_RETRIES + 1} attempts: {e}' + self._ready_event.set() + return + delay = self._RETRY_DELAYS[attempt] + self.ap.logger.warning( + f'MCP session {self.server_name} failed (attempt {attempt + 1}), retrying in {delay}s: {e}' + ) + await self._cleanup_box_stdio_session() + # Reset status for retry + self.status = MCPSessionStatus.CONNECTING + self.error_message = None + self.error_phase = None + await asyncio.sleep(delay) + + _MONITOR_POLL_INTERVAL = 5 + _MONITOR_MAX_CONSECUTIVE_ERRORS = 3 + + async def _monitor_box_process_health(self): + """Poll managed process status; return when process exits.""" + from langbot_plugin.box.models import BoxManagedProcessStatus + + session_id = self._build_box_session_id() + consecutive_errors = 0 + while not self._shutdown_event.is_set(): + try: + info = await self.ap.box_service.client.get_managed_process(session_id) + 
if isinstance(info, dict): + status = info.get('status', '') + else: + status = getattr(info, 'status', '') + if status == BoxManagedProcessStatus.EXITED.value or status == BoxManagedProcessStatus.EXITED: + return + consecutive_errors = 0 + except Exception as exc: + consecutive_errors += 1 + self.ap.logger.warning( + f'MCP monitor for {self.server_name}: get_managed_process failed ' + f'({consecutive_errors}/{self._MONITOR_MAX_CONSECUTIVE_ERRORS}): ' + f'{type(exc).__name__}: {exc}' + ) + if consecutive_errors >= self._MONITOR_MAX_CONSECUTIVE_ERRORS: + return + await asyncio.sleep(self._MONITOR_POLL_INTERVAL) async def start(self): if not self.enable: return - # 创建后台任务来管理生命周期 - self._lifecycle_task = asyncio.create_task(self._lifecycle_loop()) + # Create background task for lifecycle management with retry + self._lifecycle_task = asyncio.create_task(self._lifecycle_loop_with_retry()) - # 等待连接建立或失败(带超时) + # Wait for connection or failure (with timeout) + startup_timeout = self.box_config.startup_timeout_sec if self._uses_box_stdio() else 30.0 try: - await asyncio.wait_for(self._ready_event.wait(), timeout=30.0) + await asyncio.wait_for(self._ready_event.wait(), timeout=startup_timeout) except asyncio.TimeoutError: self.status = MCPSessionStatus.ERROR - raise Exception('Connection timeout after 30 seconds') + raise Exception(f'Connection timeout after {startup_timeout} seconds') - # 检查是否有错误 + # Check for errors if self.status == MCPSessionStatus.ERROR: raise Exception('Connection failed, please check URL') @@ -232,9 +420,11 @@ class RuntimeMCPSession: return self.functions def get_runtime_info_dict(self) -> dict: - return { + info = { 'status': self.status.value, 'error_message': self.error_message, + 'error_phase': self.error_phase.value if self.error_phase else None, + 'retry_count': self.retry_count, 'tool_count': len(self.get_tools()), 'tools': [ { @@ -244,6 +434,10 @@ class RuntimeMCPSession: for tool in self.get_tools() ], } + if self._uses_box_stdio(): + 
info['box_session_id'] = self._build_box_session_id() + info['box_enabled'] = True + return info async def shutdown(self): """关闭会话并清理资源""" @@ -267,6 +461,182 @@ class RuntimeMCPSession: except Exception as e: self.ap.logger.error(f'Error shutting down MCP session {self.server_name}: {e}\n{traceback.format_exc()}') + def _uses_box_stdio(self) -> bool: + """Check whether this stdio MCP server should run inside a Box container. + + Returns True when mode is stdio AND the Box runtime is available. + An explicit ``box`` key in server_config is NOT required — if the + runtime is reachable, stdio servers default to Box isolation. + """ + if self.server_config.get('mode') != 'stdio': + return False + try: + return getattr(self.ap.box_service, 'available', False) + except Exception: + return False + + def _build_box_session_id(self) -> str: + return f'mcp-{self.server_uuid}' + + def _rewrite_path(self, path: str, host_path: str | None) -> str: + """Rewrite host path prefix to container /workspace prefix.""" + if not host_path or not path: + return path + normalized_host = os.path.realpath(host_path) + if path.startswith(normalized_host + '/'): + return '/workspace' + path[len(normalized_host) :] + if path == normalized_host: + return '/workspace' + return path + + def _infer_host_path(self) -> str | None: + """Try to infer host_path from command and args absolute paths. + + Detects virtualenv patterns (e.g. .venv/bin/python) and walks up + to the project root rather than using the bin directory. + """ + candidates = [] + parts = [self.server_config.get('command', '')] + self.server_config.get('args', []) + for part in parts: + if not os.path.isabs(part): + continue + # Use the raw path for venv detection (before resolving symlinks) + # because .venv/bin/python is often a symlink to the system python. 
+ if os.path.exists(part): + directory = os.path.dirname(part) + directory = self._unwrap_venv_path(directory) + candidates.append(os.path.realpath(directory)) + if not candidates: + return None + common = os.path.commonpath(candidates) + return common if common != '/' else None + + @staticmethod + def _unwrap_venv_path(directory: str) -> str: + """If directory looks like a virtualenv bin dir, return the project root. + + Recognized patterns: + /project/.venv/bin -> /project + /project/venv/bin -> /project + /project/.venv/Scripts -> /project (Windows) + /project/env/bin -> /project + """ + parts = directory.replace('\\', '/').split('/') + # Look for patterns like .../(.venv|venv|env)/(bin|Scripts) + for i in range(len(parts) - 1, 0, -1): + if parts[i] in _VENV_BIN_DIRS and i >= 1: + venv_dir = parts[i - 1] + if venv_dir in _VENV_DIRS: + # Return everything before the venv directory + project_root = '/'.join(parts[: i - 1]) + return project_root if project_root else '/' + return directory + + def _resolve_host_path(self) -> str | None: + """Resolve the effective host_path: explicit config > inference.""" + return self.box_config.host_path or self._infer_host_path() + + @staticmethod + def _detect_install_command(host_path: str) -> str | None: + """Detect how to install dependencies from the mounted project. + + Copies the project to a writable temp directory before installing, + because /workspace may be mounted read-only and pip needs to write + build artifacts in the source tree. + """ + # Use /opt instead of /tmp — /tmp is often a small tmpfs (64 MB) + # and cannot hold the copied source tree plus pip build artifacts. + _COPY_AND_INSTALL = ( + 'mkdir -p /opt/_mcp_src' + ' && tar -C /workspace' + ' --exclude=.venv --exclude=.git --exclude=__pycache__' + ' --exclude=node_modules --exclude=.tox --exclude=.nox' + ' --exclude="*.egg-info" --exclude=.uv-cache' + ' -cf - .' 
+ ' | tar -C /opt/_mcp_src -xf -' + ' && pip install --no-cache-dir /opt/_mcp_src' + ' && rm -rf /opt/_mcp_src' + ) + _INSTALL_REQUIREMENTS = 'pip install --no-cache-dir -r /workspace/requirements.txt' + + if os.path.isfile(os.path.join(host_path, 'pyproject.toml')): + return _COPY_AND_INSTALL + if os.path.isfile(os.path.join(host_path, 'setup.py')): + return _COPY_AND_INSTALL + if os.path.isfile(os.path.join(host_path, 'requirements.txt')): + return _INSTALL_REQUIREMENTS + return None + + def _build_box_session_payload(self, session_id: str, host_path: str | None = None) -> dict: + bc = self.box_config + if host_path is None: + host_path = self._resolve_host_path() + + payload: dict[str, typing.Any] = { + 'session_id': session_id, + 'workdir': '/workspace', + 'env': bc.env, + # MCP sessions need network for dependency install and writable rootfs + 'network': bc.network, + 'read_only_rootfs': bc.read_only_rootfs if bc.read_only_rootfs is not None else False, + } + if host_path: + payload['host_path'] = host_path + payload['host_path_mode'] = bc.host_path_mode + for key in ('image', 'cpus', 'memory_mb', 'pids_limit'): + val = getattr(bc, key) + if val is not None: + payload[key] = val if not isinstance(val, enum.Enum) else val.value + return payload + + def _build_box_process_payload(self, host_path: str | None = None) -> dict: + if host_path is None: + host_path = self._resolve_host_path() + + command = self.server_config['command'] + args = self.server_config.get('args', []) + cwd = '/workspace' + + if host_path: + # When host_path is resolved, we install deps in-container rather + # than relying on the host venv. Rewrite paths so the container + # sees /workspace/... but replace venv python with plain "python". 
+ command = self._rewrite_venv_command(command, host_path) + args = [self._rewrite_path(a, host_path) for a in args] + cwd = self._rewrite_path(cwd, host_path) + + return { + 'command': command, + 'args': args, + 'env': self.server_config.get('env', {}), + 'cwd': cwd, + } + + def _rewrite_venv_command(self, command: str, host_path: str) -> str: + """Rewrite command: if it points to a venv python, use plain 'python'.""" + if not host_path or not command: + return command + normalized_host = os.path.realpath(host_path) + if not command.startswith(normalized_host + '/'): + return command + # Check if command is a venv python interpreter + rel = command[len(normalized_host) + 1 :] # e.g. ".venv/bin/python" + parts = rel.replace('\\', '/').split('/') + # Match patterns like .venv/bin/python*, venv/bin/python*, etc. + if len(parts) >= 3 and parts[0] in _VENV_DIRS and parts[1] in _VENV_BIN_DIRS and parts[2].startswith('python'): + return 'python' + # Not a venv python — do normal path rewrite + return self._rewrite_path(command, host_path) + + async def _cleanup_box_stdio_session(self) -> None: + if not self._uses_box_stdio(): + return + + try: + await self.ap.box_service.client.delete_session(self._build_box_session_id()) + except Exception as e: + self.ap.logger.warning(f'Failed to cleanup Box session for MCP server {self.server_name}: {e}') + # @loader.loader_class('mcp') class MCPLoader(loader.ToolLoader): @@ -332,7 +702,7 @@ class MCPLoader(loader.ToolLoader): Args: server_config: 服务器配置字典,必须包含: - name: 服务器名称 - - mode: 连接模式 (stdio/sse) + - mode: 连接模式 (stdio/sse/http) - enable: 是否启用 - extra_args: 额外的配置参数 (可选) """ @@ -431,12 +801,13 @@ class MCPLoader(loader.ToolLoader): """获取所有服务器的信息""" info = {} for server_name, session in self.sessions.items(): + tools = session.get_tools() info[server_name] = { 'name': server_name, 'mode': session.server_config.get('mode'), 'enable': session.enable, - 'tools_count': len(session.get_tools()), - 'tool_names': [f.name for f in 
session.get_tools()], + 'tools_count': len(tools), + 'tool_names': [f.name for f in tools], } return info diff --git a/tests/integration_tests/box/test_box_mcp_integration.py b/tests/integration_tests/box/test_box_mcp_integration.py new file mode 100644 index 00000000..6140a3c7 --- /dev/null +++ b/tests/integration_tests/box/test_box_mcp_integration.py @@ -0,0 +1,361 @@ +"""Integration tests for Box MCP-related features. + +These tests verify managed process lifecycle, WebSocket stdio attach, +session cleanup, and the single-session query API using a real container +runtime. + +CI only runs ``tests/unit_tests/``, so these tests never execute in the +CI pipeline. Run them locally with:: + + pytest tests/integration_tests/box/test_box_mcp_integration.py -v +""" + +from __future__ import annotations + +import asyncio +import logging +import shutil +import socket +import subprocess + +import aiohttp +import pytest +from aiohttp.test_utils import TestServer + +from langbot_plugin.box.client import ActionRPCBoxClient +from langbot_plugin.box.errors import BoxSessionNotFoundError +from langbot_plugin.box.models import BoxManagedProcessSpec, BoxManagedProcessStatus, BoxSpec +from langbot_plugin.box.runtime import BoxRuntime +from langbot_plugin.box.server import BoxServerHandler, create_ws_relay_app + +_logger = logging.getLogger('test.box.mcp_integration') + +_TEST_IMAGE = 'alpine:latest' + + +# ── Skip helpers ────────────────────────────────────────────────────── + + +def _has_container_runtime() -> bool: + for cmd in ('podman', 'docker'): + if shutil.which(cmd) is None: + continue + try: + result = subprocess.run([cmd, 'info'], capture_output=True, timeout=10) + if result.returncode == 0: + return True + except Exception: + continue + return False + + +def _can_open_test_socket() -> bool: + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + except OSError: + return False + sock.close() + return True + + +requires_container = pytest.mark.skipif( + not 
_has_container_runtime(), + reason='no container runtime (podman/docker) available', +) + +requires_socket = pytest.mark.skipif( + not _can_open_test_socket(), + reason='local test environment does not permit opening TCP sockets', +) + + +# ── Helpers ────────────────────────────────────────────────────────── + + +class _QueueConnection: + """In-process Connection backed by asyncio Queues — no real IO.""" + + def __init__(self, rx: asyncio.Queue[str], tx: asyncio.Queue[str]): + self._rx = rx + self._tx = tx + + async def send(self, message: str) -> None: + await self._tx.put(message) + + async def receive(self) -> str: + return await self._rx.get() + + async def close(self) -> None: + pass + + +async def _make_rpc_pair(runtime: BoxRuntime): + """Create an in-process RPC pair connected via queues.""" + from langbot_plugin.runtime.io.handler import Handler + + c2s: asyncio.Queue[str] = asyncio.Queue() + s2c: asyncio.Queue[str] = asyncio.Queue() + client_conn = _QueueConnection(rx=s2c, tx=c2s) + server_conn = _QueueConnection(rx=c2s, tx=s2c) + + server_handler = BoxServerHandler(server_conn, runtime) + server_task = asyncio.create_task(server_handler.run()) + + client_handler = Handler.__new__(Handler) + Handler.__init__(client_handler, client_conn) + client_task = asyncio.create_task(client_handler.run()) + + client = ActionRPCBoxClient(logger=_logger) + client.set_handler(client_handler) + + return client, server_task, client_task + + +# ── Fixtures ────────────────────────────────────────────────────────── + + +@pytest.fixture +async def box_server(): + """Yield a (ws_relay_url, ActionRPCBoxClient) backed by a real BoxRuntime.""" + runtime = BoxRuntime(logger=_logger) + await runtime.initialize() + + # Start ws relay for managed process attach + ws_app = create_ws_relay_app(runtime) + ws_server = TestServer(ws_app) + await ws_server.start_server() + + client, server_task, client_task = await _make_rpc_pair(runtime) + + ws_relay_url = str(ws_server.make_url('')) + 
yield ws_relay_url, client + + server_task.cancel() + client_task.cancel() + await runtime.shutdown() + await ws_server.close() + + +# ── 1. Managed process lifecycle ───────────────────────────────────── + + +@requires_container +@requires_socket +@pytest.mark.asyncio +async def test_managed_process_start_and_query(box_server): + """Start a managed process and query its status.""" + ws_relay_url, client = box_server + + # Create session + spec = BoxSpec( + cmd='', + session_id='mcp-int-lifecycle', + workdir='/tmp', + image=_TEST_IMAGE, + ) + await client.create_session(spec) + + # Start a managed process that stays alive + proc_spec = BoxManagedProcessSpec( + command='sh', + args=['-c', 'while true; do sleep 1; done'], + cwd='/tmp', + ) + info = await client.start_managed_process('mcp-int-lifecycle', proc_spec) + assert info.status == BoxManagedProcessStatus.RUNNING + + # Query it + info2 = await client.get_managed_process('mcp-int-lifecycle') + assert info2.status == BoxManagedProcessStatus.RUNNING + assert info2.command == 'sh' + + # Cleanup + await client.delete_session('mcp-int-lifecycle') + + +# ── 2. 
WebSocket stdio attach ──────────────────────────────────────── + + +@requires_container +@requires_socket +@pytest.mark.asyncio +async def test_ws_stdio_attach_echo(box_server): + """Attach to a managed process via WebSocket and verify bidirectional IO.""" + ws_relay_url, client = box_server + + spec = BoxSpec( + cmd='', + session_id='mcp-int-ws', + workdir='/tmp', + image=_TEST_IMAGE, + ) + await client.create_session(spec) + + # Start a cat process (echoes stdin to stdout) + proc_spec = BoxManagedProcessSpec( + command='cat', + args=[], + cwd='/tmp', + ) + await client.start_managed_process('mcp-int-ws', proc_spec) + + # Connect via WebSocket (ws relay) + ws_url = client.get_managed_process_websocket_url('mcp-int-ws', ws_relay_url) + session = aiohttp.ClientSession() + try: + async with session.ws_connect(ws_url) as ws: + # Send a line + await ws.send_str('hello from test') + + # Expect to receive it back (cat echoes) + msg = await asyncio.wait_for(ws.receive(), timeout=5) + assert msg.type == aiohttp.WSMsgType.TEXT + assert 'hello from test' in msg.data + finally: + await session.close() + + await client.delete_session('mcp-int-ws') + + +# ── 3. Session cleanup removes container ───────────────────────────── + + +@requires_container +@requires_socket +@pytest.mark.asyncio +async def test_delete_session_cleans_up(box_server): + """After deleting a session, it should no longer exist.""" + ws_relay_url, client = box_server + + spec = BoxSpec( + cmd='', + session_id='mcp-int-cleanup', + workdir='/tmp', + image=_TEST_IMAGE, + ) + await client.create_session(spec) + + # Start a process + proc_spec = BoxManagedProcessSpec( + command='sleep', + args=['3600'], + cwd='/tmp', + ) + await client.start_managed_process('mcp-int-cleanup', proc_spec) + + # Delete + await client.delete_session('mcp-int-cleanup') + + # Session should be gone + with pytest.raises(BoxSessionNotFoundError): + await client.get_session('mcp-int-cleanup') + + +# ── 4. 
GET session details ──────────────────────────────────────── + + +@requires_container +@requires_socket +@pytest.mark.asyncio +async def test_get_session_returns_details(box_server): + """Get single session returns session details and managed process info.""" + ws_relay_url, client = box_server + + spec = BoxSpec( + cmd='', + session_id='mcp-int-get', + workdir='/tmp', + image=_TEST_IMAGE, + ) + await client.create_session(spec) + + # Query without managed process + info = await client.get_session('mcp-int-get') + assert info['session_id'] == 'mcp-int-get' + assert info['image'] == _TEST_IMAGE + assert 'managed_process' not in info + + # Start a process and query again + proc_spec = BoxManagedProcessSpec( + command='sleep', + args=['3600'], + cwd='/tmp', + ) + await client.start_managed_process('mcp-int-get', proc_spec) + + info2 = await client.get_session('mcp-int-get') + assert info2['session_id'] == 'mcp-int-get' + assert 'managed_process' in info2 + assert info2['managed_process']['status'] == BoxManagedProcessStatus.RUNNING.value + + await client.delete_session('mcp-int-get') + + +# ── 5. 
Process exit detected ──────────────────────────────────────── + + +@requires_container +@requires_socket +@pytest.mark.asyncio +async def test_process_exit_detected(box_server): + """When a managed process exits, its status should reflect EXITED.""" + ws_relay_url, client = box_server + + spec = BoxSpec( + cmd='', + session_id='mcp-int-exit', + workdir='/tmp', + image=_TEST_IMAGE, + ) + await client.create_session(spec) + + # Start a process that exits immediately + proc_spec = BoxManagedProcessSpec( + command='sh', + args=['-c', 'echo done && exit 0'], + cwd='/tmp', + ) + await client.start_managed_process('mcp-int-exit', proc_spec) + + # Wait a bit for process to exit + await asyncio.sleep(2) + + info = await client.get_managed_process('mcp-int-exit') + assert info.status == BoxManagedProcessStatus.EXITED + assert info.exit_code == 0 + + await client.delete_session('mcp-int-exit') + + +# ── 6. Instance ID orphan cleanup ─────────────────────────────────── + + +@requires_container +@requires_socket +@pytest.mark.asyncio +async def test_orphan_cleanup_preserves_own_containers(box_server): + """Orphan cleanup should not remove containers belonging to the current instance.""" + ws_relay_url, client = box_server + + # Create a session (container gets current instance ID label) + spec = BoxSpec( + cmd='', + session_id='mcp-int-orphan', + workdir='/tmp', + image=_TEST_IMAGE, + ) + await client.create_session(spec) + + # Verify session exists + sessions = await client.get_sessions() + assert any(s['session_id'] == 'mcp-int-orphan' for s in sessions) + + # Trigger status check (which doesn't clean up own containers) + status = await client.get_status() + assert status['active_sessions'] >= 1 + + # Our session should still exist + sessions = await client.get_sessions() + assert any(s['session_id'] == 'mcp-int-orphan' for s in sessions) + + await client.delete_session('mcp-int-orphan') diff --git a/tests/unit_tests/provider/test_mcp_box_integration.py 
"""Tests for MCP Box integration: path rewriting, host_path inference, config model, payloads.

Uses importlib.util.spec_from_file_location to load mcp.py directly without
triggering the circular import chain through the app module.
"""

from __future__ import annotations

import importlib
import importlib.machinery
import importlib.util
import os
import sys
import tempfile
import types
from contextlib import asynccontextmanager
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock

import pytest


# ---------------------------------------------------------------------------
# Load mcp.py directly from file path, with stub dependencies
# ---------------------------------------------------------------------------


def _stub_module(fqn: str, attrs: dict | None = None, is_package: bool = False):
    """Create or return a stub module and register it in ``sys.modules``.

    Args:
        fqn: Fully-qualified module name to stub.
        attrs: Optional attribute name -> value mapping set on the module.
        is_package: When True the new stub gets an empty ``__path__`` so that
            submodules can be registered beneath it.

    Returns:
        The already-registered module if ``fqn`` was present in
        ``sys.modules``, otherwise the newly created stub.
    """
    if fqn in sys.modules:
        mod = sys.modules[fqn]
    else:
        mod = types.ModuleType(fqn)
        # ``importlib.machinery`` is imported explicitly at the top of the file;
        # ``import importlib`` alone does not guarantee the submodule is loaded.
        mod.__spec__ = importlib.machinery.ModuleSpec(fqn, None, is_package=is_package)
        if is_package:
            mod.__path__ = []
        sys.modules[fqn] = mod
        # Expose the stub as an attribute of its parent so that
        # ``from parent import child`` resolves to it.
        parent_name, _, child_name = fqn.rpartition('.')
        if parent_name and parent_name in sys.modules:
            setattr(sys.modules[parent_name], child_name, mod)
    if attrs:
        for key, value in attrs.items():
            setattr(mod, key, value)
    return mod


@pytest.fixture(scope='module', autouse=True)
def mcp_module():
    """Load mcp.py with minimal stubs to avoid circular imports.

    Yields:
        The module object obtained by executing ``mcp.py`` under the name
        ``langbot.pkg.provider.tools.loaders.mcp``.

    The previous ``sys.modules`` state is restored afterwards, including when
    setup itself fails part-way, so stubs never leak into other test modules.
    """
    saved: dict[str, types.ModuleType | None] = {}
    mod_fqn = 'langbot.pkg.provider.tools.loaders.mcp'
    # Remember a previously imported real module so it can be put back later.
    previous_mcp = sys.modules.pop(mod_fqn, None)

    def _save_and_stub(name, attrs=None, is_package=False):
        # Record the pre-test state only once per name; a second call must not
        # replace the recorded original with our own stub.
        saved.setdefault(name, sys.modules.get(name))
        # Don't overwrite modules that already exist (from other test modules)
        if name in sys.modules:
            return
        _stub_module(name, attrs, is_package)

    try:
        # Stub entire dependency chains as packages / modules
        _save_and_stub('langbot_plugin', is_package=True)
        _save_and_stub('langbot_plugin.api', is_package=True)
        _save_and_stub('langbot_plugin.api.entities', is_package=True)
        _save_and_stub('langbot_plugin.api.entities.events', is_package=True)
        _save_and_stub('langbot_plugin.api.entities.events.pipeline_query', {})
        _save_and_stub('langbot_plugin.api.entities.builtin', is_package=True)
        _save_and_stub('langbot_plugin.api.entities.builtin.resource', is_package=True)
        _save_and_stub(
            'langbot_plugin.api.entities.builtin.resource.tool',
            {
                'LLMTool': type('LLMTool', (), {}),
            },
        )
        _save_and_stub('langbot_plugin.api.entities.builtin.provider', is_package=True)
        _save_and_stub('langbot_plugin.api.entities.builtin.provider.message', {})
        _save_and_stub('sqlalchemy', {'select': Mock()})
        _save_and_stub('httpx', {'AsyncClient': Mock()})
        _save_and_stub('mcp', {'ClientSession': Mock, 'StdioServerParameters': Mock}, is_package=True)
        _save_and_stub('mcp.client', is_package=True)
        _save_and_stub('mcp.client.stdio', {'stdio_client': Mock()})
        _save_and_stub('mcp.client.sse', {'sse_client': Mock()})
        _save_and_stub('mcp.client.streamable_http', {'streamable_http_client': Mock()})
        _save_and_stub('mcp.client.websocket', {'websocket_client': Mock()})

        # Stub the provider.tools.loader (source of circular import)
        _save_and_stub('langbot', is_package=True)
        _save_and_stub('langbot.pkg', is_package=True)
        _save_and_stub('langbot.pkg.provider', is_package=True)
        _save_and_stub('langbot.pkg.provider.tools', is_package=True)
        _save_and_stub(
            'langbot.pkg.provider.tools.loader',
            {
                'ToolLoader': type('ToolLoader', (), {'__init__': lambda self, ap: None}),
            },
        )
        _save_and_stub('langbot.pkg.provider.tools.loaders', is_package=True)
        _save_and_stub('langbot.pkg.core', is_package=True)
        _save_and_stub('langbot.pkg.core.app', {'Application': type('Application', (), {})})
        _save_and_stub('langbot.pkg.entity', is_package=True)
        _save_and_stub('langbot.pkg.entity.persistence', is_package=True)
        _save_and_stub('langbot.pkg.entity.persistence.mcp', {})

        # box models
        import enum as _enum

        class _BPS(str, _enum.Enum):
            RUNNING = 'running'
            EXITED = 'exited'

        _save_and_stub('langbot_plugin.box', is_package=True)
        _save_and_stub('langbot_plugin.box.models', {'BoxManagedProcessStatus': _BPS})

        # Now load mcp.py via spec_from_file_location
        mcp_path = os.path.join(
            os.path.dirname(__file__),
            '..',
            '..',
            '..',
            'src',
            'langbot',
            'pkg',
            'provider',
            'tools',
            'loaders',
            'mcp.py',
        )
        mcp_path = os.path.normpath(mcp_path)
        spec = importlib.util.spec_from_file_location(mod_fqn, mcp_path)
        mod = importlib.util.module_from_spec(spec)
        # Register before executing so the module's own relative imports resolve.
        sys.modules[mod_fqn] = mod
        spec.loader.exec_module(mod)

        yield mod
    finally:
        # Cleanup: runs after the last test in this module AND when setup fails.
        sys.modules.pop(mod_fqn, None)
        if previous_mcp is not None:
            sys.modules[mod_fqn] = previous_mcp
        for name in reversed(list(saved)):
            if saved[name] is None:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = saved[name]


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _make_ap():
    """Return a Mock application exposing ``logger`` and ``box_service`` mocks."""
    ap = Mock()
    ap.logger = Mock()
    ap.box_service = Mock()
    return ap


def _server_config(**overrides) -> dict:
    """Return the baseline server config shared by most tests.

    Keyword arguments replace existing keys in place or are appended as new
    keys (for example ``box={...}`` or ``env={...}``).
    """
    config = {
        'name': 'test',
        'uuid': 'u1',
        'mode': 'sse',
        'command': 'python',
        'args': [],
    }
    config.update(overrides)
    return config


def _make_session(mcp_module, server_config: dict, ap=None):
    """Build an enabled ``RuntimeMCPSession`` for *server_config*.

    A fresh mock application from ``_make_ap`` is used when *ap* is omitted.
    """
    if ap is None:
        ap = _make_ap()
    return mcp_module.RuntimeMCPSession(
        server_name=server_config.get('name', 'test-server'),
        server_config=server_config,
        enable=True,
        ap=ap,
    )


# ── MCPServerBoxConfig ──────────────────────────────────────────────


class TestMCPServerBoxConfig:
    """Validation behaviour of the ``MCPServerBoxConfig`` pydantic model."""

    def test_default_values(self, mcp_module):
        cfg = mcp_module.MCPServerBoxConfig.model_validate({})
        assert cfg.image is None
        assert cfg.network == 'on'
        assert cfg.host_path is None
        assert cfg.host_path_mode == 'ro'
        assert cfg.env == {}
        assert cfg.startup_timeout_sec == 120
        assert cfg.cpus is None
        assert cfg.memory_mb is None
        assert cfg.pids_limit is None
        assert cfg.read_only_rootfs is None

    def test_custom_values(self, mcp_module):
        cfg = mcp_module.MCPServerBoxConfig.model_validate(
            {
                'image': 'node:20',
                'network': 'on',
                'host_path': '/home/user/mcp',
                'host_path_mode': 'rw',
                'env': {'FOO': 'bar'},
                'startup_timeout_sec': 60,
                'cpus': 2.0,
                'memory_mb': 1024,
                'pids_limit': 256,
                'read_only_rootfs': False,
            }
        )
        # Every supplied field must round-trip, not just a sample of them.
        assert cfg.image == 'node:20'
        assert cfg.network == 'on'
        assert cfg.host_path == '/home/user/mcp'
        assert cfg.host_path_mode == 'rw'
        assert cfg.env == {'FOO': 'bar'}
        assert cfg.startup_timeout_sec == 60
        assert cfg.cpus == 2.0
        assert cfg.memory_mb == 1024
        assert cfg.pids_limit == 256
        assert cfg.read_only_rootfs is False

    def test_extra_fields_ignored(self, mcp_module):
        cfg = mcp_module.MCPServerBoxConfig.model_validate(
            {
                'image': 'node:20',
                'unknown_field': 'whatever',
            }
        )
        assert cfg.image == 'node:20'
        assert not hasattr(cfg, 'unknown_field')


# ── Path Rewriting ──────────────────────────────────────────────────


class TestRewritePath:
    """``RuntimeMCPSession._rewrite_path(path, host_path)`` expectations."""

    @pytest.fixture
    def session(self, mcp_module):
        """A session built from the baseline config; the tests pass paths explicitly."""
        return _make_session(mcp_module, _server_config())

    def test_no_host_path_returns_unchanged(self, session):
        assert session._rewrite_path('/some/path', None) == '/some/path'

    def test_empty_path_returns_empty(self, session):
        assert session._rewrite_path('', '/home/user/mcp') == ''

    def test_prefix_match_rewrites(self, session):
        result = session._rewrite_path('/home/user/mcp/server.py', '/home/user/mcp')
        assert result == '/workspace/server.py'

    def test_exact_match_rewrites_to_workspace(self, session):
        result = session._rewrite_path('/home/user/mcp', '/home/user/mcp')
        assert result == '/workspace'

    def test_non_matching_path_unchanged(self, session):
        result = session._rewrite_path('/opt/other/server.py', '/home/user/mcp')
        assert result == '/opt/other/server.py'

    def test_similar_prefix_not_rewritten(self, session):
        # '/home/user/mcp-other' shares a string prefix with the host path but
        # is a different directory, so it must be left alone.
        result = session._rewrite_path('/home/user/mcp-other/file.py', '/home/user/mcp')
        assert result == '/home/user/mcp-other/file.py'

    def test_nested_subpath_rewrites(self, session):
        result = session._rewrite_path('/home/user/mcp/src/lib/main.py', '/home/user/mcp')
        assert result == '/workspace/src/lib/main.py'


# ── host_path Inference ─────────────────────────────────────────────


class TestInferHostPath:
    """``RuntimeMCPSession._infer_host_path()`` expectations."""

    def test_no_absolute_paths_returns_none(self, mcp_module):
        s = _make_session(mcp_module, _server_config(args=['server.py']))
        assert s._infer_host_path() is None

    def test_nonexistent_path_returns_none(self, mcp_module):
        s = _make_session(mcp_module, _server_config(command='/nonexistent/path/to/python'))
        assert s._infer_host_path() is None

    def test_existing_absolute_path_infers_directory(self, mcp_module):
        # The temporary file must still exist while the inference runs, so the
        # call and the assertions stay inside the ``with`` block.
        with tempfile.NamedTemporaryFile(suffix='.py') as f:
            s = _make_session(mcp_module, _server_config(args=[f.name]))
            result = s._infer_host_path()
            assert result is not None
            assert result == os.path.dirname(os.path.realpath(f.name))


# ── Build Box Session Payload ───────────────────────────────────────


class TestBuildBoxSessionPayload:
    """``RuntimeMCPSession._build_box_session_payload(session_id)`` expectations."""

    def test_minimal_config(self, mcp_module):
        s = _make_session(mcp_module, _server_config())
        payload = s._build_box_session_payload('session-123')
        assert payload['session_id'] == 'session-123'
        assert payload['workdir'] == '/workspace'
        assert payload['env'] == {}
        assert 'host_path' not in payload

    def test_with_host_path(self, mcp_module):
        s = _make_session(
            mcp_module,
            _server_config(box={'host_path': '/home/user/mcp', 'host_path_mode': 'ro'}),
        )
        payload = s._build_box_session_payload('session-123')
        assert payload['host_path'] == '/home/user/mcp'
        assert payload['host_path_mode'] == 'ro'

    def test_optional_fields_included_when_set(self, mcp_module):
        s = _make_session(
            mcp_module,
            _server_config(box={'image': 'node:20', 'cpus': 2.0, 'memory_mb': 1024, 'pids_limit': 256}),
        )
        payload = s._build_box_session_payload('session-123')
        assert payload['image'] == 'node:20'
        assert payload['cpus'] == 2.0
        assert payload['memory_mb'] == 1024
        assert payload['pids_limit'] == 256

    def test_none_fields_excluded(self, mcp_module):
        s = _make_session(mcp_module, _server_config())
        payload = s._build_box_session_payload('session-123')
        assert 'image' not in payload
        assert 'cpus' not in payload


# ── Build Box Process Payload ───────────────────────────────────────


class TestBuildBoxProcessPayload:
    """``RuntimeMCPSession._build_box_process_payload()`` expectations."""

    def test_basic_payload(self, mcp_module):
        s = _make_session(mcp_module, _server_config(args=['server.py'], env={'KEY': 'val'}))
        payload = s._build_box_process_payload()
        assert payload['command'] == 'python'
        assert payload['args'] == ['server.py']
        assert payload['env'] == {'KEY': 'val'}
        assert payload['cwd'] == '/workspace'

    def test_path_rewriting_applied(self, mcp_module):
        s = _make_session(
            mcp_module,
            _server_config(
                command='/home/user/mcp/venv/bin/python',
                args=['/home/user/mcp/server.py', '--config', '/home/user/mcp/config.json'],
                env={},
                box={'host_path': '/home/user/mcp'},
            ),
        )
        payload = s._build_box_process_payload()
        # venv python is replaced with plain 'python' (deps installed in-container)
        assert payload['command'] == 'python'
        assert payload['args'] == ['/workspace/server.py', '--config', '/workspace/config.json']

    def test_non_matching_args_not_rewritten(self, mcp_module):
        s = _make_session(
            mcp_module,
            _server_config(
                args=['/opt/other/server.py', '--flag'],
                env={},
                box={'host_path': '/home/user/mcp'},
            ),
        )
        payload = s._build_box_process_payload()
        assert payload['command'] == 'python'
        assert payload['args'] == ['/opt/other/server.py', '--flag']


# ── get_runtime_info_dict ───────────────────────────────────────────


class TestGetRuntimeInfoDict:
    """``RuntimeMCPSession.get_runtime_info_dict()`` expectations."""

    def test_non_stdio_session(self, mcp_module):
        s = _make_session(mcp_module, _server_config(uuid='test-uuid'))
        info = s.get_runtime_info_dict()
        assert info['status'] == 'connecting'
        assert 'box_session_id' not in info

    def test_stdio_session_includes_box_info(self, mcp_module):
        ap = _make_ap()
        ap.box_service.available = True
        s = _make_session(mcp_module, _server_config(uuid='test-uuid', mode='stdio'), ap=ap)
        info = s.get_runtime_info_dict()
        assert info['box_session_id'] == 'mcp-test-uuid'
        assert info['box_enabled'] is True

    def test_stdio_session_without_box_runtime(self, mcp_module):
        ap = _make_ap()
        ap.box_service.available = False
        s = _make_session(mcp_module, _server_config(uuid='test-uuid', mode='stdio'), ap=ap)
        info = s.get_runtime_info_dict()
        assert 'box_session_id' not in info


# ── Box config parsing ──────────────────────────────────────────────


class TestBoxConfigParsing:
    """``RuntimeMCPSession.box_config`` is parsed from the ``box`` config key."""

    def test_box_config_parsed_from_server_config(self, mcp_module):
        s = _make_session(
            mcp_module,
            _server_config(box={'image': 'node:20', 'host_path': '/home/user/mcp'}),
        )
        assert isinstance(s.box_config, mcp_module.MCPServerBoxConfig)
        assert s.box_config.image == 'node:20'
        assert s.box_config.host_path == '/home/user/mcp'

    def test_missing_box_key_uses_defaults(self, mcp_module):
        s = _make_session(mcp_module, _server_config())
        assert isinstance(s.box_config, mcp_module.MCPServerBoxConfig)
        assert s.box_config.image is None
        assert s.box_config.host_path_mode == 'ro'


@pytest.mark.asyncio
async def test_init_box_stdio_server_keeps_host_mount_validation_enabled(mcp_module, monkeypatch):
    """Box stdio init must not pass a truthy ``skip_host_mount_validation``.

    The session is driven against a fully mocked box service; the test then
    checks the keyword arguments given to ``create_session`` and ``build_spec``.
    """

    class FakeClientSession:
        """Minimal async-context stand-in for ``mcp.ClientSession``."""

        def __init__(self, *_args):
            pass

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return False

        async def initialize(self):
            return None

    @asynccontextmanager
    async def fake_websocket_client(_url: str):
        yield ('read-stream', 'write-stream')

    # ``mcp_module`` is module-scoped: patch through monkeypatch so the fakes
    # are removed again when this test finishes instead of leaking onward.
    monkeypatch.setattr(mcp_module, 'ClientSession', FakeClientSession)
    monkeypatch.setattr(mcp_module, 'websocket_client', fake_websocket_client)

    ap = _make_ap()
    ap.box_service.available = True
    ap.box_service.create_session = AsyncMock(return_value={})
    ap.box_service.build_spec = Mock(return_value='validated-spec')
    ap.box_service.client = SimpleNamespace(
        execute=AsyncMock(return_value=SimpleNamespace(ok=True, stderr='', exit_code=0))
    )
    ap.box_service.start_managed_process = AsyncMock(return_value={})
    ap.box_service.get_managed_process_websocket_url = Mock(return_value='ws://box.example/process')

    session = _make_session(
        mcp_module,
        _server_config(
            mode='stdio',
            command='/home/user/mcp/.venv/bin/python',
            args=['/home/user/mcp/server.py'],
            box={'host_path': '/home/user/mcp'},
        ),
        ap=ap,
    )
    session._detect_install_command = Mock(return_value='pip install --no-cache-dir -r /workspace/requirements.txt')

    await session._init_box_stdio_server()
    await session.exit_stack.aclose()

    assert ap.box_service.create_session.await_count == 1
    assert ap.box_service.create_session.await_args.kwargs.get('skip_host_mount_validation', False) is False
    assert ap.box_service.build_spec.call_count == 1
    assert ap.box_service.build_spec.call_args.kwargs.get('skip_host_mount_validation', False) is False