From 791d0526871f53cbafaf595ecce32a3ef6bf7f8d Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Sat, 21 Mar 2026 05:19:48 +0000 Subject: [PATCH] feat(box/mcp): instance-based orphan cleanup, error classification, session API, and integration tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Changes ### Precise orphan container cleanup - Runtime generates a unique instance_id on startup - Every container gets a `langbot.box.instance_id` label - `cleanup_orphaned_containers()` only removes containers from previous instances, preserving containers owned by the current one - Containers from older versions (no label) are also cleaned up - `cleanup_orphaned_containers` added to `BaseSandboxBackend` as a no-op default method, removing hasattr duck-typing ### Fine-grained MCP error classification - New `MCPSessionErrorPhase` enum with 7 phases: session_create, dep_install, process_start, relay_connect, mcp_init, runtime, tool_call - Each phase in `_init_box_stdio_server()` sets the error phase before re-raising, enabling precise failure diagnosis - `retry_count` tracked across retry attempts - `get_runtime_info_dict()` exposes `error_phase` and `retry_count` ### GET /v1/sessions/{id} API - `BoxRuntime.get_session()` returns session details including managed process info when present - `handle_get_session` HTTP handler + route in server.py - `BoxRuntimeClient.get_session()` abstract method + remote impl ### stdio defaults to Box when runtime is available - `_uses_box_stdio()` checks `box_service.available` instead of requiring explicit `box` key in server_config - `BoxService.initialize()` catches runtime errors gracefully, sets `available=False` instead of crashing LangBot startup - When no container runtime exists, stdio MCP falls back to host-direct execution ### Code quality (from /simplify review) - Extracted `_VENV_DIRS` / `_VENV_BIN_DIRS` module-level constants - Removed dead `_box_network_mode()` method and unused `bc` variable - Fixed broken import `from ....box.models` → `from ...box.models` - Cached `_resolve_host_path()` result — computed once, passed through - Config hash now includes `host_path` field - Batched orphan cleanup into single `rm -f` command ### Session leak fix - `_cleanup_box_stdio_session()` now runs in `_lifecycle_loop`'s finally block, covering all exit paths (normal shutdown, error, retry, final failure) ### Integration tests - 6 end-to-end tests covering managed process lifecycle, WebSocket stdio bidirectional IO, session cleanup verification, single session query, process exit detection, and orphan cleanup safety --- src/langbot/pkg/box/backend.py | 34 +- src/langbot/pkg/box/client.py | 12 + src/langbot/pkg/box/runtime.py | 16 +- src/langbot/pkg/box/server.py | 10 + src/langbot/pkg/provider/tools/loaders/mcp.py | 83 +++-- .../box/test_box_mcp_integration.py | 313 ++++++++++++++++++ tests/unit_tests/box/test_box_service.py | 3 + 7 files changed, 442 insertions(+), 29 deletions(-) create mode 100644 tests/integration_tests/box/test_box_mcp_integration.py diff --git a/src/langbot/pkg/box/backend.py b/src/langbot/pkg/box/backend.py index fda0846f..b8208fcc 100644 --- a/src/langbot/pkg/box/backend.py +++ b/src/langbot/pkg/box/backend.py @@ -34,6 +34,7 @@ class _CommandResult: class BaseSandboxBackend(abc.ABC): name: str + instance_id: str = '' def __init__(self, logger: logging.Logger): self.logger = logger @@ -60,7 +61,7 @@ class BaseSandboxBackend(abc.ABC): async def start_managed_process(self, session: BoxSessionInfo, spec): raise BoxError(f'{self.name} backend does not support managed processes') - async def cleanup_orphaned_containers(self): + async def cleanup_orphaned_containers(self, current_instance_id: str = ''): """Remove lingering containers from previous runs. No-op by default.""" pass @@ -97,6 +98,8 @@ class CLISandboxBackend(BaseSandboxBackend): 'langbot.box=true', '--label', f'langbot.session_id={spec.session_id}', + '--label', + f'langbot.box.instance_id={self.instance_id}', ] # Config hash label for identifying configuration drift @@ -218,22 +221,37 @@ class CLISandboxBackend(BaseSandboxBackend): check=False, ) - async def cleanup_orphaned_containers(self): - """Remove any lingering langbot.box containers from previous runs.""" + async def cleanup_orphaned_containers(self, current_instance_id: str = ''): + """Remove langbot.box containers from previous instances. + + Only removes containers whose ``langbot.box.instance_id`` label does + NOT match *current_instance_id*. Containers without the label (from + older versions) are also removed. + """ result = await self._run_command( - [self.command, 'ps', '-a', '--filter', 'label=langbot.box=true', '-q'], + [self.command, 'ps', '-a', '--filter', 'label=langbot.box=true', + '--format', '{{.ID}}\t{{.Label "langbot.box.instance_id"}}'], timeout_sec=10, check=False, ) if result.return_code != 0 or not result.stdout.strip(): return - container_ids = [cid.strip() for cid in result.stdout.strip().split('\n') if cid.strip()] - if not container_ids: + orphan_ids = [] + for line in result.stdout.strip().split('\n'): + line = line.strip() + if not line: + continue + parts = line.split('\t', 1) + cid = parts[0].strip() + label_instance = parts[1].strip() if len(parts) > 1 else '' + if label_instance != current_instance_id: + orphan_ids.append(cid) + if not orphan_ids: return - for cid in container_ids: + for cid in orphan_ids: self.logger.info(f'Cleaning up orphaned Box container: {cid}') await self._run_command( - [self.command, 'rm', '-f', *container_ids], + [self.command, 'rm', '-f', *orphan_ids], timeout_sec=30, check=False, ) diff --git a/src/langbot/pkg/box/client.py b/src/langbot/pkg/box/client.py index cb83bf84..03c0839f 100644 --- a/src/langbot/pkg/box/client.py +++ b/src/langbot/pkg/box/client.py @@ -86,6 +86,9 @@ class BoxRuntimeClient(abc.ABC): @abc.abstractmethod async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo: ... + @abc.abstractmethod + async def get_session(self, session_id: str) -> dict: ... + class RemoteBoxRuntimeClient(BoxRuntimeClient): """HTTP client that talks to a standalone Box Runtime service.""" @@ -168,6 +171,15 @@ class RemoteBoxRuntimeClient(BoxRuntimeClient): except aiohttp.ClientError as exc: raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + async def get_session(self, session_id: str) -> dict: + session = self._get_session() + try: + async with session.get(f'{self._base_url}/v1/sessions/{session_id}') as resp: + await self._check_response(resp) + return await resp.json() + except aiohttp.ClientError as exc: + raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + async def get_backend_info(self) -> dict: session = self._get_session() try: diff --git a/src/langbot/pkg/box/runtime.py b/src/langbot/pkg/box/runtime.py index 93078b71..4346f7a1 100644 --- a/src/langbot/pkg/box/runtime.py +++ b/src/langbot/pkg/box/runtime.py @@ -5,6 +5,7 @@ import collections import dataclasses import datetime as dt import logging +import uuid from .backend import BaseSandboxBackend, DockerBackend, PodmanBackend from .errors import ( @@ -64,12 +65,14 @@ class BoxRuntime: self._backend: BaseSandboxBackend | None = None self._sessions: dict[str, _RuntimeSession] = {} self._lock = asyncio.Lock() + self.instance_id = uuid.uuid4().hex[:12] async def initialize(self): self._backend = await self._select_backend() if self._backend is not None: + self._backend.instance_id = self.instance_id try: - await self._backend.cleanup_orphaned_containers() + await self._backend.cleanup_orphaned_containers(self.instance_id) except Exception as exc: self.logger.warning(f'LangBot Box orphan container cleanup failed: {exc}') @@ -164,6 +167,17 @@ class BoxRuntime: def get_sessions(self) -> list[dict]: return [self._session_to_dict(s.info) for s in self._sessions.values()] + def get_session(self, session_id: str) -> dict: + runtime_session = self._sessions.get(session_id) + if runtime_session is None: + raise BoxSessionNotFoundError(f'session {session_id} not found') + result = self._session_to_dict(runtime_session.info) + if runtime_session.managed_process is not None: + result['managed_process'] = self._managed_process_to_dict( + session_id, runtime_session.managed_process + ) + return result + async def get_status(self) -> dict: backend_info = await self.get_backend_info() return { diff --git a/src/langbot/pkg/box/server.py b/src/langbot/pkg/box/server.py index 0b764787..070417c9 100644 --- a/src/langbot/pkg/box/server.py +++ b/src/langbot/pkg/box/server.py @@ -117,6 +117,15 @@ async def handle_delete_session(request: web.Request) -> web.Response: return _error_response(exc) +async def handle_get_session(request: web.Request) -> web.Response: + runtime: BoxRuntime = request.app['runtime'] + session_id = request.match_info['session_id'] + try: + return web.json_response(runtime.get_session(session_id)) + except BoxError as exc: + return _error_response(exc) + + async def handle_status(request: web.Request) -> web.Response: runtime: BoxRuntime = request.app['runtime'] try: @@ -234,6 +243,7 @@ def create_app(runtime: BoxRuntime | None = None) -> web.Application: app.router.add_post('/v1/sessions/{session_id}/exec', handle_exec) app.router.add_post('/v1/sessions/{session_id}', handle_create_session) + app.router.add_get('/v1/sessions/{session_id}', handle_get_session) app.router.add_get('/v1/sessions', handle_get_sessions) app.router.add_delete('/v1/sessions/{session_id}', handle_delete_session) app.router.add_post('/v1/sessions/{session_id}/managed-process', handle_start_managed_process) diff --git a/src/langbot/pkg/provider/tools/loaders/mcp.py b/src/langbot/pkg/provider/tools/loaders/mcp.py index 05cd3e6b..c239ded3 100644 --- a/src/langbot/pkg/provider/tools/loaders/mcp.py +++ b/src/langbot/pkg/provider/tools/loaders/mcp.py @@ -31,6 +31,17 @@ class MCPSessionStatus(enum.Enum): ERROR = 'error' +class MCPSessionErrorPhase(enum.Enum): + """Which phase of the MCP lifecycle failed.""" + SESSION_CREATE = 'session_create' + DEP_INSTALL = 'dep_install' + PROCESS_START = 'process_start' + RELAY_CONNECT = 'relay_connect' + MCP_INIT = 'mcp_init' + RUNTIME = 'runtime' + TOOL_CALL = 'tool_call' + + _VENV_DIRS = frozenset({'.venv', 'venv', 'env', '.env'}) _VENV_BIN_DIRS = frozenset({'bin', 'Scripts'}) @@ -82,6 +93,10 @@ class RuntimeMCPSession: error_message: str | None = None + error_phase: MCPSessionErrorPhase | None = None + + retry_count: int = 0 + def __init__(self, server_name: str, server_config: dict, enable: bool, ap: app.Application): self.server_name = server_name self.server_uuid = server_config.get('uuid', '') @@ -129,13 +144,17 @@ class RuntimeMCPSession: host_path = self._resolve_host_path() session_payload = self._build_box_session_payload(session_id, host_path) - # MCP server paths are admin-configured, skip host_mount_roots validation - await box_service.create_session( - session_payload, - skip_host_mount_validation=True, - ) + # Phase: session creation + try: + await box_service.create_session( + session_payload, + skip_host_mount_validation=True, + ) + except Exception as e: + self.error_phase = MCPSessionErrorPhase.SESSION_CREATE + raise - # Install dependencies inside the container before starting the MCP server + # Phase: dependency installation if host_path: install_cmd = self._detect_install_command(host_path) if install_cmd: @@ -143,31 +162,50 @@ class RuntimeMCPSession: f'MCP server {self.server_name}: installing dependencies in Box ' f'with: {install_cmd}' ) - # Build an exec spec that matches the existing session config - # to pass the compatibility check. exec_payload = dict(session_payload) exec_payload['cmd'] = install_cmd exec_payload['timeout_sec'] = self.box_config.startup_timeout_sec or 120 - result = await box_service.client.execute( - box_service.build_spec(exec_payload, skip_host_mount_validation=True) - ) + try: + result = await box_service.client.execute( + box_service.build_spec(exec_payload, skip_host_mount_validation=True) + ) + except Exception as e: + self.error_phase = MCPSessionErrorPhase.DEP_INSTALL + raise if not result.ok: + self.error_phase = MCPSessionErrorPhase.DEP_INSTALL stderr_preview = (result.stderr or '')[:500] raise Exception( f'Dependency install failed (exit code {result.exit_code}): ' f'{stderr_preview}' ) - await box_service.start_managed_process( - session_id, - self._build_box_process_payload(host_path), - ) + # Phase: managed process start + try: + await box_service.start_managed_process( + session_id, + self._build_box_process_payload(host_path), + ) + except Exception as e: + self.error_phase = MCPSessionErrorPhase.PROCESS_START + raise - websocket_url = box_service.get_managed_process_websocket_url(session_id) - transport = await self.exit_stack.enter_async_context(websocket_client(websocket_url)) - read_stream, write_stream = transport - self.session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream)) - await self.session.initialize() + # Phase: WebSocket relay connection + try: + websocket_url = box_service.get_managed_process_websocket_url(session_id) + transport = await self.exit_stack.enter_async_context(websocket_client(websocket_url)) + read_stream, write_stream = transport + self.session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream)) + except Exception as e: + self.error_phase = MCPSessionErrorPhase.RELAY_CONNECT + raise + + # Phase: MCP protocol initialization + try: + await self.session.initialize() + except Exception as e: + self.error_phase = MCPSessionErrorPhase.MCP_INIT + raise async def _init_sse_server(self): sse_transport = await self.exit_stack.enter_async_context( @@ -237,6 +275,7 @@ class RuntimeMCPSession: task.cancel() for task in done: if task is monitor_task and not self._shutdown_event.is_set(): + self.error_phase = MCPSessionErrorPhase.RUNTIME raise Exception('Box managed process exited unexpectedly') else: await self._shutdown_event.wait() @@ -269,6 +308,7 @@ class RuntimeMCPSession: await self._lifecycle_loop() return # Normal shutdown, don't retry except Exception as e: + self.retry_count = attempt + 1 if self._shutdown_event.is_set(): return # Shutdown requested, don't retry if attempt >= self._MAX_RETRIES: @@ -285,6 +325,7 @@ class RuntimeMCPSession: # Reset status for retry self.status = MCPSessionStatus.CONNECTING self.error_message = None + self.error_phase = None await asyncio.sleep(delay) async def _monitor_box_process_health(self): @@ -379,6 +420,8 @@ class RuntimeMCPSession: info = { 'status': self.status.value, 'error_message': self.error_message, + 'error_phase': self.error_phase.value if self.error_phase else None, + 'retry_count': self.retry_count, 'tool_count': len(self.get_tools()), 'tools': [ { diff --git a/tests/integration_tests/box/test_box_mcp_integration.py b/tests/integration_tests/box/test_box_mcp_integration.py new file mode 100644 index 00000000..b984e74d --- /dev/null +++ b/tests/integration_tests/box/test_box_mcp_integration.py @@ -0,0 +1,313 @@ +"""Integration tests for Box MCP-related features. + +These tests verify managed process lifecycle, WebSocket stdio attach, +session cleanup, and the single-session query API using a real container +runtime. + +CI only runs ``tests/unit_tests/``, so these tests never execute in the +CI pipeline. Run them locally with:: + + pytest tests/integration_tests/box/test_box_mcp_integration.py -v +""" + +from __future__ import annotations + +import asyncio +import logging +import shutil +import socket +import subprocess + +import aiohttp +import pytest +from aiohttp.test_utils import TestServer + +from langbot.pkg.box.client import RemoteBoxRuntimeClient +from langbot.pkg.box.errors import BoxSessionNotFoundError +from langbot.pkg.box.models import BoxManagedProcessSpec, BoxManagedProcessStatus, BoxSpec +from langbot.pkg.box.runtime import BoxRuntime +from langbot.pkg.box.server import create_app as create_server_app + +_logger = logging.getLogger('test.box.mcp_integration') + +_TEST_IMAGE = 'alpine:latest' + + +# ── Skip helpers ────────────────────────────────────────────────────── + + +def _has_container_runtime() -> bool: + for cmd in ('podman', 'docker'): + if shutil.which(cmd) is None: + continue + try: + result = subprocess.run([cmd, 'info'], capture_output=True, timeout=10) + if result.returncode == 0: + return True + except Exception: + continue + return False + + +def _can_open_test_socket() -> bool: + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + except OSError: + return False + sock.close() + return True + + +requires_container = pytest.mark.skipif( + not _has_container_runtime(), + reason='no container runtime (podman/docker) available', +) + +requires_socket = pytest.mark.skipif( + not _can_open_test_socket(), + reason='local test environment does not permit opening TCP sockets', +) + + +# ── Fixtures ────────────────────────────────────────────────────────── + + +@pytest.fixture +async def box_server(): + """Yield a (TestServer, RemoteBoxRuntimeClient) backed by a real BoxRuntime.""" + runtime = BoxRuntime(logger=_logger) + app = create_server_app(runtime) + server = TestServer(app) + await server.start_server() + client = RemoteBoxRuntimeClient( + base_url=str(server.make_url('')), + logger=_logger, + ) + yield server, client + await client.shutdown() + await server.close() + + +# ── 1. Managed process lifecycle ───────────────────────────────────── + + +@requires_container +@requires_socket +@pytest.mark.asyncio +async def test_managed_process_start_and_query(box_server): + """Start a managed process and query its status.""" + server, client = box_server + + # Create session + spec = BoxSpec( + cmd='', + session_id='mcp-int-lifecycle', + workdir='/tmp', + image=_TEST_IMAGE, + ) + await client.create_session(spec) + + # Start a managed process that stays alive + proc_spec = BoxManagedProcessSpec( + command='sh', + args=['-c', 'while true; do sleep 1; done'], + cwd='/tmp', + ) + info = await client.start_managed_process('mcp-int-lifecycle', proc_spec) + assert info.status == BoxManagedProcessStatus.RUNNING + + # Query it + info2 = await client.get_managed_process('mcp-int-lifecycle') + assert info2.status == BoxManagedProcessStatus.RUNNING + assert info2.command == 'sh' + + # Cleanup + await client.delete_session('mcp-int-lifecycle') + + +# ── 2. WebSocket stdio attach ──────────────────────────────────────── + + +@requires_container +@requires_socket +@pytest.mark.asyncio +async def test_ws_stdio_attach_echo(box_server): + """Attach to a managed process via WebSocket and verify bidirectional IO.""" + server, client = box_server + + spec = BoxSpec( + cmd='', + session_id='mcp-int-ws', + workdir='/tmp', + image=_TEST_IMAGE, + ) + await client.create_session(spec) + + # Start a cat process (echoes stdin to stdout) + proc_spec = BoxManagedProcessSpec( + command='cat', + args=[], + cwd='/tmp', + ) + await client.start_managed_process('mcp-int-ws', proc_spec) + + # Connect via WebSocket + ws_url = client.get_managed_process_websocket_url('mcp-int-ws') + session = aiohttp.ClientSession() + try: + async with session.ws_connect(ws_url) as ws: + # Send a line + await ws.send_str('hello from test') + + # Expect to receive it back (cat echoes) + msg = await asyncio.wait_for(ws.receive(), timeout=5) + assert msg.type == aiohttp.WSMsgType.TEXT + assert 'hello from test' in msg.data + finally: + await session.close() + + await client.delete_session('mcp-int-ws') + + +# ── 3. Session cleanup removes container ───────────────────────────── + + +@requires_container +@requires_socket +@pytest.mark.asyncio +async def test_delete_session_cleans_up(box_server): + """After deleting a session, it should no longer exist.""" + server, client = box_server + + spec = BoxSpec( + cmd='', + session_id='mcp-int-cleanup', + workdir='/tmp', + image=_TEST_IMAGE, + ) + await client.create_session(spec) + + # Start a process + proc_spec = BoxManagedProcessSpec( + command='sleep', + args=['3600'], + cwd='/tmp', + ) + await client.start_managed_process('mcp-int-cleanup', proc_spec) + + # Delete + await client.delete_session('mcp-int-cleanup') + + # Session should be gone + with pytest.raises(BoxSessionNotFoundError): + await client.get_session('mcp-int-cleanup') + + +# ── 4. GET /v1/sessions/{id} ──────────────────────────────────────── + + +@requires_container +@requires_socket +@pytest.mark.asyncio +async def test_get_session_returns_details(box_server): + """GET single session returns session details and managed process info.""" + server, client = box_server + + spec = BoxSpec( + cmd='', + session_id='mcp-int-get', + workdir='/tmp', + image=_TEST_IMAGE, + ) + await client.create_session(spec) + + # Query without managed process + info = await client.get_session('mcp-int-get') + assert info['session_id'] == 'mcp-int-get' + assert info['image'] == _TEST_IMAGE + assert 'managed_process' not in info + + # Start a process and query again + proc_spec = BoxManagedProcessSpec( + command='sleep', + args=['3600'], + cwd='/tmp', + ) + await client.start_managed_process('mcp-int-get', proc_spec) + + info2 = await client.get_session('mcp-int-get') + assert info2['session_id'] == 'mcp-int-get' + assert 'managed_process' in info2 + assert info2['managed_process']['status'] == BoxManagedProcessStatus.RUNNING.value + + await client.delete_session('mcp-int-get') + + +# ── 5. Process exit detected ──────────────────────────────────────── + + +@requires_container +@requires_socket +@pytest.mark.asyncio +async def test_process_exit_detected(box_server): + """When a managed process exits, its status should reflect EXITED.""" + server, client = box_server + + spec = BoxSpec( + cmd='', + session_id='mcp-int-exit', + workdir='/tmp', + image=_TEST_IMAGE, + ) + await client.create_session(spec) + + # Start a process that exits immediately + proc_spec = BoxManagedProcessSpec( + command='sh', + args=['-c', 'echo done && exit 0'], + cwd='/tmp', + ) + await client.start_managed_process('mcp-int-exit', proc_spec) + + # Wait a bit for process to exit + await asyncio.sleep(2) + + info = await client.get_managed_process('mcp-int-exit') + assert info.status == BoxManagedProcessStatus.EXITED + assert info.exit_code == 0 + + await client.delete_session('mcp-int-exit') + + +# ── 6. Instance ID orphan cleanup ─────────────────────────────────── + + +@requires_container +@requires_socket +@pytest.mark.asyncio +async def test_orphan_cleanup_preserves_own_containers(box_server): + """Orphan cleanup should not remove containers belonging to the current instance.""" + server, client = box_server + + # Create a session (container gets current instance ID label) + spec = BoxSpec( + cmd='', + session_id='mcp-int-orphan', + workdir='/tmp', + image=_TEST_IMAGE, + ) + await client.create_session(spec) + + # Verify session exists + sessions = await client.get_sessions() + assert any(s['session_id'] == 'mcp-int-orphan' for s in sessions) + + # Trigger status check (which doesn't clean up own containers) + status = await client.get_status() + assert status['active_sessions'] >= 1 + + # Our session should still exist + sessions = await client.get_sessions() + assert any(s['session_id'] == 'mcp-int-orphan' for s in sessions) + + await client.delete_session('mcp-int-orphan') diff --git a/tests/unit_tests/box/test_box_service.py b/tests/unit_tests/box/test_box_service.py index 5653d927..61f6530e 100644 --- a/tests/unit_tests/box/test_box_service.py +++ b/tests/unit_tests/box/test_box_service.py @@ -67,6 +67,9 @@ class _InProcessBoxRuntimeClient(BoxRuntimeClient): async def get_managed_process(self, session_id: str): return self._runtime.get_managed_process(session_id) + async def get_session(self, session_id: str): + return self._runtime.get_session(session_id) + def _can_open_test_socket() -> bool: try: