diff --git a/src/langbot/pkg/provider/tools/loaders/mcp.py b/src/langbot/pkg/provider/tools/loaders/mcp.py index 8a3bbdc4..46d63b84 100644 --- a/src/langbot/pkg/provider/tools/loaders/mcp.py +++ b/src/langbot/pkg/provider/tools/loaders/mcp.py @@ -1,7 +1,6 @@ from __future__ import annotations import enum -import os import typing from contextlib import AsyncExitStack import traceback @@ -10,13 +9,11 @@ import sqlalchemy import asyncio import httpx -import pydantic import uuid as uuid_module from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from mcp.client.sse import sse_client from mcp.client.streamable_http import streamable_http_client -from mcp.client.websocket import websocket_client from .. import loader from ....core import app @@ -31,39 +28,6 @@ class MCPSessionStatus(enum.Enum): ERROR = 'error' -class MCPSessionErrorPhase(enum.Enum): - """Which phase of the MCP lifecycle failed.""" - - SESSION_CREATE = 'session_create' - DEP_INSTALL = 'dep_install' - PROCESS_START = 'process_start' - RELAY_CONNECT = 'relay_connect' - MCP_INIT = 'mcp_init' - RUNTIME = 'runtime' - TOOL_CALL = 'tool_call' - - -_VENV_DIRS = frozenset({'.venv', 'venv', 'env', '.env'}) -_VENV_BIN_DIRS = frozenset({'bin', 'Scripts'}) - - -class MCPServerBoxConfig(pydantic.BaseModel): - """Structured configuration for running an MCP server inside a Box container.""" - - image: str | None = None - network: str = 'on' # MCP servers need network for dependency installation - host_path: str | None = None - host_path_mode: str = 'ro' # MCP servers default to read-only mount - env: dict[str, str] = pydantic.Field(default_factory=dict) - startup_timeout_sec: int = 120 # Longer default to allow pip install - cpus: float | None = None - memory_mb: int | None = None - pids_limit: int | None = None - read_only_rootfs: bool | None = None - - model_config = pydantic.ConfigDict(extra='ignore') - - class RuntimeMCPSession: """运行时 MCP 会话""" @@ -94,10 +58,6 @@ class RuntimeMCPSession: error_message: str | None = None - error_phase: MCPSessionErrorPhase | None = None - - retry_count: int = 0 - def __init__(self, server_name: str, server_config: dict, enable: bool, ap: app.Application): self.server_name = server_name self.server_uuid = server_config.get('uuid', '') @@ -115,14 +75,7 @@ class RuntimeMCPSession: self._shutdown_event = asyncio.Event() self._ready_event = asyncio.Event() - # Parse box config once - self.box_config = MCPServerBoxConfig.model_validate(server_config.get('box', {})) - async def _init_stdio_python_server(self): - if self._uses_box_stdio(): - await self._init_box_stdio_server() - return - server_params = StdioServerParameters( command=self.server_config['command'], args=self.server_config['args'], @@ -137,68 +90,6 @@ class RuntimeMCPSession: await self.session.initialize() - async def _init_box_stdio_server(self): - box_service = self.ap.box_service - session_id = self._build_box_session_id() - host_path = self._resolve_host_path() - session_payload = self._build_box_session_payload(session_id, host_path) - - # Phase: session creation - try: - await box_service.create_session( - session_payload, - ) - except Exception: - self.error_phase = MCPSessionErrorPhase.SESSION_CREATE - raise - - # Phase: dependency installation - if host_path: - install_cmd = self._detect_install_command(host_path) - if install_cmd: - self.ap.logger.info( - f'MCP server {self.server_name}: installing dependencies in Box with: {install_cmd}' - ) - exec_payload = dict(session_payload) - exec_payload['cmd'] = install_cmd - exec_payload['timeout_sec'] = self.box_config.startup_timeout_sec or 120 - try: - result = await box_service.client.execute(box_service.build_spec(exec_payload)) - except Exception: - self.error_phase = MCPSessionErrorPhase.DEP_INSTALL - raise - if not result.ok: - self.error_phase = MCPSessionErrorPhase.DEP_INSTALL - stderr_preview = (result.stderr or '')[:500] - raise Exception(f'Dependency install failed (exit code {result.exit_code}): {stderr_preview}') - - # Phase: managed process start - try: - await box_service.start_managed_process( - session_id, - self._build_box_process_payload(host_path), - ) - except Exception: - self.error_phase = MCPSessionErrorPhase.PROCESS_START - raise - - # Phase: WebSocket relay connection - try: - websocket_url = box_service.get_managed_process_websocket_url(session_id) - transport = await self.exit_stack.enter_async_context(websocket_client(websocket_url)) - read_stream, write_stream = transport - self.session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream)) - except Exception: - self.error_phase = MCPSessionErrorPhase.RELAY_CONNECT - raise - - # Phase: MCP protocol initialization - try: - await self.session.initialize() - except Exception: - self.error_phase = MCPSessionErrorPhase.MCP_INIT - raise - async def _init_sse_server(self): sse_transport = await self.exit_stack.enter_async_context( sse_client( @@ -233,11 +124,8 @@ class RuntimeMCPSession: await self.session.initialize() - _MAX_RETRIES = 3 - _RETRY_DELAYS = [2, 4, 8] - async def _lifecycle_loop(self): - """Manage the full MCP session lifecycle in a background task.""" + """在后台任务中管理整个MCP会话的生命周期""" try: if self.server_config['mode'] == 'stdio': await self._init_stdio_python_server() @@ -246,125 +134,49 @@ class RuntimeMCPSession: elif self.server_config['mode'] == 'http': await self._init_streamable_http_server() else: - raise ValueError(f'Unknown MCP server mode: {self.server_name}: {self.server_config}') + raise ValueError(f'无法识别 MCP 服务器类型: {self.server_name}: {self.server_config}') await self.refresh() self.status = MCPSessionStatus.CONNECTED - # Notify start() that connection is established + # 通知start()方法连接已建立 self._ready_event.set() - # Wait for shutdown signal, with optional health monitoring for Box stdio - if self._uses_box_stdio(): - monitor_task = asyncio.create_task(self._monitor_box_process_health()) - shutdown_task = asyncio.create_task(self._shutdown_event.wait()) - done, pending = await asyncio.wait( - [shutdown_task, monitor_task], - return_when=asyncio.FIRST_COMPLETED, - ) - for task in pending: - task.cancel() - for task in done: - if task is monitor_task and not self._shutdown_event.is_set(): - self.error_phase = MCPSessionErrorPhase.RUNTIME - raise Exception('Box managed process exited unexpectedly') - else: - await self._shutdown_event.wait() + # 等待shutdown信号 + await self._shutdown_event.wait() except Exception as e: self.status = MCPSessionStatus.ERROR self.error_message = str(e) self.ap.logger.error(f'Error in MCP session lifecycle {self.server_name}: {e}\n{traceback.format_exc()}') - # Do NOT set _ready_event here — let _lifecycle_loop_with_retry - # handle retries first. It will set the event when all retries - # are exhausted or on success. - raise # Re-raise so _lifecycle_loop_with_retry can catch it + # 即使出错也要设置ready事件,让start()方法知道初始化已完成 + self._ready_event.set() finally: - # Clean up all resources in the same task + # 在同一个任务中清理所有资源 try: if self.exit_stack: await self.exit_stack.aclose() - self.exit_stack = AsyncExitStack() self.functions.clear() self.session = None except Exception as e: self.ap.logger.error(f'Error cleaning up MCP session {self.server_name}: {e}\n{traceback.format_exc()}') - finally: - await self._cleanup_box_stdio_session() - - async def _lifecycle_loop_with_retry(self): - """Wrap _lifecycle_loop with retry and exponential backoff.""" - for attempt in range(self._MAX_RETRIES + 1): - try: - await self._lifecycle_loop() - return # Normal shutdown, don't retry - except Exception as e: - self.retry_count = attempt + 1 - if self._shutdown_event.is_set(): - return # Shutdown requested, don't retry - if attempt >= self._MAX_RETRIES: - self.status = MCPSessionStatus.ERROR - self.error_message = f'Failed after {self._MAX_RETRIES + 1} attempts: {e}' - self._ready_event.set() - return - delay = self._RETRY_DELAYS[attempt] - self.ap.logger.warning( - f'MCP session {self.server_name} failed (attempt {attempt + 1}), retrying in {delay}s: {e}' - ) - await self._cleanup_box_stdio_session() - # Reset status for retry - self.status = MCPSessionStatus.CONNECTING - self.error_message = None - self.error_phase = None - await asyncio.sleep(delay) - - _MONITOR_POLL_INTERVAL = 5 - _MONITOR_MAX_CONSECUTIVE_ERRORS = 3 - - async def _monitor_box_process_health(self): - """Poll managed process status; return when process exits.""" - from langbot_plugin.box.models import BoxManagedProcessStatus - - session_id = self._build_box_session_id() - consecutive_errors = 0 - while not self._shutdown_event.is_set(): - try: - info = await self.ap.box_service.client.get_managed_process(session_id) - if isinstance(info, dict): - status = info.get('status', '') - else: - status = getattr(info, 'status', '') - if status == BoxManagedProcessStatus.EXITED.value or status == BoxManagedProcessStatus.EXITED: - return - consecutive_errors = 0 - except Exception as exc: - consecutive_errors += 1 - self.ap.logger.warning( - f'MCP monitor for {self.server_name}: get_managed_process failed ' - f'({consecutive_errors}/{self._MONITOR_MAX_CONSECUTIVE_ERRORS}): ' - f'{type(exc).__name__}: {exc}' - ) - if consecutive_errors >= self._MONITOR_MAX_CONSECUTIVE_ERRORS: - return - await asyncio.sleep(self._MONITOR_POLL_INTERVAL) async def start(self): if not self.enable: return - # Create background task for lifecycle management with retry - self._lifecycle_task = asyncio.create_task(self._lifecycle_loop_with_retry()) + # 创建后台任务来管理生命周期 + self._lifecycle_task = asyncio.create_task(self._lifecycle_loop()) - # Wait for connection or failure (with timeout) - startup_timeout = self.box_config.startup_timeout_sec if self._uses_box_stdio() else 30.0 + # 等待连接建立或失败(带超时) try: - await asyncio.wait_for(self._ready_event.wait(), timeout=startup_timeout) + await asyncio.wait_for(self._ready_event.wait(), timeout=30.0) except asyncio.TimeoutError: self.status = MCPSessionStatus.ERROR - raise Exception(f'Connection timeout after {startup_timeout} seconds') + raise Exception('Connection timeout after 30 seconds') - # Check for errors + # 检查是否有错误 if self.status == MCPSessionStatus.ERROR: raise Exception('Connection failed, please check URL') @@ -420,11 +232,9 @@ class RuntimeMCPSession: return self.functions def get_runtime_info_dict(self) -> dict: - info = { + return { 'status': self.status.value, 'error_message': self.error_message, - 'error_phase': self.error_phase.value if self.error_phase else None, - 'retry_count': self.retry_count, 'tool_count': len(self.get_tools()), 'tools': [ { @@ -434,10 +244,6 @@ class RuntimeMCPSession: for tool in self.get_tools() ], } - if self._uses_box_stdio(): - info['box_session_id'] = self._build_box_session_id() - info['box_enabled'] = True - return info async def shutdown(self): """关闭会话并清理资源""" @@ -461,182 +267,6 @@ class RuntimeMCPSession: except Exception as e: self.ap.logger.error(f'Error shutting down MCP session {self.server_name}: {e}\n{traceback.format_exc()}') - def _uses_box_stdio(self) -> bool: - """Check whether this stdio MCP server should run inside a Box container. - - Returns True when mode is stdio AND the Box runtime is available. - An explicit ``box`` key in server_config is NOT required — if the - runtime is reachable, stdio servers default to Box isolation. - """ - if self.server_config.get('mode') != 'stdio': - return False - try: - return getattr(self.ap.box_service, 'available', False) - except Exception: - return False - - def _build_box_session_id(self) -> str: - return f'mcp-{self.server_uuid}' - - def _rewrite_path(self, path: str, host_path: str | None) -> str: - """Rewrite host path prefix to container /workspace prefix.""" - if not host_path or not path: - return path - normalized_host = os.path.realpath(host_path) - if path.startswith(normalized_host + '/'): - return '/workspace' + path[len(normalized_host) :] - if path == normalized_host: - return '/workspace' - return path - - def _infer_host_path(self) -> str | None: - """Try to infer host_path from command and args absolute paths. - - Detects virtualenv patterns (e.g. .venv/bin/python) and walks up - to the project root rather than using the bin directory. - """ - candidates = [] - parts = [self.server_config.get('command', '')] + self.server_config.get('args', []) - for part in parts: - if not os.path.isabs(part): - continue - # Use the raw path for venv detection (before resolving symlinks) - # because .venv/bin/python is often a symlink to the system python. - if os.path.exists(part): - directory = os.path.dirname(part) - directory = self._unwrap_venv_path(directory) - candidates.append(os.path.realpath(directory)) - if not candidates: - return None - common = os.path.commonpath(candidates) - return common if common != '/' else None - - @staticmethod - def _unwrap_venv_path(directory: str) -> str: - """If directory looks like a virtualenv bin dir, return the project root. - - Recognized patterns: - /project/.venv/bin -> /project - /project/venv/bin -> /project - /project/.venv/Scripts -> /project (Windows) - /project/env/bin -> /project - """ - parts = directory.replace('\\', '/').split('/') - # Look for patterns like .../(.venv|venv|env)/(bin|Scripts) - for i in range(len(parts) - 1, 0, -1): - if parts[i] in _VENV_BIN_DIRS and i >= 1: - venv_dir = parts[i - 1] - if venv_dir in _VENV_DIRS: - # Return everything before the venv directory - project_root = '/'.join(parts[: i - 1]) - return project_root if project_root else '/' - return directory - - def _resolve_host_path(self) -> str | None: - """Resolve the effective host_path: explicit config > inference.""" - return self.box_config.host_path or self._infer_host_path() - - @staticmethod - def _detect_install_command(host_path: str) -> str | None: - """Detect how to install dependencies from the mounted project. - - Copies the project to a writable temp directory before installing, - because /workspace may be mounted read-only and pip needs to write - build artifacts in the source tree. - """ - # Use /opt instead of /tmp — /tmp is often a small tmpfs (64 MB) - # and cannot hold the copied source tree plus pip build artifacts. - _COPY_AND_INSTALL = ( - 'mkdir -p /opt/_mcp_src' - ' && tar -C /workspace' - ' --exclude=.venv --exclude=.git --exclude=__pycache__' - ' --exclude=node_modules --exclude=.tox --exclude=.nox' - ' --exclude="*.egg-info" --exclude=.uv-cache' - ' -cf - .' - ' | tar -C /opt/_mcp_src -xf -' - ' && pip install --no-cache-dir /opt/_mcp_src' - ' && rm -rf /opt/_mcp_src' - ) - _INSTALL_REQUIREMENTS = 'pip install --no-cache-dir -r /workspace/requirements.txt' - - if os.path.isfile(os.path.join(host_path, 'pyproject.toml')): - return _COPY_AND_INSTALL - if os.path.isfile(os.path.join(host_path, 'setup.py')): - return _COPY_AND_INSTALL - if os.path.isfile(os.path.join(host_path, 'requirements.txt')): - return _INSTALL_REQUIREMENTS - return None - - def _build_box_session_payload(self, session_id: str, host_path: str | None = None) -> dict: - bc = self.box_config - if host_path is None: - host_path = self._resolve_host_path() - - payload: dict[str, typing.Any] = { - 'session_id': session_id, - 'workdir': '/workspace', - 'env': bc.env, - # MCP sessions need network for dependency install and writable rootfs - 'network': bc.network, - 'read_only_rootfs': bc.read_only_rootfs if bc.read_only_rootfs is not None else False, - } - if host_path: - payload['host_path'] = host_path - payload['host_path_mode'] = bc.host_path_mode - for key in ('image', 'cpus', 'memory_mb', 'pids_limit'): - val = getattr(bc, key) - if val is not None: - payload[key] = val if not isinstance(val, enum.Enum) else val.value - return payload - - def _build_box_process_payload(self, host_path: str | None = None) -> dict: - if host_path is None: - host_path = self._resolve_host_path() - - command = self.server_config['command'] - args = self.server_config.get('args', []) - cwd = '/workspace' - - if host_path: - # When host_path is resolved, we install deps in-container rather - # than relying on the host venv. Rewrite paths so the container - # sees /workspace/... but replace venv python with plain "python". - command = self._rewrite_venv_command(command, host_path) - args = [self._rewrite_path(a, host_path) for a in args] - cwd = self._rewrite_path(cwd, host_path) - - return { - 'command': command, - 'args': args, - 'env': self.server_config.get('env', {}), - 'cwd': cwd, - } - - def _rewrite_venv_command(self, command: str, host_path: str) -> str: - """Rewrite command: if it points to a venv python, use plain 'python'.""" - if not host_path or not command: - return command - normalized_host = os.path.realpath(host_path) - if not command.startswith(normalized_host + '/'): - return command - # Check if command is a venv python interpreter - rel = command[len(normalized_host) + 1 :] # e.g. ".venv/bin/python" - parts = rel.replace('\\', '/').split('/') - # Match patterns like .venv/bin/python*, venv/bin/python*, etc. - if len(parts) >= 3 and parts[0] in _VENV_DIRS and parts[1] in _VENV_BIN_DIRS and parts[2].startswith('python'): - return 'python' - # Not a venv python — do normal path rewrite - return self._rewrite_path(command, host_path) - - async def _cleanup_box_stdio_session(self) -> None: - if not self._uses_box_stdio(): - return - - try: - await self.ap.box_service.client.delete_session(self._build_box_session_id()) - except Exception as e: - self.ap.logger.warning(f'Failed to cleanup Box session for MCP server {self.server_name}: {e}') - # @loader.loader_class('mcp') class MCPLoader(loader.ToolLoader): @@ -702,7 +332,7 @@ class MCPLoader(loader.ToolLoader): Args: server_config: 服务器配置字典,必须包含: - name: 服务器名称 - - mode: 连接模式 (stdio/sse/http) + - mode: 连接模式 (stdio/sse) - enable: 是否启用 - extra_args: 额外的配置参数 (可选) """ @@ -801,13 +431,12 @@ class MCPLoader(loader.ToolLoader): """获取所有服务器的信息""" info = {} for server_name, session in self.sessions.items(): - tools = session.get_tools() info[server_name] = { 'name': server_name, 'mode': session.server_config.get('mode'), 'enable': session.enable, - 'tools_count': len(tools), - 'tool_names': [f.name for f in tools], + 'tools_count': len(session.get_tools()), + 'tool_names': [f.name for f in session.get_tools()], } return info diff --git a/tests/integration_tests/box/test_box_mcp_integration.py b/tests/integration_tests/box/test_box_mcp_integration.py deleted file mode 100644 index 6140a3c7..00000000 --- a/tests/integration_tests/box/test_box_mcp_integration.py +++ /dev/null @@ -1,361 +0,0 @@ -"""Integration tests for Box MCP-related features. - -These tests verify managed process lifecycle, WebSocket stdio attach, -session cleanup, and the single-session query API using a real container -runtime. - -CI only runs ``tests/unit_tests/``, so these tests never execute in the -CI pipeline. Run them locally with:: - - pytest tests/integration_tests/box/test_box_mcp_integration.py -v -""" - -from __future__ import annotations - -import asyncio -import logging -import shutil -import socket -import subprocess - -import aiohttp -import pytest -from aiohttp.test_utils import TestServer - -from langbot_plugin.box.client import ActionRPCBoxClient -from langbot_plugin.box.errors import BoxSessionNotFoundError -from langbot_plugin.box.models import BoxManagedProcessSpec, BoxManagedProcessStatus, BoxSpec -from langbot_plugin.box.runtime import BoxRuntime -from langbot_plugin.box.server import BoxServerHandler, create_ws_relay_app - -_logger = logging.getLogger('test.box.mcp_integration') - -_TEST_IMAGE = 'alpine:latest' - - -# ── Skip helpers ────────────────────────────────────────────────────── - - -def _has_container_runtime() -> bool: - for cmd in ('podman', 'docker'): - if shutil.which(cmd) is None: - continue - try: - result = subprocess.run([cmd, 'info'], capture_output=True, timeout=10) - if result.returncode == 0: - return True - except Exception: - continue - return False - - -def _can_open_test_socket() -> bool: - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - except OSError: - return False - sock.close() - return True - - -requires_container = pytest.mark.skipif( - not _has_container_runtime(), - reason='no container runtime (podman/docker) available', -) - -requires_socket = pytest.mark.skipif( - not _can_open_test_socket(), - reason='local test environment does not permit opening TCP sockets', -) - - -# ── Helpers ────────────────────────────────────────────────────────── - - -class _QueueConnection: - """In-process Connection backed by asyncio Queues — no real IO.""" - - def __init__(self, rx: asyncio.Queue[str], tx: asyncio.Queue[str]): - self._rx = rx - self._tx = tx - - async def send(self, message: str) -> None: - await self._tx.put(message) - - async def receive(self) -> str: - return await self._rx.get() - - async def close(self) -> None: - pass - - -async def _make_rpc_pair(runtime: BoxRuntime): - """Create an in-process RPC pair connected via queues.""" - from langbot_plugin.runtime.io.handler import Handler - - c2s: asyncio.Queue[str] = asyncio.Queue() - s2c: asyncio.Queue[str] = asyncio.Queue() - client_conn = _QueueConnection(rx=s2c, tx=c2s) - server_conn = _QueueConnection(rx=c2s, tx=s2c) - - server_handler = BoxServerHandler(server_conn, runtime) - server_task = asyncio.create_task(server_handler.run()) - - client_handler = Handler.__new__(Handler) - Handler.__init__(client_handler, client_conn) - client_task = asyncio.create_task(client_handler.run()) - - client = ActionRPCBoxClient(logger=_logger) - client.set_handler(client_handler) - - return client, server_task, client_task - - -# ── Fixtures ────────────────────────────────────────────────────────── - - -@pytest.fixture -async def box_server(): - """Yield a (ws_relay_url, ActionRPCBoxClient) backed by a real BoxRuntime.""" - runtime = BoxRuntime(logger=_logger) - await runtime.initialize() - - # Start ws relay for managed process attach - ws_app = create_ws_relay_app(runtime) - ws_server = TestServer(ws_app) - await ws_server.start_server() - - client, server_task, client_task = await _make_rpc_pair(runtime) - - ws_relay_url = str(ws_server.make_url('')) - yield ws_relay_url, client - - server_task.cancel() - client_task.cancel() - await runtime.shutdown() - await ws_server.close() - - -# ── 1. Managed process lifecycle ───────────────────────────────────── - - -@requires_container -@requires_socket -@pytest.mark.asyncio -async def test_managed_process_start_and_query(box_server): - """Start a managed process and query its status.""" - ws_relay_url, client = box_server - - # Create session - spec = BoxSpec( - cmd='', - session_id='mcp-int-lifecycle', - workdir='/tmp', - image=_TEST_IMAGE, - ) - await client.create_session(spec) - - # Start a managed process that stays alive - proc_spec = BoxManagedProcessSpec( - command='sh', - args=['-c', 'while true; do sleep 1; done'], - cwd='/tmp', - ) - info = await client.start_managed_process('mcp-int-lifecycle', proc_spec) - assert info.status == BoxManagedProcessStatus.RUNNING - - # Query it - info2 = await client.get_managed_process('mcp-int-lifecycle') - assert info2.status == BoxManagedProcessStatus.RUNNING - assert info2.command == 'sh' - - # Cleanup - await client.delete_session('mcp-int-lifecycle') - - -# ── 2. WebSocket stdio attach ──────────────────────────────────────── - - -@requires_container -@requires_socket -@pytest.mark.asyncio -async def test_ws_stdio_attach_echo(box_server): - """Attach to a managed process via WebSocket and verify bidirectional IO.""" - ws_relay_url, client = box_server - - spec = BoxSpec( - cmd='', - session_id='mcp-int-ws', - workdir='/tmp', - image=_TEST_IMAGE, - ) - await client.create_session(spec) - - # Start a cat process (echoes stdin to stdout) - proc_spec = BoxManagedProcessSpec( - command='cat', - args=[], - cwd='/tmp', - ) - await client.start_managed_process('mcp-int-ws', proc_spec) - - # Connect via WebSocket (ws relay) - ws_url = client.get_managed_process_websocket_url('mcp-int-ws', ws_relay_url) - session = aiohttp.ClientSession() - try: - async with session.ws_connect(ws_url) as ws: - # Send a line - await ws.send_str('hello from test') - - # Expect to receive it back (cat echoes) - msg = await asyncio.wait_for(ws.receive(), timeout=5) - assert msg.type == aiohttp.WSMsgType.TEXT - assert 'hello from test' in msg.data - finally: - await session.close() - - await client.delete_session('mcp-int-ws') - - -# ── 3. Session cleanup removes container ───────────────────────────── - - -@requires_container -@requires_socket -@pytest.mark.asyncio -async def test_delete_session_cleans_up(box_server): - """After deleting a session, it should no longer exist.""" - ws_relay_url, client = box_server - - spec = BoxSpec( - cmd='', - session_id='mcp-int-cleanup', - workdir='/tmp', - image=_TEST_IMAGE, - ) - await client.create_session(spec) - - # Start a process - proc_spec = BoxManagedProcessSpec( - command='sleep', - args=['3600'], - cwd='/tmp', - ) - await client.start_managed_process('mcp-int-cleanup', proc_spec) - - # Delete - await client.delete_session('mcp-int-cleanup') - - # Session should be gone - with pytest.raises(BoxSessionNotFoundError): - await client.get_session('mcp-int-cleanup') - - -# ── 4. GET session details ──────────────────────────────────────── - - -@requires_container -@requires_socket -@pytest.mark.asyncio -async def test_get_session_returns_details(box_server): - """Get single session returns session details and managed process info.""" - ws_relay_url, client = box_server - - spec = BoxSpec( - cmd='', - session_id='mcp-int-get', - workdir='/tmp', - image=_TEST_IMAGE, - ) - await client.create_session(spec) - - # Query without managed process - info = await client.get_session('mcp-int-get') - assert info['session_id'] == 'mcp-int-get' - assert info['image'] == _TEST_IMAGE - assert 'managed_process' not in info - - # Start a process and query again - proc_spec = BoxManagedProcessSpec( - command='sleep', - args=['3600'], - cwd='/tmp', - ) - await client.start_managed_process('mcp-int-get', proc_spec) - - info2 = await client.get_session('mcp-int-get') - assert info2['session_id'] == 'mcp-int-get' - assert 'managed_process' in info2 - assert info2['managed_process']['status'] == BoxManagedProcessStatus.RUNNING.value - - await client.delete_session('mcp-int-get') - - -# ── 5. Process exit detected ──────────────────────────────────────── - - -@requires_container -@requires_socket -@pytest.mark.asyncio -async def test_process_exit_detected(box_server): - """When a managed process exits, its status should reflect EXITED.""" - ws_relay_url, client = box_server - - spec = BoxSpec( - cmd='', - session_id='mcp-int-exit', - workdir='/tmp', - image=_TEST_IMAGE, - ) - await client.create_session(spec) - - # Start a process that exits immediately - proc_spec = BoxManagedProcessSpec( - command='sh', - args=['-c', 'echo done && exit 0'], - cwd='/tmp', - ) - await client.start_managed_process('mcp-int-exit', proc_spec) - - # Wait a bit for process to exit - await asyncio.sleep(2) - - info = await client.get_managed_process('mcp-int-exit') - assert info.status == BoxManagedProcessStatus.EXITED - assert info.exit_code == 0 - - await client.delete_session('mcp-int-exit') - - -# ── 6. Instance ID orphan cleanup ─────────────────────────────────── - - -@requires_container -@requires_socket -@pytest.mark.asyncio -async def test_orphan_cleanup_preserves_own_containers(box_server): - """Orphan cleanup should not remove containers belonging to the current instance.""" - ws_relay_url, client = box_server - - # Create a session (container gets current instance ID label) - spec = BoxSpec( - cmd='', - session_id='mcp-int-orphan', - workdir='/tmp', - image=_TEST_IMAGE, - ) - await client.create_session(spec) - - # Verify session exists - sessions = await client.get_sessions() - assert any(s['session_id'] == 'mcp-int-orphan' for s in sessions) - - # Trigger status check (which doesn't clean up own containers) - status = await client.get_status() - assert status['active_sessions'] >= 1 - - # Our session should still exist - sessions = await client.get_sessions() - assert any(s['session_id'] == 'mcp-int-orphan' for s in sessions) - - await client.delete_session('mcp-int-orphan') diff --git a/tests/unit_tests/provider/test_mcp_box_integration.py b/tests/unit_tests/provider/test_mcp_box_integration.py deleted file mode 100644 index f33de781..00000000 --- a/tests/unit_tests/provider/test_mcp_box_integration.py +++ /dev/null @@ -1,635 +0,0 @@ -"""Tests for MCP Box integration: path rewriting, host_path inference, config model, payloads. - -Uses importlib.util.spec_from_file_location to load mcp.py directly without -triggering the circular import chain through the app module. -""" - -from __future__ import annotations - -import importlib -import importlib.util -import os -import sys -import tempfile -import types -from contextlib import asynccontextmanager -from types import SimpleNamespace -from unittest.mock import AsyncMock, Mock - -import pytest - - -# --------------------------------------------------------------------------- -# Load mcp.py directly from file path, with stub dependencies -# --------------------------------------------------------------------------- - - -def _stub_module(fqn: str, attrs: dict | None = None, is_package: bool = False): - """Create or return a stub module and register it in sys.modules.""" - if fqn in sys.modules: - mod = sys.modules[fqn] - else: - mod = types.ModuleType(fqn) - mod.__spec__ = importlib.machinery.ModuleSpec(fqn, None, is_package=is_package) - if is_package: - mod.__path__ = [] - sys.modules[fqn] = mod - parts = fqn.rsplit('.', 1) - if len(parts) == 2 and parts[0] in sys.modules: - setattr(sys.modules[parts[0]], parts[1], mod) - if attrs: - for k, v in attrs.items(): - setattr(mod, k, v) - return mod - - -@pytest.fixture(scope='module', autouse=True) -def mcp_module(): - """Load mcp.py with minimal stubs to avoid circular imports.""" - saved = {} - - def _save_and_stub(name, attrs=None, is_package=False): - saved[name] = sys.modules.get(name) - # Don't overwrite modules that already exist (from other test modules) - if name in sys.modules: - return - _stub_module(name, attrs, is_package) - - # Stub entire dependency chains as packages / modules - _save_and_stub('langbot_plugin', is_package=True) - _save_and_stub('langbot_plugin.api', is_package=True) - _save_and_stub('langbot_plugin.api.entities', is_package=True) - _save_and_stub('langbot_plugin.api.entities.events', is_package=True) - _save_and_stub('langbot_plugin.api.entities.events.pipeline_query', {}) - _save_and_stub('langbot_plugin.api.entities.builtin', is_package=True) - _save_and_stub('langbot_plugin.api.entities.builtin.resource', is_package=True) - _save_and_stub( - 'langbot_plugin.api.entities.builtin.resource.tool', - { - 'LLMTool': type('LLMTool', (), {}), - }, - ) - _save_and_stub('langbot_plugin.api.entities.builtin.provider', is_package=True) - _save_and_stub('langbot_plugin.api.entities.builtin.provider.message', {}) - _save_and_stub('sqlalchemy', {'select': Mock()}) - _save_and_stub('httpx', {'AsyncClient': Mock()}) - _save_and_stub('mcp', {'ClientSession': Mock, 'StdioServerParameters': Mock}, is_package=True) - _save_and_stub('mcp.client', is_package=True) - _save_and_stub('mcp.client.stdio', {'stdio_client': Mock()}) - _save_and_stub('mcp.client.sse', {'sse_client': Mock()}) - _save_and_stub('mcp.client.streamable_http', {'streamable_http_client': Mock()}) - _save_and_stub('mcp.client.websocket', {'websocket_client': Mock()}) - - # Stub the provider.tools.loader (source of circular import) - _save_and_stub('langbot', is_package=True) - _save_and_stub('langbot.pkg', is_package=True) - _save_and_stub('langbot.pkg.provider', is_package=True) - _save_and_stub('langbot.pkg.provider.tools', is_package=True) - _save_and_stub( - 'langbot.pkg.provider.tools.loader', - { - 'ToolLoader': type('ToolLoader', (), {'__init__': lambda self, ap: None}), - }, - ) - _save_and_stub('langbot.pkg.provider.tools.loaders', is_package=True) - _save_and_stub('langbot.pkg.core', is_package=True) - _save_and_stub('langbot.pkg.core.app', {'Application': type('Application', (), {})}) - _save_and_stub('langbot.pkg.entity', is_package=True) - _save_and_stub('langbot.pkg.entity.persistence', is_package=True) - _save_and_stub('langbot.pkg.entity.persistence.mcp', {}) - - # box models - import enum as _enum - - class _BPS(str, _enum.Enum): - RUNNING = 'running' - EXITED = 'exited' - - _save_and_stub('langbot_plugin.box', is_package=True) - _save_and_stub('langbot_plugin.box.models', {'BoxManagedProcessStatus': _BPS}) - - # Now load mcp.py via spec_from_file_location - mod_fqn = 'langbot.pkg.provider.tools.loaders.mcp' - sys.modules.pop(mod_fqn, None) - mcp_path = os.path.join( - os.path.dirname(__file__), - '..', - '..', - '..', - 'src', - 'langbot', - 'pkg', - 'provider', - 'tools', - 'loaders', - 'mcp.py', - ) - mcp_path = os.path.normpath(mcp_path) - spec = importlib.util.spec_from_file_location(mod_fqn, mcp_path) - mod = importlib.util.module_from_spec(spec) - sys.modules[mod_fqn] = mod - spec.loader.exec_module(mod) - - yield mod - - # Cleanup - sys.modules.pop(mod_fqn, None) - for name in reversed(list(saved)): - if saved[name] is None: - sys.modules.pop(name, None) - else: - sys.modules[name] = saved[name] - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - - -def _make_ap(): - ap = Mock() - ap.logger = Mock() - ap.box_service = Mock() - return ap - - -def _make_session(mcp_module, server_config: dict, ap=None): - if ap is None: - ap = _make_ap() - return mcp_module.RuntimeMCPSession( - server_name=server_config.get('name', 'test-server'), - server_config=server_config, - enable=True, - ap=ap, - ) - - -# ── MCPServerBoxConfig ────────────────────────────────────────────── - - -class TestMCPServerBoxConfig: - def test_default_values(self, mcp_module): - cfg = mcp_module.MCPServerBoxConfig.model_validate({}) - assert cfg.image is None - assert cfg.network == 'on' - assert cfg.host_path is None - assert cfg.host_path_mode == 'ro' - assert cfg.env == {} - assert cfg.startup_timeout_sec == 120 - assert cfg.cpus is None - assert cfg.memory_mb is None - assert cfg.pids_limit is None - assert cfg.read_only_rootfs is None - - def test_custom_values(self, mcp_module): - cfg = mcp_module.MCPServerBoxConfig.model_validate( - { - 'image': 'node:20', - 'network': 'on', - 'host_path': '/home/user/mcp', - 'host_path_mode': 'rw', - 'env': {'FOO': 'bar'}, - 'startup_timeout_sec': 60, - 'cpus': 2.0, - 'memory_mb': 1024, - 'pids_limit': 256, - 'read_only_rootfs': False, - } - ) - assert cfg.image == 'node:20' - assert cfg.network == 'on' - assert cfg.cpus == 2.0 - assert cfg.memory_mb == 1024 - - def test_extra_fields_ignored(self, mcp_module): - cfg = mcp_module.MCPServerBoxConfig.model_validate( - { - 'image': 'node:20', - 'unknown_field': 'whatever', - } - ) - assert cfg.image == 'node:20' - assert not hasattr(cfg, 'unknown_field') - - -# ── Path Rewriting ────────────────────────────────────────────────── - - -class TestRewritePath: - def test_no_host_path_returns_unchanged(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - }, - ) - assert s._rewrite_path('/some/path', None) == '/some/path' - - def test_empty_path_returns_empty(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - }, - ) - assert s._rewrite_path('', '/home/user/mcp') == '' - - def test_prefix_match_rewrites(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - }, - ) - result = s._rewrite_path('/home/user/mcp/server.py', '/home/user/mcp') - assert result == '/workspace/server.py' - - def test_exact_match_rewrites_to_workspace(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - }, - ) - result = s._rewrite_path('/home/user/mcp', '/home/user/mcp') - assert result == '/workspace' - - def test_non_matching_path_unchanged(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - }, - ) - result = s._rewrite_path('/opt/other/server.py', '/home/user/mcp') - assert result == '/opt/other/server.py' - - def test_similar_prefix_not_rewritten(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - }, - ) - result = s._rewrite_path('/home/user/mcp-other/file.py', '/home/user/mcp') - assert result == '/home/user/mcp-other/file.py' - - def test_nested_subpath_rewrites(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - }, - ) - result = s._rewrite_path('/home/user/mcp/src/lib/main.py', '/home/user/mcp') - assert result == '/workspace/src/lib/main.py' - - -# ── host_path Inference ───────────────────────────────────────────── - - -class TestInferHostPath: - def test_no_absolute_paths_returns_none(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': ['server.py'], - }, - ) - assert s._infer_host_path() is None - - def test_nonexistent_path_returns_none(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': '/nonexistent/path/to/python', - 'args': [], - }, - ) - assert s._infer_host_path() is None - - def test_existing_absolute_path_infers_directory(self, mcp_module): - with tempfile.NamedTemporaryFile(suffix='.py') as f: - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [f.name], - }, - ) - result = s._infer_host_path() - assert result is not None - assert result == os.path.dirname(os.path.realpath(f.name)) - - -# ── Build Box Session Payload ─────────────────────────────────────── - - -class TestBuildBoxSessionPayload: - def test_minimal_config(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - }, - ) - payload = s._build_box_session_payload('session-123') - assert payload['session_id'] == 'session-123' - assert payload['workdir'] == '/workspace' - assert payload['env'] == {} - assert 'host_path' not in payload - - def test_with_host_path(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - 'box': {'host_path': '/home/user/mcp', 'host_path_mode': 'ro'}, - }, - ) - payload = s._build_box_session_payload('session-123') - assert payload['host_path'] == '/home/user/mcp' - assert payload['host_path_mode'] == 'ro' - - def test_optional_fields_included_when_set(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - 'box': {'image': 'node:20', 'cpus': 2.0, 'memory_mb': 1024, 'pids_limit': 256}, - }, - ) - payload = s._build_box_session_payload('session-123') - assert payload['image'] == 'node:20' - assert payload['cpus'] == 2.0 - assert payload['memory_mb'] == 1024 - assert payload['pids_limit'] == 256 - - def test_none_fields_excluded(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - }, - ) - payload = s._build_box_session_payload('session-123') - assert 'image' not in payload - assert 'cpus' not in payload - - -# ── Build Box Process Payload ─────────────────────────────────────── - - -class TestBuildBoxProcessPayload: - def test_basic_payload(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': ['server.py'], - 'env': {'KEY': 'val'}, - }, - ) - payload = s._build_box_process_payload() - assert payload['command'] == 'python' - assert payload['args'] == ['server.py'] - assert payload['env'] == {'KEY': 'val'} - assert payload['cwd'] == '/workspace' - - def test_path_rewriting_applied(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': '/home/user/mcp/venv/bin/python', - 'args': ['/home/user/mcp/server.py', '--config', '/home/user/mcp/config.json'], - 'env': {}, - 'box': {'host_path': '/home/user/mcp'}, - }, - ) - payload = s._build_box_process_payload() - # venv python is replaced with plain 'python' (deps installed in-container) - assert payload['command'] == 'python' - assert payload['args'] == ['/workspace/server.py', '--config', '/workspace/config.json'] - - def test_non_matching_args_not_rewritten(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': ['/opt/other/server.py', '--flag'], - 'env': {}, - 'box': {'host_path': '/home/user/mcp'}, - }, - ) - payload = s._build_box_process_payload() - assert payload['command'] == 'python' - assert payload['args'] == ['/opt/other/server.py', '--flag'] - - -# ── get_runtime_info_dict ─────────────────────────────────────────── - - -class TestGetRuntimeInfoDict: - def test_non_stdio_session(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'test-uuid', - 'mode': 'sse', - 'command': 'python', - 'args': [], - }, - ) - info = s.get_runtime_info_dict() - assert info['status'] == 'connecting' - assert 'box_session_id' not in info - - def test_stdio_session_includes_box_info(self, mcp_module): - ap = _make_ap() - ap.box_service.available = True - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'test-uuid', - 'mode': 'stdio', - 'command': 'python', - 'args': [], - }, - ap=ap, - ) - info = s.get_runtime_info_dict() - assert info['box_session_id'] == 'mcp-test-uuid' - assert info['box_enabled'] is True - - def test_stdio_session_without_box_runtime(self, mcp_module): - ap = _make_ap() - ap.box_service.available = False - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'test-uuid', - 'mode': 'stdio', - 'command': 'python', - 'args': [], - }, - ap=ap, - ) - info = s.get_runtime_info_dict() - assert 'box_session_id' not in info - - -# ── Box config parsing ────────────────────────────────────────────── - - -class TestBoxConfigParsing: - def test_box_config_parsed_from_server_config(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - 'box': {'image': 'node:20', 'host_path': '/home/user/mcp'}, - }, - ) - assert isinstance(s.box_config, mcp_module.MCPServerBoxConfig) - assert s.box_config.image == 'node:20' - assert s.box_config.host_path == '/home/user/mcp' - - def test_missing_box_key_uses_defaults(self, mcp_module): - s = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'sse', - 'command': 'python', - 'args': [], - }, - ) - assert isinstance(s.box_config, mcp_module.MCPServerBoxConfig) - assert s.box_config.image is None - assert s.box_config.host_path_mode == 'ro' - - -@pytest.mark.asyncio -async def test_init_box_stdio_server_keeps_host_mount_validation_enabled(mcp_module): - class FakeClientSession: - def __init__(self, *_args): - pass - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc, tb): - return False - - async def initialize(self): - return None - - @asynccontextmanager - async def fake_websocket_client(_url: str): - yield ('read-stream', 'write-stream') - - mcp_module.ClientSession = FakeClientSession - mcp_module.websocket_client = fake_websocket_client - - ap = _make_ap() - ap.box_service.available = True - ap.box_service.create_session = AsyncMock(return_value={}) - ap.box_service.build_spec = Mock(return_value='validated-spec') - ap.box_service.client = SimpleNamespace( - execute=AsyncMock(return_value=SimpleNamespace(ok=True, stderr='', exit_code=0)) - ) - ap.box_service.start_managed_process = AsyncMock(return_value={}) - ap.box_service.get_managed_process_websocket_url = Mock(return_value='ws://box.example/process') - - session = _make_session( - mcp_module, - { - 'name': 'test', - 'uuid': 'u1', - 'mode': 'stdio', - 'command': '/home/user/mcp/.venv/bin/python', - 'args': ['/home/user/mcp/server.py'], - 'box': {'host_path': '/home/user/mcp'}, - }, - ap=ap, - ) - session._detect_install_command = Mock(return_value='pip install --no-cache-dir -r /workspace/requirements.txt') - - await session._init_box_stdio_server() - await session.exit_stack.aclose() - - assert ap.box_service.create_session.await_count == 1 - assert ap.box_service.create_session.await_args.kwargs.get('skip_host_mount_validation', False) is False - assert ap.box_service.build_spec.call_count == 1 - assert ap.box_service.build_spec.call_args.kwargs.get('skip_host_mount_validation', False) is False