diff --git a/src/langbot/pkg/box/service.py b/src/langbot/pkg/box/service.py index 31c57e14..e4fad7b2 100644 --- a/src/langbot/pkg/box/service.py +++ b/src/langbot/pkg/box/service.py @@ -289,6 +289,9 @@ class BoxService: async def get_managed_process(self, session_id: str, process_id: str = 'default') -> BoxManagedProcessInfo: return await self.client.get_managed_process(session_id, process_id) + async def stop_managed_process(self, session_id: str, process_id: str = 'default') -> None: + return await self.client.stop_managed_process(session_id, process_id) + def get_managed_process_websocket_url(self, session_id: str, process_id: str = 'default') -> str: getter = getattr(self.client, 'get_managed_process_websocket_url', None) if getter is None: diff --git a/src/langbot/pkg/box/workspace.py b/src/langbot/pkg/box/workspace.py index 38c52662..948622ef 100644 --- a/src/langbot/pkg/box/workspace.py +++ b/src/langbot/pkg/box/workspace.py @@ -403,6 +403,9 @@ class BoxWorkspaceSession: async def get_managed_process(self, process_id: str = 'default'): return await self.box_service.get_managed_process(self.session_id, process_id) + async def stop_managed_process(self, process_id: str = 'default') -> None: + await self.box_service.stop_managed_process(self.session_id, process_id) + def get_managed_process_websocket_url(self, process_id: str = 'default') -> str: return self.box_service.get_managed_process_websocket_url(self.session_id, process_id) diff --git a/src/langbot/pkg/provider/tools/loaders/mcp.py b/src/langbot/pkg/provider/tools/loaders/mcp.py index 31bca2e5..0e5c07c2 100644 --- a/src/langbot/pkg/provider/tools/loaders/mcp.py +++ b/src/langbot/pkg/provider/tools/loaders/mcp.py @@ -20,7 +20,7 @@ from ....core import app import langbot_plugin.api.entities.builtin.resource.tool as resource_tool import langbot_plugin.api.entities.builtin.provider.message as provider_message from ....entity.persistence import mcp as persistence_mcp -from .mcp_stdio import 
BoxStdioSessionRuntime, MCPSessionErrorPhase +from .mcp_stdio import BoxStdioSessionRuntime, MCPServerBoxConfig, MCPSessionErrorPhase # noqa: F401 class MCPSessionStatus(enum.Enum): @@ -241,7 +241,8 @@ class RuntimeMCPSession: self._lifecycle_task = asyncio.create_task(self._lifecycle_loop_with_retry()) # Wait for connection or failure (with timeout) - startup_timeout = self.box_config.startup_timeout_sec if self._uses_box_stdio() else 30.0 + # Use the same 120 s fallback as mcp_stdio when startup_timeout_sec is unset or 0, so this never adds to None. + startup_timeout = ((self.box_config.startup_timeout_sec or 120) + 30) if self._uses_box_stdio() else 30.0 try: await asyncio.wait_for(self._ready_event.wait(), timeout=startup_timeout) except asyncio.TimeoutError: diff --git a/src/langbot/pkg/provider/tools/loaders/mcp_stdio.py b/src/langbot/pkg/provider/tools/loaders/mcp_stdio.py index 47a9c30b..95082e15 100644 --- a/src/langbot/pkg/provider/tools/loaders/mcp_stdio.py +++ b/src/langbot/pkg/provider/tools/loaders/mcp_stdio.py @@ -2,6 +2,9 @@ from __future__ import annotations import enum import asyncio +import os +import shutil +import shlex from typing import TYPE_CHECKING, Any import pydantic @@ -11,6 +14,7 @@ from ....box.workspace import ( BoxWorkspaceSession, classify_python_workspace, infer_workspace_host_path, + normalize_host_path, rewrite_mounted_path, rewrite_venv_command, unwrap_venv_path, @@ -68,13 +72,22 @@ class BoxStdioSessionRuntime: def server_config(self) -> dict: return self.owner.server_config - def _build_workspace(self) -> BoxWorkspaceSession: + def _build_workspace( + self, + *, + host_path: str | None | object = ..., + workdir: str = '/workspace', + mount_path: str = '/workspace', + ) -> BoxWorkspaceSession: + resolved_host_path = self.resolve_host_path() if host_path is ...
else host_path return BoxWorkspaceSession( self.ap.box_service, self.owner._build_box_session_id(), - host_path=self.resolve_host_path(), + host_path=resolved_host_path, host_path_mode=self.config.host_path_mode, + workdir=workdir, env=self.config.env, + mount_path=mount_path, network=self.config.network, read_only_rootfs=self.config.read_only_rootfs if self.config.read_only_rootfs is not None else False, image=self.config.image, @@ -92,14 +105,17 @@ class BoxStdioSessionRuntime: def uses_box_stdio(self) -> bool: if self.server_config.get('mode') != 'stdio': return False - try: - return bool(getattr(self.ap.box_service, 'available', False)) - except Exception: - return False + return getattr(self.ap, 'box_service', None) is not None async def initialize(self) -> None: - workspace = self._build_workspace() - host_path = workspace.host_path + await self._wait_for_box_runtime() + + # All stdio MCP servers share one Box session. Per-server host paths + # are staged into the shared workspace instead of becoming session + # mounts, because an existing Docker container cannot add bind mounts. 
+ workspace = self._build_workspace(host_path=None) + host_path = self.resolve_host_path() + process_cwd = '/workspace' try: await workspace.create_session() @@ -108,7 +124,8 @@ class BoxStdioSessionRuntime: raise if host_path: - install_cmd = self.owner._detect_install_command(host_path) + process_cwd = await self._stage_host_path_to_shared_workspace(host_path) + install_cmd = self.detect_install_command(host_path, process_cwd) if install_cmd: self.ap.logger.info( f'MCP server {self.server_name}: installing dependencies in Box with: {install_cmd}' @@ -116,6 +133,7 @@ class BoxStdioSessionRuntime: try: result = await workspace.execute_raw( install_cmd, + workdir=process_cwd, timeout_sec=self.config.startup_timeout_sec or 120, ) except Exception: @@ -127,12 +145,19 @@ class BoxStdioSessionRuntime: raise Exception(f'Dependency install failed (exit code {result.exit_code}): {stderr_preview}') try: - await workspace.start_managed_process( + process_workspace = ( + self._build_workspace(host_path=host_path, workdir=process_cwd, mount_path=process_cwd) + if host_path + else workspace + ) + payload = process_workspace.build_process_payload( self.server_config['command'], self.server_config.get('args', []), - process_id=self.process_id, env=self.server_config.get('env', {}), + cwd=process_cwd, ) + payload['process_id'] = self.process_id + await workspace.box_service.start_managed_process(workspace.session_id, payload) except Exception: self.owner.error_phase = MCPSessionErrorPhase.PROCESS_START raise @@ -180,15 +205,98 @@ class BoxStdioSessionRuntime: return await asyncio.sleep(self.owner._MONITOR_POLL_INTERVAL) + async def _stage_host_path_to_shared_workspace(self, host_path: str) -> str: + source_path = normalize_host_path(host_path) + if not source_path: + return '/workspace' + if not os.path.isdir(source_path): + raise FileNotFoundError(f'MCP host_path does not exist or is not a directory: {host_path}') + + self._validate_host_path(source_path) + + shared_host_path = 
self._shared_workspace_host_path() + process_host_root = os.path.join(shared_host_path, '.mcp', self.process_id) + process_host_workspace = os.path.join(process_host_root, 'workspace') + await asyncio.to_thread(self._copy_workspace_tree, source_path, process_host_root, process_host_workspace) + return f'/workspace/.mcp/{self.process_id}/workspace' + + def _validate_host_path(self, host_path: str) -> None: + self.ap.box_service.build_spec( + { + 'session_id': f'mcp-validate-{self.process_id}', + 'host_path': host_path, + 'host_path_mode': self.config.host_path_mode, + 'network': self.config.network, + 'read_only_rootfs': self.config.read_only_rootfs if self.config.read_only_rootfs is not None else False, + } + ) + + def _shared_workspace_host_path(self) -> str: + default_workspace = getattr(self.ap.box_service, 'default_workspace', None) + if not default_workspace: + raise RuntimeError('Box default workspace is required for shared MCP host_path staging') + shared_host_path = normalize_host_path(default_workspace) + os.makedirs(shared_host_path, exist_ok=True) + return shared_host_path + + @staticmethod + def _copy_workspace_tree(source_path: str, process_host_root: str, process_host_workspace: str) -> None: + shutil.rmtree(process_host_root, ignore_errors=True) + os.makedirs(process_host_root, exist_ok=True) + shutil.copytree( + source_path, + process_host_workspace, + symlinks=True, + ignore=shutil.ignore_patterns('.git', '__pycache__', '.pytest_cache', '.mypy_cache', '.ruff_cache'), + ) + + async def _cleanup_staged_workspace(self) -> None: + if not self.resolve_host_path(): + return + try: + process_host_root = os.path.join(self._shared_workspace_host_path(), '.mcp', self.process_id) + await asyncio.to_thread(shutil.rmtree, process_host_root, True) + except Exception as exc: + self.ap.logger.warning( + f'MCP server {self.server_name}: failed to clean staged workspace ' + f'process_id={self.process_id}: {type(exc).__name__}: {exc}' + ) + + async def 
_wait_for_box_runtime(self) -> None: + timeout_sec = max(float(self.config.startup_timeout_sec or 120), 1.0) + deadline = asyncio.get_running_loop().time() + timeout_sec + warned = False + while not getattr(self.ap.box_service, 'available', False): + if not warned: + self.ap.logger.warning( + f'MCP server {self.server_name}: waiting for Box runtime before starting stdio process' + ) + warned = True + if asyncio.get_running_loop().time() >= deadline: + self.owner.error_phase = MCPSessionErrorPhase.SESSION_CREATE + raise Exception(f'Box runtime is not available after {int(timeout_sec)} seconds') + await asyncio.sleep(1) + async def cleanup_session(self) -> None: if not self.uses_box_stdio(): return # In the shared-session model, we do not delete the session itself. - # The managed process exits independently; deleting the session would - # kill other MCP servers sharing the same container. + # Stop only this MCP server's managed process; deleting the session + # would kill other MCP servers sharing the same container. 
+ workspace = self._build_workspace(host_path=None) + try: + await workspace.stop_managed_process(self.process_id) + except Exception as exc: + self.ap.logger.warning( + f'MCP server {self.server_name}: failed to stop managed process ' + f'process_id={self.process_id}: {type(exc).__name__}: {exc}' + ) + await self._cleanup_staged_workspace() + return + await self._cleanup_staged_workspace() self.ap.logger.info( - f'MCP server {self.server_name}: process_id={self.process_id} cleanup complete ' + f'MCP server {self.server_name}: stopped process_id={self.process_id} ' f'(shared session {self.owner._build_box_session_id()} kept alive)' ) @@ -206,12 +314,13 @@ class BoxStdioSessionRuntime: return self.config.host_path or self.infer_host_path() @staticmethod - def detect_install_command(host_path: str) -> str | None: + def detect_install_command(host_path: str, workspace_path: str = '/workspace') -> str | None: workspace_kind = classify_python_workspace(host_path) + quoted_workspace_path = shlex.quote(workspace_path) if workspace_kind == 'package': return ( 'mkdir -p /opt/_lb_src' - ' && tar -C /workspace' + f' && tar -C {quoted_workspace_path}' ' --exclude=.venv --exclude=.git --exclude=__pycache__' ' --exclude=node_modules --exclude=.tox --exclude=.nox' ' --exclude="*.egg-info" --exclude=.uv-cache' @@ -221,7 +330,7 @@ class BoxStdioSessionRuntime: ' && rm -rf /opt/_lb_src' ) if workspace_kind == 'requirements': - return 'pip install --no-cache-dir -r /workspace/requirements.txt' + return f'pip install --no-cache-dir -r {quoted_workspace_path}/requirements.txt' return None def build_box_session_payload(self, session_id: str, host_path: str | None = None) -> dict[str, Any]: diff --git a/tests/integration_tests/box/test_box_mcp_integration.py b/tests/integration_tests/box/test_box_mcp_integration.py index 6140a3c7..2fcfcb93 100644 --- a/tests/integration_tests/box/test_box_mcp_integration.py +++ b/tests/integration_tests/box/test_box_mcp_integration.py @@ -23,7 +23,7 @@ 
import pytest from aiohttp.test_utils import TestServer from langbot_plugin.box.client import ActionRPCBoxClient -from langbot_plugin.box.errors import BoxSessionNotFoundError +from langbot_plugin.box.errors import BoxManagedProcessNotFoundError, BoxSessionNotFoundError from langbot_plugin.box.models import BoxManagedProcessSpec, BoxManagedProcessStatus, BoxSpec from langbot_plugin.box.runtime import BoxRuntime from langbot_plugin.box.server import BoxServerHandler, create_ws_relay_app @@ -169,6 +169,13 @@ async def test_managed_process_start_and_query(box_server): assert info2.status == BoxManagedProcessStatus.RUNNING assert info2.command == 'sh' + # Stop only the managed process while keeping the session available + await client.stop_managed_process('mcp-int-lifecycle') + with pytest.raises(BoxManagedProcessNotFoundError): + await client.get_managed_process('mcp-int-lifecycle') + session_info = await client.get_session('mcp-int-lifecycle') + assert session_info['session_id'] == 'mcp-int-lifecycle' + # Cleanup await client.delete_session('mcp-int-lifecycle') diff --git a/tests/unit_tests/box/test_box_service.py b/tests/unit_tests/box/test_box_service.py index b9a115ee..36bda025 100644 --- a/tests/unit_tests/box/test_box_service.py +++ b/tests/unit_tests/box/test_box_service.py @@ -70,6 +70,9 @@ class _InProcessBoxRuntimeClient(BoxRuntimeClient): async def get_managed_process(self, session_id: str, process_id: str = 'default'): return self._runtime.get_managed_process(session_id, process_id) + async def stop_managed_process(self, session_id: str, process_id: str = 'default'): + await self._runtime.stop_managed_process(session_id, process_id) + async def get_session(self, session_id: str): return self._runtime.get_session(session_id) diff --git a/tests/unit_tests/provider/test_mcp_box_integration.py b/tests/unit_tests/provider/test_mcp_box_integration.py index df079518..31a7c15b 100644 --- a/tests/unit_tests/provider/test_mcp_box_integration.py +++ 
b/tests/unit_tests/provider/test_mcp_box_integration.py @@ -561,7 +561,7 @@ class TestGetRuntimeInfoDict: assert info['box_session_id'] == 'mcp-shared' assert info['box_enabled'] is True - def test_stdio_session_without_box_runtime(self, mcp_module): + def test_stdio_session_waits_for_unavailable_box_runtime(self, mcp_module): ap = _make_ap() ap.box_service.available = False s = _make_session( @@ -576,6 +576,24 @@ class TestGetRuntimeInfoDict: ap=ap, ) info = s.get_runtime_info_dict() + assert info['box_session_id'] == 'mcp-shared' + assert info['box_enabled'] is True + + def test_stdio_session_without_box_service_uses_local_stdio(self, mcp_module): + ap = _make_ap() + del ap.box_service + s = _make_session( + mcp_module, + { + 'name': 'test', + 'uuid': 'test-uuid', + 'mode': 'stdio', + 'command': 'python', + 'args': [], + }, + ap=ap, + ) + info = s.get_runtime_info_dict() assert 'box_session_id' not in info @@ -616,7 +634,7 @@ class TestBoxConfigParsing: @pytest.mark.asyncio -async def test_init_box_stdio_server_keeps_host_mount_validation_enabled(mcp_module): +async def test_init_box_stdio_server_stages_host_path_in_shared_workspace(mcp_module, tmp_path): mcp_stdio_module = sys.modules['langbot.pkg.provider.tools.loaders.mcp_stdio'] class FakeClientSession: @@ -641,6 +659,7 @@ async def test_init_box_stdio_server_keeps_host_mount_validation_enabled(mcp_mod ap = _make_ap() ap.box_service.available = True + ap.box_service.default_workspace = str(tmp_path / 'shared-box-workspace') ap.box_service.create_session = AsyncMock(return_value={}) ap.box_service.build_spec = Mock(return_value='validated-spec') ap.box_service.client = SimpleNamespace( @@ -649,24 +668,40 @@ async def test_init_box_stdio_server_keeps_host_mount_validation_enabled(mcp_mod ap.box_service.start_managed_process = AsyncMock(return_value={}) ap.box_service.get_managed_process_websocket_url = Mock(return_value='ws://box.example/process') + host_path = tmp_path / 'mcp-source' + host_path.mkdir() + 
server_file = host_path / 'server.py' + server_file.write_text('print("hello")\n', encoding='utf-8') + session = _make_session( mcp_module, { 'name': 'test', 'uuid': 'u1', 'mode': 'stdio', - 'command': '/home/user/mcp/.venv/bin/python', - 'args': ['/home/user/mcp/server.py'], - 'box': {'host_path': '/home/user/mcp'}, + 'command': str(host_path / '.venv' / 'bin' / 'python'), + 'args': [str(server_file)], + 'box': {'host_path': str(host_path)}, }, ap=ap, ) - session._detect_install_command = Mock(return_value='pip install --no-cache-dir -r /workspace/requirements.txt') await session._init_box_stdio_server() await session.exit_stack.aclose() assert ap.box_service.create_session.await_count == 1 - assert ap.box_service.create_session.await_args.kwargs.get('skip_host_mount_validation', False) is False + session_payload = ap.box_service.create_session.await_args.args[0] + assert session_payload['session_id'] == 'mcp-shared' + assert 'host_path' not in session_payload assert ap.box_service.build_spec.call_count == 1 assert ap.box_service.build_spec.call_args.kwargs.get('skip_host_mount_validation', False) is False + assert ap.box_service.build_spec.call_args.args[0]['host_path'] == str(host_path) + + staged_file = tmp_path / 'shared-box-workspace' / '.mcp' / 'u1' / 'workspace' / 'server.py' + assert staged_file.read_text(encoding='utf-8') == 'print("hello")\n' + + process_payload = ap.box_service.start_managed_process.await_args.args[1] + assert process_payload['process_id'] == 'u1' + assert process_payload['command'] == 'python' + assert process_payload['args'] == ['/workspace/.mcp/u1/workspace/server.py'] + assert process_payload['cwd'] == '/workspace/.mcp/u1/workspace'