fix(mcp): stabilize shared box managed processes

This commit is contained in:
Junyan Qin
2026-05-19 00:45:35 +08:00
parent 747ea069aa
commit 257d9d3a65
7 changed files with 187 additions and 27 deletions

View File

@@ -289,6 +289,9 @@ class BoxService:
async def get_managed_process(self, session_id: str, process_id: str = 'default') -> BoxManagedProcessInfo: async def get_managed_process(self, session_id: str, process_id: str = 'default') -> BoxManagedProcessInfo:
return await self.client.get_managed_process(session_id, process_id) return await self.client.get_managed_process(session_id, process_id)
async def stop_managed_process(self, session_id: str, process_id: str = 'default') -> None:
return await self.client.stop_managed_process(session_id, process_id)
def get_managed_process_websocket_url(self, session_id: str, process_id: str = 'default') -> str: def get_managed_process_websocket_url(self, session_id: str, process_id: str = 'default') -> str:
getter = getattr(self.client, 'get_managed_process_websocket_url', None) getter = getattr(self.client, 'get_managed_process_websocket_url', None)
if getter is None: if getter is None:

View File

@@ -403,6 +403,9 @@ class BoxWorkspaceSession:
async def get_managed_process(self, process_id: str = 'default'): async def get_managed_process(self, process_id: str = 'default'):
return await self.box_service.get_managed_process(self.session_id, process_id) return await self.box_service.get_managed_process(self.session_id, process_id)
async def stop_managed_process(self, process_id: str = 'default') -> None:
await self.box_service.stop_managed_process(self.session_id, process_id)
def get_managed_process_websocket_url(self, process_id: str = 'default') -> str: def get_managed_process_websocket_url(self, process_id: str = 'default') -> str:
return self.box_service.get_managed_process_websocket_url(self.session_id, process_id) return self.box_service.get_managed_process_websocket_url(self.session_id, process_id)

View File

