mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-02 03:55:55 +00:00
refactor(mcp): extract box stdio runtime helper
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import os
|
||||
import typing
|
||||
from contextlib import AsyncExitStack
|
||||
import traceback
|
||||
@@ -10,19 +9,18 @@ import sqlalchemy
|
||||
import asyncio
|
||||
import httpx
|
||||
|
||||
import pydantic
|
||||
import uuid as uuid_module
|
||||
from mcp import ClientSession, StdioServerParameters
|
||||
from mcp.client.stdio import stdio_client
|
||||
from mcp.client.sse import sse_client
|
||||
from mcp.client.streamable_http import streamable_http_client
|
||||
from mcp.client.websocket import websocket_client
|
||||
|
||||
from .. import loader
|
||||
from ....core import app
|
||||
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
from ....entity.persistence import mcp as persistence_mcp
|
||||
from .mcp_stdio import BoxStdioSessionRuntime, MCPServerBoxConfig, MCPSessionErrorPhase
|
||||
|
||||
|
||||
class MCPSessionStatus(enum.Enum):
|
||||
@@ -31,39 +29,6 @@ class MCPSessionStatus(enum.Enum):
|
||||
ERROR = 'error'
|
||||
|
||||
|
||||
class MCPSessionErrorPhase(enum.Enum):
|
||||
"""Which phase of the MCP lifecycle failed."""
|
||||
|
||||
SESSION_CREATE = 'session_create'
|
||||
DEP_INSTALL = 'dep_install'
|
||||
PROCESS_START = 'process_start'
|
||||
RELAY_CONNECT = 'relay_connect'
|
||||
MCP_INIT = 'mcp_init'
|
||||
RUNTIME = 'runtime'
|
||||
TOOL_CALL = 'tool_call'
|
||||
|
||||
|
||||
_VENV_DIRS = frozenset({'.venv', 'venv', 'env', '.env'})
|
||||
_VENV_BIN_DIRS = frozenset({'bin', 'Scripts'})
|
||||
|
||||
|
||||
class MCPServerBoxConfig(pydantic.BaseModel):
|
||||
"""Structured configuration for running an MCP server inside a Box container."""
|
||||
|
||||
image: str | None = None
|
||||
network: str = 'on' # MCP servers need network for dependency installation
|
||||
host_path: str | None = None
|
||||
host_path_mode: str = 'ro' # MCP servers default to read-only mount
|
||||
env: dict[str, str] = pydantic.Field(default_factory=dict)
|
||||
startup_timeout_sec: int = 120 # Longer default to allow pip install
|
||||
cpus: float | None = None
|
||||
memory_mb: int | None = None
|
||||
pids_limit: int | None = None
|
||||
read_only_rootfs: bool | None = None
|
||||
|
||||
model_config = pydantic.ConfigDict(extra='ignore')
|
||||
|
||||
|
||||
class RuntimeMCPSession:
|
||||
"""运行时 MCP 会话"""
|
||||
|
||||
@@ -98,6 +63,8 @@ class RuntimeMCPSession:
|
||||
|
||||
retry_count: int = 0
|
||||
|
||||
_box_stdio_runtime: BoxStdioSessionRuntime
|
||||
|
||||
def __init__(self, server_name: str, server_config: dict, enable: bool, ap: app.Application):
|
||||
self.server_name = server_name
|
||||
self.server_uuid = server_config.get('uuid', '')
|
||||
@@ -115,12 +82,12 @@ class RuntimeMCPSession:
|
||||
self._shutdown_event = asyncio.Event()
|
||||
self._ready_event = asyncio.Event()
|
||||
|
||||
# Parse box config once
|
||||
self.box_config = MCPServerBoxConfig.model_validate(server_config.get('box', {}))
|
||||
self._box_stdio_runtime = BoxStdioSessionRuntime(self)
|
||||
self.box_config = self._box_stdio_runtime.config
|
||||
|
||||
async def _init_stdio_python_server(self):
|
||||
if self._uses_box_stdio():
|
||||
await self._init_box_stdio_server()
|
||||
await self._box_stdio_runtime.initialize()
|
||||
return
|
||||
|
||||
server_params = StdioServerParameters(
|
||||
@@ -138,66 +105,7 @@ class RuntimeMCPSession:
|
||||
await self.session.initialize()
|
||||
|
||||
async def _init_box_stdio_server(self):
|
||||
box_service = self.ap.box_service
|
||||
session_id = self._build_box_session_id()
|
||||
host_path = self._resolve_host_path()
|
||||
session_payload = self._build_box_session_payload(session_id, host_path)
|
||||
|
||||
# Phase: session creation
|
||||
try:
|
||||
await box_service.create_session(
|
||||
session_payload,
|
||||
)
|
||||
except Exception:
|
||||
self.error_phase = MCPSessionErrorPhase.SESSION_CREATE
|
||||
raise
|
||||
|
||||
# Phase: dependency installation
|
||||
if host_path:
|
||||
install_cmd = self._detect_install_command(host_path)
|
||||
if install_cmd:
|
||||
self.ap.logger.info(
|
||||
f'MCP server {self.server_name}: installing dependencies in Box with: {install_cmd}'
|
||||
)
|
||||
exec_payload = dict(session_payload)
|
||||
exec_payload['cmd'] = install_cmd
|
||||
exec_payload['timeout_sec'] = self.box_config.startup_timeout_sec or 120
|
||||
try:
|
||||
result = await box_service.client.execute(box_service.build_spec(exec_payload))
|
||||
except Exception:
|
||||
self.error_phase = MCPSessionErrorPhase.DEP_INSTALL
|
||||
raise
|
||||
if not result.ok:
|
||||
self.error_phase = MCPSessionErrorPhase.DEP_INSTALL
|
||||
stderr_preview = (result.stderr or '')[:500]
|
||||
raise Exception(f'Dependency install failed (exit code {result.exit_code}): {stderr_preview}')
|
||||
|
||||
# Phase: managed process start
|
||||
try:
|
||||
await box_service.start_managed_process(
|
||||
session_id,
|
||||
self._build_box_process_payload(host_path),
|
||||
)
|
||||
except Exception:
|
||||
self.error_phase = MCPSessionErrorPhase.PROCESS_START
|
||||
raise
|
||||
|
||||
# Phase: WebSocket relay connection
|
||||
try:
|
||||
websocket_url = box_service.get_managed_process_websocket_url(session_id)
|
||||
transport = await self.exit_stack.enter_async_context(websocket_client(websocket_url))
|
||||
read_stream, write_stream = transport
|
||||
self.session = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream))
|
||||
except Exception:
|
||||
self.error_phase = MCPSessionErrorPhase.RELAY_CONNECT
|
||||
raise
|
||||
|
||||
# Phase: MCP protocol initialization
|
||||
try:
|
||||
await self.session.initialize()
|
||||
except Exception:
|
||||
self.error_phase = MCPSessionErrorPhase.MCP_INIT
|
||||
raise
|
||||
await self._box_stdio_runtime.initialize()
|
||||
|
||||
async def _init_sse_server(self):
|
||||
sse_transport = await self.exit_stack.enter_async_context(
|
||||
@@ -257,7 +165,7 @@ class RuntimeMCPSession:
|
||||
|
||||
# Wait for shutdown signal, with optional health monitoring for Box stdio
|
||||
if self._uses_box_stdio():
|
||||
monitor_task = asyncio.create_task(self._monitor_box_process_health())
|
||||
monitor_task = asyncio.create_task(self._box_stdio_runtime.monitor_process_health())
|
||||
shutdown_task = asyncio.create_task(self._shutdown_event.wait())
|
||||
done, pending = await asyncio.wait(
|
||||
[shutdown_task, monitor_task],
|
||||
@@ -323,31 +231,7 @@ class RuntimeMCPSession:
|
||||
_MONITOR_MAX_CONSECUTIVE_ERRORS = 3
|
||||
|
||||
async def _monitor_box_process_health(self):
|
||||
"""Poll managed process status; return when process exits."""
|
||||
from langbot_plugin.box.models import BoxManagedProcessStatus
|
||||
|
||||
session_id = self._build_box_session_id()
|
||||
consecutive_errors = 0
|
||||
while not self._shutdown_event.is_set():
|
||||
try:
|
||||
info = await self.ap.box_service.client.get_managed_process(session_id)
|
||||
if isinstance(info, dict):
|
||||
status = info.get('status', '')
|
||||
else:
|
||||
status = getattr(info, 'status', '')
|
||||
if status == BoxManagedProcessStatus.EXITED.value or status == BoxManagedProcessStatus.EXITED:
|
||||
return
|
||||
consecutive_errors = 0
|
||||
except Exception as exc:
|
||||
consecutive_errors += 1
|
||||
self.ap.logger.warning(
|
||||
f'MCP monitor for {self.server_name}: get_managed_process failed '
|
||||
f'({consecutive_errors}/{self._MONITOR_MAX_CONSECUTIVE_ERRORS}): '
|
||||
f'{type(exc).__name__}: {exc}'
|
||||
)
|
||||
if consecutive_errors >= self._MONITOR_MAX_CONSECUTIVE_ERRORS:
|
||||
return
|
||||
await asyncio.sleep(self._MONITOR_POLL_INTERVAL)
|
||||
await self._box_stdio_runtime.monitor_process_health()
|
||||
|
||||
async def start(self):
|
||||
if not self.enable:
|
||||
@@ -462,180 +346,39 @@ class RuntimeMCPSession:
|
||||
self.ap.logger.error(f'Error shutting down MCP session {self.server_name}: {e}\n{traceback.format_exc()}')
|
||||
|
||||
def _uses_box_stdio(self) -> bool:
|
||||
"""Check whether this stdio MCP server should run inside a Box container.
|
||||
|
||||
Returns True when mode is stdio AND the Box runtime is available.
|
||||
An explicit ``box`` key in server_config is NOT required — if the
|
||||
runtime is reachable, stdio servers default to Box isolation.
|
||||
"""
|
||||
if self.server_config.get('mode') != 'stdio':
|
||||
return False
|
||||
try:
|
||||
return getattr(self.ap.box_service, 'available', False)
|
||||
except Exception:
|
||||
return False
|
||||
return self._box_stdio_runtime.uses_box_stdio()
|
||||
|
||||
def _build_box_session_id(self) -> str:
|
||||
return f'mcp-{self.server_uuid}'
|
||||
|
||||
def _rewrite_path(self, path: str, host_path: str | None) -> str:
|
||||
"""Rewrite host path prefix to container /workspace prefix."""
|
||||
if not host_path or not path:
|
||||
return path
|
||||
normalized_host = os.path.realpath(host_path)
|
||||
if path.startswith(normalized_host + '/'):
|
||||
return '/workspace' + path[len(normalized_host) :]
|
||||
if path == normalized_host:
|
||||
return '/workspace'
|
||||
return path
|
||||
return self._box_stdio_runtime.rewrite_path(path, host_path)
|
||||
|
||||
def _infer_host_path(self) -> str | None:
|
||||
"""Try to infer host_path from command and args absolute paths.
|
||||
|
||||
Detects virtualenv patterns (e.g. .venv/bin/python) and walks up
|
||||
to the project root rather than using the bin directory.
|
||||
"""
|
||||
candidates = []
|
||||
parts = [self.server_config.get('command', '')] + self.server_config.get('args', [])
|
||||
for part in parts:
|
||||
if not os.path.isabs(part):
|
||||
continue
|
||||
# Use the raw path for venv detection (before resolving symlinks)
|
||||
# because .venv/bin/python is often a symlink to the system python.
|
||||
if os.path.exists(part):
|
||||
directory = os.path.dirname(part)
|
||||
directory = self._unwrap_venv_path(directory)
|
||||
candidates.append(os.path.realpath(directory))
|
||||
if not candidates:
|
||||
return None
|
||||
common = os.path.commonpath(candidates)
|
||||
return common if common != '/' else None
|
||||
return self._box_stdio_runtime.infer_host_path()
|
||||
|
||||
@staticmethod
|
||||
def _unwrap_venv_path(directory: str) -> str:
|
||||
"""If directory looks like a virtualenv bin dir, return the project root.
|
||||
|
||||
Recognized patterns:
|
||||
/project/.venv/bin -> /project
|
||||
/project/venv/bin -> /project
|
||||
/project/.venv/Scripts -> /project (Windows)
|
||||
/project/env/bin -> /project
|
||||
"""
|
||||
parts = directory.replace('\\', '/').split('/')
|
||||
# Look for patterns like .../(.venv|venv|env)/(bin|Scripts)
|
||||
for i in range(len(parts) - 1, 0, -1):
|
||||
if parts[i] in _VENV_BIN_DIRS and i >= 1:
|
||||
venv_dir = parts[i - 1]
|
||||
if venv_dir in _VENV_DIRS:
|
||||
# Return everything before the venv directory
|
||||
project_root = '/'.join(parts[: i - 1])
|
||||
return project_root if project_root else '/'
|
||||
return directory
|
||||
return BoxStdioSessionRuntime.unwrap_venv_path(directory)
|
||||
|
||||
def _resolve_host_path(self) -> str | None:
|
||||
"""Resolve the effective host_path: explicit config > inference."""
|
||||
return self.box_config.host_path or self._infer_host_path()
|
||||
return self._box_stdio_runtime.resolve_host_path()
|
||||
|
||||
@staticmethod
|
||||
def _detect_install_command(host_path: str) -> str | None:
|
||||
"""Detect how to install dependencies from the mounted project.
|
||||
|
||||
Copies the project to a writable temp directory before installing,
|
||||
because /workspace may be mounted read-only and pip needs to write
|
||||
build artifacts in the source tree.
|
||||
"""
|
||||
# Use /opt instead of /tmp — /tmp is often a small tmpfs (64 MB)
|
||||
# and cannot hold the copied source tree plus pip build artifacts.
|
||||
_COPY_AND_INSTALL = (
|
||||
'mkdir -p /opt/_mcp_src'
|
||||
' && tar -C /workspace'
|
||||
' --exclude=.venv --exclude=.git --exclude=__pycache__'
|
||||
' --exclude=node_modules --exclude=.tox --exclude=.nox'
|
||||
' --exclude="*.egg-info" --exclude=.uv-cache'
|
||||
' -cf - .'
|
||||
' | tar -C /opt/_mcp_src -xf -'
|
||||
' && pip install --no-cache-dir /opt/_mcp_src'
|
||||
' && rm -rf /opt/_mcp_src'
|
||||
)
|
||||
_INSTALL_REQUIREMENTS = 'pip install --no-cache-dir -r /workspace/requirements.txt'
|
||||
|
||||
if os.path.isfile(os.path.join(host_path, 'pyproject.toml')):
|
||||
return _COPY_AND_INSTALL
|
||||
if os.path.isfile(os.path.join(host_path, 'setup.py')):
|
||||
return _COPY_AND_INSTALL
|
||||
if os.path.isfile(os.path.join(host_path, 'requirements.txt')):
|
||||
return _INSTALL_REQUIREMENTS
|
||||
return None
|
||||
return BoxStdioSessionRuntime.detect_install_command(host_path)
|
||||
|
||||
def _build_box_session_payload(self, session_id: str, host_path: str | None = None) -> dict:
|
||||
bc = self.box_config
|
||||
if host_path is None:
|
||||
host_path = self._resolve_host_path()
|
||||
|
||||
payload: dict[str, typing.Any] = {
|
||||
'session_id': session_id,
|
||||
'workdir': '/workspace',
|
||||
'env': bc.env,
|
||||
# MCP sessions need network for dependency install and writable rootfs
|
||||
'network': bc.network,
|
||||
'read_only_rootfs': bc.read_only_rootfs if bc.read_only_rootfs is not None else False,
|
||||
}
|
||||
if host_path:
|
||||
payload['host_path'] = host_path
|
||||
payload['host_path_mode'] = bc.host_path_mode
|
||||
for key in ('image', 'cpus', 'memory_mb', 'pids_limit'):
|
||||
val = getattr(bc, key)
|
||||
if val is not None:
|
||||
payload[key] = val if not isinstance(val, enum.Enum) else val.value
|
||||
return payload
|
||||
return self._box_stdio_runtime.build_box_session_payload(session_id, host_path)
|
||||
|
||||
def _build_box_process_payload(self, host_path: str | None = None) -> dict:
|
||||
if host_path is None:
|
||||
host_path = self._resolve_host_path()
|
||||
|
||||
command = self.server_config['command']
|
||||
args = self.server_config.get('args', [])
|
||||
cwd = '/workspace'
|
||||
|
||||
if host_path:
|
||||
# When host_path is resolved, we install deps in-container rather
|
||||
# than relying on the host venv. Rewrite paths so the container
|
||||
# sees /workspace/... but replace venv python with plain "python".
|
||||
command = self._rewrite_venv_command(command, host_path)
|
||||
args = [self._rewrite_path(a, host_path) for a in args]
|
||||
cwd = self._rewrite_path(cwd, host_path)
|
||||
|
||||
return {
|
||||
'command': command,
|
||||
'args': args,
|
||||
'env': self.server_config.get('env', {}),
|
||||
'cwd': cwd,
|
||||
}
|
||||
return self._box_stdio_runtime.build_box_process_payload(host_path)
|
||||
|
||||
def _rewrite_venv_command(self, command: str, host_path: str) -> str:
|
||||
"""Rewrite command: if it points to a venv python, use plain 'python'."""
|
||||
if not host_path or not command:
|
||||
return command
|
||||
normalized_host = os.path.realpath(host_path)
|
||||
if not command.startswith(normalized_host + '/'):
|
||||
return command
|
||||
# Check if command is a venv python interpreter
|
||||
rel = command[len(normalized_host) + 1 :] # e.g. ".venv/bin/python"
|
||||
parts = rel.replace('\\', '/').split('/')
|
||||
# Match patterns like .venv/bin/python*, venv/bin/python*, etc.
|
||||
if len(parts) >= 3 and parts[0] in _VENV_DIRS and parts[1] in _VENV_BIN_DIRS and parts[2].startswith('python'):
|
||||
return 'python'
|
||||
# Not a venv python — do normal path rewrite
|
||||
return self._rewrite_path(command, host_path)
|
||||
return self._box_stdio_runtime.rewrite_venv_command(command, host_path)
|
||||
|
||||
async def _cleanup_box_stdio_session(self) -> None:
|
||||
if not self._uses_box_stdio():
|
||||
return
|
||||
|
||||
try:
|
||||
await self.ap.box_service.client.delete_session(self._build_box_session_id())
|
||||
except Exception as e:
|
||||
self.ap.logger.warning(f'Failed to cleanup Box session for MCP server {self.server_name}: {e}')
|
||||
await self._box_stdio_runtime.cleanup_session()
|
||||
|
||||
|
||||
# @loader.loader_class('mcp')
|
||||
|
||||
273
src/langbot/pkg/provider/tools/loaders/mcp_stdio.py
Normal file
273
src/langbot/pkg/provider/tools/loaders/mcp_stdio.py
Normal file
@@ -0,0 +1,273 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import os
|
||||
import asyncio
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import pydantic
|
||||
from mcp import ClientSession
|
||||
from mcp.client.websocket import websocket_client
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .mcp import RuntimeMCPSession
|
||||
|
||||
|
||||
class MCPSessionErrorPhase(enum.Enum):
|
||||
"""Which phase of the MCP lifecycle failed."""
|
||||
|
||||
SESSION_CREATE = 'session_create'
|
||||
DEP_INSTALL = 'dep_install'
|
||||
PROCESS_START = 'process_start'
|
||||
RELAY_CONNECT = 'relay_connect'
|
||||
MCP_INIT = 'mcp_init'
|
||||
RUNTIME = 'runtime'
|
||||
TOOL_CALL = 'tool_call'
|
||||
|
||||
|
||||
_VENV_DIRS = frozenset({'.venv', 'venv', 'env', '.env'})
|
||||
_VENV_BIN_DIRS = frozenset({'bin', 'Scripts'})
|
||||
|
||||
|
||||
class MCPServerBoxConfig(pydantic.BaseModel):
|
||||
"""Structured configuration for running an MCP server inside a Box container."""
|
||||
|
||||
image: str | None = None
|
||||
network: str = 'on' # MCP servers need network for dependency installation
|
||||
host_path: str | None = None
|
||||
host_path_mode: str = 'ro' # MCP servers default to read-write mount only when explicitly requested
|
||||
env: dict[str, str] = pydantic.Field(default_factory=dict)
|
||||
startup_timeout_sec: int = 120 # Longer default to allow dependency bootstrap
|
||||
cpus: float | None = None
|
||||
memory_mb: int | None = None
|
||||
pids_limit: int | None = None
|
||||
read_only_rootfs: bool | None = None
|
||||
|
||||
model_config = pydantic.ConfigDict(extra='ignore')
|
||||
|
||||
|
||||
class BoxStdioSessionRuntime:
|
||||
"""Encapsulate Box-backed stdio MCP session orchestration."""
|
||||
|
||||
def __init__(self, owner: RuntimeMCPSession):
|
||||
self.owner = owner
|
||||
self.config = MCPServerBoxConfig.model_validate(owner.server_config.get('box', {}))
|
||||
|
||||
@property
|
||||
def ap(self):
|
||||
return self.owner.ap
|
||||
|
||||
@property
|
||||
def server_name(self) -> str:
|
||||
return self.owner.server_name
|
||||
|
||||
@property
|
||||
def server_config(self) -> dict:
|
||||
return self.owner.server_config
|
||||
|
||||
def uses_box_stdio(self) -> bool:
|
||||
if self.server_config.get('mode') != 'stdio':
|
||||
return False
|
||||
try:
|
||||
return bool(getattr(self.ap.box_service, 'available', False))
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
async def initialize(self) -> None:
|
||||
box_service = self.ap.box_service
|
||||
session_id = self.owner._build_box_session_id()
|
||||
host_path = self.resolve_host_path()
|
||||
session_payload = self.build_box_session_payload(session_id, host_path)
|
||||
|
||||
try:
|
||||
await box_service.create_session(session_payload)
|
||||
except Exception:
|
||||
self.owner.error_phase = MCPSessionErrorPhase.SESSION_CREATE
|
||||
raise
|
||||
|
||||
if host_path:
|
||||
install_cmd = self.owner._detect_install_command(host_path)
|
||||
if install_cmd:
|
||||
self.ap.logger.info(f'MCP server {self.server_name}: installing dependencies in Box with: {install_cmd}')
|
||||
exec_payload = dict(session_payload)
|
||||
exec_payload['cmd'] = install_cmd
|
||||
exec_payload['timeout_sec'] = self.config.startup_timeout_sec or 120
|
||||
try:
|
||||
result = await box_service.client.execute(box_service.build_spec(exec_payload))
|
||||
except Exception:
|
||||
self.owner.error_phase = MCPSessionErrorPhase.DEP_INSTALL
|
||||
raise
|
||||
if not result.ok:
|
||||
self.owner.error_phase = MCPSessionErrorPhase.DEP_INSTALL
|
||||
stderr_preview = (result.stderr or '')[:500]
|
||||
raise Exception(f'Dependency install failed (exit code {result.exit_code}): {stderr_preview}')
|
||||
|
||||
try:
|
||||
await box_service.start_managed_process(session_id, self.build_box_process_payload(host_path))
|
||||
except Exception:
|
||||
self.owner.error_phase = MCPSessionErrorPhase.PROCESS_START
|
||||
raise
|
||||
|
||||
try:
|
||||
websocket_url = box_service.get_managed_process_websocket_url(session_id)
|
||||
transport = await self.owner.exit_stack.enter_async_context(websocket_client(websocket_url))
|
||||
read_stream, write_stream = transport
|
||||
self.owner.session = await self.owner.exit_stack.enter_async_context(ClientSession(read_stream, write_stream))
|
||||
except Exception:
|
||||
self.owner.error_phase = MCPSessionErrorPhase.RELAY_CONNECT
|
||||
raise
|
||||
|
||||
try:
|
||||
await self.owner.session.initialize()
|
||||
except Exception:
|
||||
self.owner.error_phase = MCPSessionErrorPhase.MCP_INIT
|
||||
raise
|
||||
|
||||
async def monitor_process_health(self) -> None:
|
||||
from langbot_plugin.box.models import BoxManagedProcessStatus
|
||||
|
||||
session_id = self.owner._build_box_session_id()
|
||||
consecutive_errors = 0
|
||||
while not self.owner._shutdown_event.is_set():
|
||||
try:
|
||||
info = await self.ap.box_service.get_managed_process(session_id)
|
||||
if isinstance(info, dict):
|
||||
status = info.get('status', '')
|
||||
else:
|
||||
status = getattr(info, 'status', '')
|
||||
if status == BoxManagedProcessStatus.EXITED.value or status == BoxManagedProcessStatus.EXITED:
|
||||
return
|
||||
consecutive_errors = 0
|
||||
except Exception as exc:
|
||||
consecutive_errors += 1
|
||||
self.ap.logger.warning(
|
||||
f'MCP monitor for {self.server_name}: get_managed_process failed '
|
||||
f'({consecutive_errors}/{self.owner._MONITOR_MAX_CONSECUTIVE_ERRORS}): '
|
||||
f'{type(exc).__name__}: {exc}'
|
||||
)
|
||||
if consecutive_errors >= self.owner._MONITOR_MAX_CONSECUTIVE_ERRORS:
|
||||
return
|
||||
await asyncio.sleep(self.owner._MONITOR_POLL_INTERVAL)
|
||||
|
||||
async def cleanup_session(self) -> None:
|
||||
if not self.uses_box_stdio():
|
||||
return
|
||||
|
||||
try:
|
||||
await self.ap.box_service.client.delete_session(self.owner._build_box_session_id())
|
||||
except Exception as exc:
|
||||
self.ap.logger.warning(f'Failed to cleanup Box session for MCP server {self.server_name}: {exc}')
|
||||
|
||||
def rewrite_path(self, path: str, host_path: str | None) -> str:
|
||||
if not host_path or not path:
|
||||
return path
|
||||
normalized_host = os.path.realpath(host_path)
|
||||
if path.startswith(normalized_host + '/'):
|
||||
return '/workspace' + path[len(normalized_host) :]
|
||||
if path == normalized_host:
|
||||
return '/workspace'
|
||||
return path
|
||||
|
||||
def infer_host_path(self) -> str | None:
|
||||
candidates = []
|
||||
parts = [self.server_config.get('command', '')] + self.server_config.get('args', [])
|
||||
for part in parts:
|
||||
if not os.path.isabs(part):
|
||||
continue
|
||||
if os.path.exists(part):
|
||||
directory = os.path.dirname(part)
|
||||
directory = self.unwrap_venv_path(directory)
|
||||
candidates.append(os.path.realpath(directory))
|
||||
if not candidates:
|
||||
return None
|
||||
common = os.path.commonpath(candidates)
|
||||
return common if common != '/' else None
|
||||
|
||||
@staticmethod
|
||||
def unwrap_venv_path(directory: str) -> str:
|
||||
parts = directory.replace('\\', '/').split('/')
|
||||
for i in range(len(parts) - 1, 0, -1):
|
||||
if parts[i] in _VENV_BIN_DIRS and i >= 1:
|
||||
venv_dir = parts[i - 1]
|
||||
if venv_dir in _VENV_DIRS:
|
||||
project_root = '/'.join(parts[: i - 1])
|
||||
return project_root if project_root else '/'
|
||||
return directory
|
||||
|
||||
def resolve_host_path(self) -> str | None:
|
||||
return self.config.host_path or self.infer_host_path()
|
||||
|
||||
@staticmethod
|
||||
def detect_install_command(host_path: str) -> str | None:
|
||||
copy_and_install = (
|
||||
'mkdir -p /opt/_mcp_src'
|
||||
' && tar -C /workspace'
|
||||
' --exclude=.venv --exclude=.git --exclude=__pycache__'
|
||||
' --exclude=node_modules --exclude=.tox --exclude=.nox'
|
||||
' --exclude="*.egg-info" --exclude=.uv-cache'
|
||||
' -cf - .'
|
||||
' | tar -C /opt/_mcp_src -xf -'
|
||||
' && pip install --no-cache-dir /opt/_mcp_src'
|
||||
' && rm -rf /opt/_mcp_src'
|
||||
)
|
||||
install_requirements = 'pip install --no-cache-dir -r /workspace/requirements.txt'
|
||||
|
||||
if os.path.isfile(os.path.join(host_path, 'pyproject.toml')):
|
||||
return copy_and_install
|
||||
if os.path.isfile(os.path.join(host_path, 'setup.py')):
|
||||
return copy_and_install
|
||||
if os.path.isfile(os.path.join(host_path, 'requirements.txt')):
|
||||
return install_requirements
|
||||
return None
|
||||
|
||||
def build_box_session_payload(self, session_id: str, host_path: str | None = None) -> dict[str, Any]:
|
||||
if host_path is None:
|
||||
host_path = self.resolve_host_path()
|
||||
|
||||
payload: dict[str, Any] = {
|
||||
'session_id': session_id,
|
||||
'workdir': '/workspace',
|
||||
'env': self.config.env,
|
||||
'network': self.config.network,
|
||||
'read_only_rootfs': self.config.read_only_rootfs if self.config.read_only_rootfs is not None else False,
|
||||
}
|
||||
if host_path:
|
||||
payload['host_path'] = host_path
|
||||
payload['host_path_mode'] = self.config.host_path_mode
|
||||
for key in ('image', 'cpus', 'memory_mb', 'pids_limit'):
|
||||
value = getattr(self.config, key)
|
||||
if value is not None:
|
||||
payload[key] = value if not isinstance(value, enum.Enum) else value.value
|
||||
return payload
|
||||
|
||||
def build_box_process_payload(self, host_path: str | None = None) -> dict[str, Any]:
|
||||
if host_path is None:
|
||||
host_path = self.resolve_host_path()
|
||||
|
||||
command = self.server_config['command']
|
||||
args = self.server_config.get('args', [])
|
||||
cwd = '/workspace'
|
||||
|
||||
if host_path:
|
||||
command = self.rewrite_venv_command(command, host_path)
|
||||
args = [self.rewrite_path(arg, host_path) for arg in args]
|
||||
cwd = self.rewrite_path(cwd, host_path)
|
||||
|
||||
return {
|
||||
'command': command,
|
||||
'args': args,
|
||||
'env': self.server_config.get('env', {}),
|
||||
'cwd': cwd,
|
||||
}
|
||||
|
||||
def rewrite_venv_command(self, command: str, host_path: str) -> str:
|
||||
if not host_path or not command:
|
||||
return command
|
||||
normalized_host = os.path.realpath(host_path)
|
||||
if not command.startswith(normalized_host + '/'):
|
||||
return command
|
||||
rel = command[len(normalized_host) + 1 :]
|
||||
parts = rel.replace('\\', '/').split('/')
|
||||
if len(parts) >= 3 and parts[0] in _VENV_DIRS and parts[1] in _VENV_BIN_DIRS and parts[2].startswith('python'):
|
||||
return 'python'
|
||||
return self.rewrite_path(command, host_path)
|
||||
@@ -125,6 +125,7 @@ def mcp_module():
|
||||
'mcp.py',
|
||||
)
|
||||
mcp_path = os.path.normpath(mcp_path)
|
||||
sys.modules['langbot.pkg.provider.tools.loaders'].__path__ = [os.path.dirname(mcp_path)]
|
||||
spec = importlib.util.spec_from_file_location(mod_fqn, mcp_path)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
sys.modules[mod_fqn] = mod
|
||||
@@ -134,6 +135,7 @@ def mcp_module():
|
||||
|
||||
# Cleanup
|
||||
sys.modules.pop(mod_fqn, None)
|
||||
sys.modules.pop('langbot.pkg.provider.tools.loaders.mcp_stdio', None)
|
||||
for name in reversed(list(saved)):
|
||||
if saved[name] is None:
|
||||
sys.modules.pop(name, None)
|
||||
@@ -582,6 +584,8 @@ class TestBoxConfigParsing:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_init_box_stdio_server_keeps_host_mount_validation_enabled(mcp_module):
|
||||
mcp_stdio_module = sys.modules['langbot.pkg.provider.tools.loaders.mcp_stdio']
|
||||
|
||||
class FakeClientSession:
|
||||
def __init__(self, *_args):
|
||||
pass
|
||||
@@ -599,8 +603,8 @@ async def test_init_box_stdio_server_keeps_host_mount_validation_enabled(mcp_mod
|
||||
async def fake_websocket_client(_url: str):
|
||||
yield ('read-stream', 'write-stream')
|
||||
|
||||
mcp_module.ClientSession = FakeClientSession
|
||||
mcp_module.websocket_client = fake_websocket_client
|
||||
mcp_stdio_module.ClientSession = FakeClientSession
|
||||
mcp_stdio_module.websocket_client = fake_websocket_client
|
||||
|
||||
ap = _make_ap()
|
||||
ap.box_service.available = True
|
||||
|
||||
Reference in New Issue
Block a user