@@ -20,7 +20,7 @@ from ....core import app
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
import langbot_plugin.api.entities.builtin.provider.message as provider_message import langbot_plugin.api.entities.builtin.provider.message as provider_message
from ....entity.persistence import mcp as persistence_mcp from ....entity.persistence import mcp as persistence_mcp
from .mcp_stdio import BoxStdioSessionRuntime, MCPSessionErrorPhase from .mcp_stdio import BoxStdioSessionRuntime, MCPServerBoxConfig, MCPSessionErrorPhase # noqa: F401
class MCPSessionStatus(enum.Enum): class MCPSessionStatus(enum.Enum):
@@ -241,7 +241,7 @@ class RuntimeMCPSession:
self._lifecycle_task = asyncio.create_task(self._lifecycle_loop_with_retry()) self._lifecycle_task = asyncio.create_task(self._lifecycle_loop_with_retry())
# Wait for connection or failure (with timeout) # Wait for connection or failure (with timeout)
startup_timeout = self.box_config.startup_timeout_sec if self._uses_box_stdio() else 30.0 startup_timeout = (self.box_config.startup_timeout_sec + 30) if self._uses_box_stdio() else 30.0
try: try:
await asyncio.wait_for(self._ready_event.wait(), timeout=startup_timeout) await asyncio.wait_for(self._ready_event.wait(), timeout=startup_timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:

View File

@@ -2,6 +2,9 @@ from __future__ import annotations
import enum import enum
import asyncio import asyncio
import os
import shutil
import shlex
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
import pydantic import pydantic
@@ -11,6 +14,7 @@ from ....box.workspace import (
BoxWorkspaceSession, BoxWorkspaceSession,
classify_python_workspace, classify_python_workspace,
infer_workspace_host_path, infer_workspace_host_path,
normalize_host_path,
rewrite_mounted_path, rewrite_mounted_path,
rewrite_venv_command, rewrite_venv_command,
unwrap_venv_path, unwrap_venv_path,
@@ -68,13 +72,22 @@ class BoxStdioSessionRuntime:
def server_config(self) -> dict: def server_config(self) -> dict:
return self.owner.server_config return self.owner.server_config
def _build_workspace(self) -> BoxWorkspaceSession: def _build_workspace(
self,
*,
host_path: str | None | object = ...,
workdir: str = '/workspace',
mount_path: str = '/workspace',
) -> BoxWorkspaceSession:
resolved_host_path = self.resolve_host_path() if host_path is ... else host_path
return BoxWorkspaceSession( return BoxWorkspaceSession(
self.ap.box_service, self.ap.box_service,
self.owner._build_box_session_id(), self.owner._build_box_session_id(),
host_path=self.resolve_host_path(), host_path=resolved_host_path,
host_path_mode=self.config.host_path_mode, host_path_mode=self.config.host_path_mode,
workdir=workdir,
env=self.config.env, env=self.config.env,
mount_path=mount_path,
network=self.config.network, network=self.config.network,
read_only_rootfs=self.config.read_only_rootfs if self.config.read_only_rootfs is not None else False, read_only_rootfs=self.config.read_only_rootfs if self.config.read_only_rootfs is not None else False,
image=self.config.image, image=self.config.image,
@@ -92,14 +105,17 @@ class BoxStdioSessionRuntime:
def uses_box_stdio(self) -> bool: def uses_box_stdio(self) -> bool:
if self.server_config.get('mode') != 'stdio': if self.server_config.get('mode') != 'stdio':
return False return False
try: return getattr(self.ap, 'box_service', None) is not None
return bool(getattr(self.ap.box_service, 'available', False))
except Exception:
return False
async def initialize(self) -> None: async def initialize(self) -> None:
workspace = self._build_workspace() await self._wait_for_box_runtime()
host_path = workspace.host_path
# All stdio MCP servers share one Box session. Per-server host paths
# are staged into the shared workspace instead of becoming session
# mounts, because an existing Docker container cannot add bind mounts.
workspace = self._build_workspace(host_path=None)
host_path = self.resolve_host_path()
process_cwd = '/workspace'
try: try:
await workspace.create_session() await workspace.create_session()
@@ -108,7 +124,8 @@ class BoxStdioSessionRuntime:
raise raise
if host_path: if host_path:
install_cmd = self.owner._detect_install_command(host_path) process_cwd = await self._stage_host_path_to_shared_workspace(host_path)
install_cmd = self.detect_install_command(host_path, process_cwd)
if install_cmd: if install_cmd:
self.ap.logger.info( self.ap.logger.info(
f'MCP server {self.server_name}: installing dependencies in Box with: {install_cmd}' f'MCP server {self.server_name}: installing dependencies in Box with: {install_cmd}'
@@ -116,6 +133,7 @@ class BoxStdioSessionRuntime:
try: try:
result = await workspace.execute_raw( result = await workspace.execute_raw(
install_cmd, install_cmd,
workdir=process_cwd,
timeout_sec=self.config.startup_timeout_sec or 120, timeout_sec=self.config.startup_timeout_sec or 120,
) )
except Exception: except Exception:
@@ -127,12 +145,19 @@ class BoxStdioSessionRuntime:
raise Exception(f'Dependency install failed (exit code {result.exit_code}): {stderr_preview}') raise Exception(f'Dependency install failed (exit code {result.exit_code}): {stderr_preview}')
try: try:
await workspace.start_managed_process( process_workspace = (
self._build_workspace(host_path=host_path, workdir=process_cwd, mount_path=process_cwd)
if host_path
else workspace
)
payload = process_workspace.build_process_payload(
self.server_config['command'], self.server_config['command'],
self.server_config.get('args', []), self.server_config.get('args', []),
process_id=self.process_id,
env=self.server_config.get('env', {}), env=self.server_config.get('env', {}),
cwd=process_cwd,
) )
payload['process_id'] = self.process_id
await workspace.box_service.start_managed_process(workspace.session_id, payload)
except Exception: except Exception:
self.owner.error_phase = MCPSessionErrorPhase.PROCESS_START self.owner.error_phase = MCPSessionErrorPhase.PROCESS_START
raise raise
@@ -180,15 +205,98 @@ class BoxStdioSessionRuntime:
return return
await asyncio.sleep(self.owner._MONITOR_POLL_INTERVAL) await asyncio.sleep(self.owner._MONITOR_POLL_INTERVAL)
async def _stage_host_path_to_shared_workspace(self, host_path: str) -> str:
source_path = normalize_host_path(host_path)
if not source_path:
return '/workspace'
if not os.path.isdir(source_path):
raise FileNotFoundError(f'MCP host_path does not exist or is not a directory: {host_path}')
self._validate_host_path(source_path)
shared_host_path = self._shared_workspace_host_path()
process_host_root = os.path.join(shared_host_path, '.mcp', self.process_id)
process_host_workspace = os.path.join(process_host_root, 'workspace')
await asyncio.to_thread(self._copy_workspace_tree, source_path, process_host_root, process_host_workspace)
return f'/workspace/.mcp/{self.process_id}/workspace'
def _validate_host_path(self, host_path: str) -> None:
self.ap.box_service.build_spec(
{
'session_id': f'mcp-validate-{self.process_id}',
'host_path': host_path,
'host_path_mode': self.config.host_path_mode,
'network': self.config.network,
'read_only_rootfs': self.config.read_only_rootfs if self.config.read_only_rootfs is not None else False,
}
)
def _shared_workspace_host_path(self) -> str:
default_workspace = getattr(self.ap.box_service, 'default_workspace', None)
if not default_workspace:
raise RuntimeError('Box default workspace is required for shared MCP host_path staging')
shared_host_path = normalize_host_path(default_workspace)
os.makedirs(shared_host_path, exist_ok=True)
return shared_host_path
@staticmethod
def _copy_workspace_tree(source_path: str, process_host_root: str, process_host_workspace: str) -> None:
shutil.rmtree(process_host_root, ignore_errors=True)
os.makedirs(process_host_root, exist_ok=True)
shutil.copytree(
source_path,
process_host_workspace,
symlinks=True,
ignore=shutil.ignore_patterns('.git', '__pycache__', '.pytest_cache', '.mypy_cache', '.ruff_cache'),
)
async def _cleanup_staged_workspace(self) -> None:
if not self.resolve_host_path():
return
try:
process_host_root = os.path.join(self._shared_workspace_host_path(), '.mcp', self.process_id)
await asyncio.to_thread(shutil.rmtree, process_host_root, True)
except Exception as exc:
self.ap.logger.warning(
f'MCP server {self.server_name}: failed to clean staged workspace '
f'process_id={self.process_id}: {type(exc).__name__}: {exc}'
)
async def _wait_for_box_runtime(self) -> None:
timeout_sec = max(float(self.config.startup_timeout_sec or 120), 1.0)
deadline = asyncio.get_running_loop().time() + timeout_sec
warned = False
while not getattr(self.ap.box_service, 'available', False):
if not warned:
self.ap.logger.warning(
f'MCP server {self.server_name}: waiting for Box runtime before starting stdio process'
)
warned = True
if asyncio.get_running_loop().time() >= deadline:
self.owner.error_phase = MCPSessionErrorPhase.SESSION_CREATE
raise Exception(f'Box runtime is not available after {int(timeout_sec)} seconds')
await asyncio.sleep(1)
async def cleanup_session(self) -> None: async def cleanup_session(self) -> None:
if not self.uses_box_stdio(): if not self.uses_box_stdio():
return return
# In the shared-session model, we do not delete the session itself. # In the shared-session model, we do not delete the session itself.
# The managed process exits independently; deleting the session would # Stop only this MCP server's managed process; deleting the session
# kill other MCP servers sharing the same container. # would kill other MCP servers sharing the same container.
workspace = self._build_workspace(host_path=None)
try:
await workspace.stop_managed_process(self.process_id)
except Exception as exc:
self.ap.logger.warning(
f'MCP server {self.server_name}: failed to stop managed process '
f'process_id={self.process_id}: {type(exc).__name__}: {exc}'
)
await self._cleanup_staged_workspace()
return
await self._cleanup_staged_workspace()
self.ap.logger.info( self.ap.logger.info(
f'MCP server {self.server_name}: process_id={self.process_id} cleanup complete ' f'MCP server {self.server_name}: stopped process_id={self.process_id} '
f'(shared session {self.owner._build_box_session_id()} kept alive)' f'(shared session {self.owner._build_box_session_id()} kept alive)'
) )
@@ -206,12 +314,13 @@ class BoxStdioSessionRuntime:
return self.config.host_path or self.infer_host_path() return self.config.host_path or self.infer_host_path()
@staticmethod @staticmethod
def detect_install_command(host_path: str) -> str | None: def detect_install_command(host_path: str, workspace_path: str = '/workspace') -> str | None:
workspace_kind = classify_python_workspace(host_path) workspace_kind = classify_python_workspace(host_path)
quoted_workspace_path = shlex.quote(workspace_path)
if workspace_kind == 'package': if workspace_kind == 'package':
return ( return (
'mkdir -p /opt/_lb_src' 'mkdir -p /opt/_lb_src'
' && tar -C /workspace' f' && tar -C {quoted_workspace_path}'
' --exclude=.venv --exclude=.git --exclude=__pycache__' ' --exclude=.venv --exclude=.git --exclude=__pycache__'
' --exclude=node_modules --exclude=.tox --exclude=.nox' ' --exclude=node_modules --exclude=.tox --exclude=.nox'
' --exclude="*.egg-info" --exclude=.uv-cache' ' --exclude="*.egg-info" --exclude=.uv-cache'
@@ -221,7 +330,7 @@ class BoxStdioSessionRuntime:
' && rm -rf /opt/_lb_src' ' && rm -rf /opt/_lb_src'
) )
if workspace_kind == 'requirements': if workspace_kind == 'requirements':
return 'pip install --no-cache-dir -r /workspace/requirements.txt' return f'pip install --no-cache-dir -r {quoted_workspace_path}/requirements.txt'
return None return None
def build_box_session_payload(self, session_id: str, host_path: str | None = None) -> dict[str, Any]: def build_box_session_payload(self, session_id: str, host_path: str | None = None) -> dict[str, Any]:

View File

@@ -23,7 +23,7 @@ import pytest
from aiohttp.test_utils import TestServer from aiohttp.test_utils import TestServer
from langbot_plugin.box.client import ActionRPCBoxClient from langbot_plugin.box.client import ActionRPCBoxClient
from langbot_plugin.box.errors import BoxSessionNotFoundError from langbot_plugin.box.errors import BoxManagedProcessNotFoundError, BoxSessionNotFoundError
from langbot_plugin.box.models import BoxManagedProcessSpec, BoxManagedProcessStatus, BoxSpec from langbot_plugin.box.models import BoxManagedProcessSpec, BoxManagedProcessStatus, BoxSpec
from langbot_plugin.box.runtime import BoxRuntime from langbot_plugin.box.runtime import BoxRuntime
from langbot_plugin.box.server import BoxServerHandler, create_ws_relay_app from langbot_plugin.box.server import BoxServerHandler, create_ws_relay_app
@@ -169,6 +169,13 @@ async def test_managed_process_start_and_query(box_server):
assert info2.status == BoxManagedProcessStatus.RUNNING assert info2.status == BoxManagedProcessStatus.RUNNING
assert info2.command == 'sh' assert info2.command == 'sh'
# Stop only the managed process while keeping the session available
await client.stop_managed_process('mcp-int-lifecycle')
with pytest.raises(BoxManagedProcessNotFoundError):
await client.get_managed_process('mcp-int-lifecycle')
session_info = await client.get_session('mcp-int-lifecycle')
assert session_info['session_id'] == 'mcp-int-lifecycle'
# Cleanup # Cleanup
await client.delete_session('mcp-int-lifecycle') await client.delete_session('mcp-int-lifecycle')

View File

@@ -70,6 +70,9 @@ class _InProcessBoxRuntimeClient(BoxRuntimeClient):
async def get_managed_process(self, session_id: str, process_id: str = 'default'): async def get_managed_process(self, session_id: str, process_id: str = 'default'):
return self._runtime.get_managed_process(session_id, process_id) return self._runtime.get_managed_process(session_id, process_id)
async def stop_managed_process(self, session_id: str, process_id: str = 'default'):
await self._runtime.stop_managed_process(session_id, process_id)
async def get_session(self, session_id: str): async def get_session(self, session_id: str):
return self._runtime.get_session(session_id) return self._runtime.get_session(session_id)

View File

@@ -561,7 +561,7 @@ class TestGetRuntimeInfoDict:
assert info['box_session_id'] == 'mcp-shared' assert info['box_session_id'] == 'mcp-shared'
assert info['box_enabled'] is True assert info['box_enabled'] is True
def test_stdio_session_without_box_runtime(self, mcp_module): def test_stdio_session_waits_for_unavailable_box_runtime(self, mcp_module):
ap = _make_ap() ap = _make_ap()
ap.box_service.available = False ap.box_service.available = False
s = _make_session( s = _make_session(
@@ -576,6 +576,24 @@ class TestGetRuntimeInfoDict:
ap=ap, ap=ap,
) )
info = s.get_runtime_info_dict() info = s.get_runtime_info_dict()
assert info['box_session_id'] == 'mcp-shared'
assert info['box_enabled'] is True
def test_stdio_session_without_box_service_uses_local_stdio(self, mcp_module):
ap = _make_ap()
del ap.box_service
s = _make_session(
mcp_module,
{
'name': 'test',
'uuid': 'test-uuid',
'mode': 'stdio',
'command': 'python',
'args': [],
},
ap=ap,
)
info = s.get_runtime_info_dict()
assert 'box_session_id' not in info assert 'box_session_id' not in info
@@ -616,7 +634,7 @@ class TestBoxConfigParsing:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_init_box_stdio_server_keeps_host_mount_validation_enabled(mcp_module): async def test_init_box_stdio_server_stages_host_path_in_shared_workspace(mcp_module, tmp_path):
mcp_stdio_module = sys.modules['langbot.pkg.provider.tools.loaders.mcp_stdio'] mcp_stdio_module = sys.modules['langbot.pkg.provider.tools.loaders.mcp_stdio']
class FakeClientSession: class FakeClientSession:
@@ -641,6 +659,7 @@ async def test_init_box_stdio_server_keeps_host_mount_validation_enabled(mcp_mod
ap = _make_ap() ap = _make_ap()
ap.box_service.available = True ap.box_service.available = True
ap.box_service.default_workspace = str(tmp_path / 'shared-box-workspace')
ap.box_service.create_session = AsyncMock(return_value={}) ap.box_service.create_session = AsyncMock(return_value={})
ap.box_service.build_spec = Mock(return_value='validated-spec') ap.box_service.build_spec = Mock(return_value='validated-spec')
ap.box_service.client = SimpleNamespace( ap.box_service.client = SimpleNamespace(
@@ -649,24 +668,40 @@ async def test_init_box_stdio_server_keeps_host_mount_validation_enabled(mcp_mod
ap.box_service.start_managed_process = AsyncMock(return_value={}) ap.box_service.start_managed_process = AsyncMock(return_value={})
ap.box_service.get_managed_process_websocket_url = Mock(return_value='ws://box.example/process') ap.box_service.get_managed_process_websocket_url = Mock(return_value='ws://box.example/process')
host_path = tmp_path / 'mcp-source'
host_path.mkdir()
server_file = host_path / 'server.py'
server_file.write_text('print("hello")\n', encoding='utf-8')
session = _make_session( session = _make_session(
mcp_module, mcp_module,
{ {
'name': 'test', 'name': 'test',
'uuid': 'u1', 'uuid': 'u1',
'mode': 'stdio', 'mode': 'stdio',
'command': '/home/user/mcp/.venv/bin/python', 'command': str(host_path / '.venv' / 'bin' / 'python'),
'args': ['/home/user/mcp/server.py'], 'args': [str(server_file)],
'box': {'host_path': '/home/user/mcp'}, 'box': {'host_path': str(host_path)},
}, },
ap=ap, ap=ap,
) )
session._detect_install_command = Mock(return_value='pip install --no-cache-dir -r /workspace/requirements.txt')
await session._init_box_stdio_server() await session._init_box_stdio_server()
await session.exit_stack.aclose() await session.exit_stack.aclose()
assert ap.box_service.create_session.await_count == 1 assert ap.box_service.create_session.await_count == 1
assert ap.box_service.create_session.await_args.kwargs.get('skip_host_mount_validation', False) is False session_payload = ap.box_service.create_session.await_args.args[0]
assert session_payload['session_id'] == 'mcp-shared'
assert 'host_path' not in session_payload
assert ap.box_service.build_spec.call_count == 1 assert ap.box_service.build_spec.call_count == 1
assert ap.box_service.build_spec.call_args.kwargs.get('skip_host_mount_validation', False) is False assert ap.box_service.build_spec.call_args.kwargs.get('skip_host_mount_validation', False) is False
assert ap.box_service.build_spec.call_args.args[0]['host_path'] == str(host_path)
staged_file = tmp_path / 'shared-box-workspace' / '.mcp' / 'u1' / 'workspace' / 'server.py'
assert staged_file.read_text(encoding='utf-8') == 'print("hello")\n'
process_payload = ap.box_service.start_managed_process.await_args.args[1]
assert process_payload['process_id'] == 'u1'
assert process_payload['command'] == 'python'
assert process_payload['args'] == ['/workspace/.mcp/u1/workspace/server.py']
assert process_payload['cwd'] == '/workspace/.mcp/u1/workspace'