refactor(box): introduce reusable workspace session helper

This commit is contained in:
youhuanghe
2026-04-08 12:16:03 +00:00
committed by WangCham
parent 51fcf26571
commit a8eb6e6984
7 changed files with 644 additions and 258 deletions
@@ -1,13 +1,20 @@
from __future__ import annotations
import enum
import os
import asyncio
from typing import TYPE_CHECKING, Any
import pydantic
from mcp import ClientSession
from mcp.client.websocket import websocket_client
from ....box.workspace import (
BoxWorkspaceSession,
classify_python_workspace,
infer_workspace_host_path,
rewrite_mounted_path,
rewrite_venv_command,
unwrap_venv_path,
)
if TYPE_CHECKING:
from .mcp import RuntimeMCPSession
@@ -25,10 +32,6 @@ class MCPSessionErrorPhase(enum.Enum):
TOOL_CALL = 'tool_call'
_VENV_DIRS = frozenset({'.venv', 'venv', 'env', '.env'})
_VENV_BIN_DIRS = frozenset({'bin', 'Scripts'})
class MCPServerBoxConfig(pydantic.BaseModel):
"""Structured configuration for running an MCP server inside a Box container."""
@@ -65,6 +68,21 @@ class BoxStdioSessionRuntime:
def server_config(self) -> dict:
return self.owner.server_config
def _build_workspace(self) -> BoxWorkspaceSession:
return BoxWorkspaceSession(
self.ap.box_service,
self.owner._build_box_session_id(),
host_path=self.resolve_host_path(),
host_path_mode=self.config.host_path_mode,
env=self.config.env,
network=self.config.network,
read_only_rootfs=self.config.read_only_rootfs if self.config.read_only_rootfs is not None else False,
image=self.config.image,
cpus=self.config.cpus,
memory_mb=self.config.memory_mb,
pids_limit=self.config.pids_limit,
)
def uses_box_stdio(self) -> bool:
if self.server_config.get('mode') != 'stdio':
return False
@@ -74,13 +92,11 @@ class BoxStdioSessionRuntime:
return False
async def initialize(self) -> None:
box_service = self.ap.box_service
session_id = self.owner._build_box_session_id()
host_path = self.resolve_host_path()
session_payload = self.build_box_session_payload(session_id, host_path)
workspace = self._build_workspace()
host_path = workspace.host_path
try:
await box_service.create_session(session_payload)
await workspace.create_session()
except Exception:
self.owner.error_phase = MCPSessionErrorPhase.SESSION_CREATE
raise
@@ -89,11 +105,11 @@ class BoxStdioSessionRuntime:
install_cmd = self.owner._detect_install_command(host_path)
if install_cmd:
self.ap.logger.info(f'MCP server {self.server_name}: installing dependencies in Box with: {install_cmd}')
exec_payload = dict(session_payload)
exec_payload['cmd'] = install_cmd
exec_payload['timeout_sec'] = self.config.startup_timeout_sec or 120
try:
result = await box_service.client.execute(box_service.build_spec(exec_payload))
result = await workspace.execute_raw(
install_cmd,
timeout_sec=self.config.startup_timeout_sec or 120,
)
except Exception:
self.owner.error_phase = MCPSessionErrorPhase.DEP_INSTALL
raise
@@ -103,13 +119,17 @@ class BoxStdioSessionRuntime:
raise Exception(f'Dependency install failed (exit code {result.exit_code}): {stderr_preview}')
try:
await box_service.start_managed_process(session_id, self.build_box_process_payload(host_path))
await workspace.start_managed_process(
self.server_config['command'],
self.server_config.get('args', []),
env=self.server_config.get('env', {}),
)
except Exception:
self.owner.error_phase = MCPSessionErrorPhase.PROCESS_START
raise
try:
websocket_url = box_service.get_managed_process_websocket_url(session_id)
websocket_url = workspace.get_managed_process_websocket_url()
transport = await self.owner.exit_stack.enter_async_context(websocket_client(websocket_url))
read_stream, write_stream = transport
self.owner.session = await self.owner.exit_stack.enter_async_context(ClientSession(read_stream, write_stream))
@@ -126,11 +146,11 @@ class BoxStdioSessionRuntime:
async def monitor_process_health(self) -> None:
from langbot_plugin.box.models import BoxManagedProcessStatus
session_id = self.owner._build_box_session_id()
workspace = self._build_workspace()
consecutive_errors = 0
while not self.owner._shutdown_event.is_set():
try:
info = await self.ap.box_service.get_managed_process(session_id)
info = await workspace.get_managed_process()
if isinstance(info, dict):
status = info.get('status', '')
else:
@@ -154,120 +174,58 @@ class BoxStdioSessionRuntime:
return
try:
await self.ap.box_service.client.delete_session(self.owner._build_box_session_id())
await self._build_workspace().cleanup()
except Exception as exc:
self.ap.logger.warning(f'Failed to cleanup Box session for MCP server {self.server_name}: {exc}')
def rewrite_path(self, path: str, host_path: str | None) -> str:
if not host_path or not path:
return path
normalized_host = os.path.realpath(host_path)
if path.startswith(normalized_host + '/'):
return '/workspace' + path[len(normalized_host) :]
if path == normalized_host:
return '/workspace'
return path
return rewrite_mounted_path(path, host_path)
def infer_host_path(self) -> str | None:
candidates = []
parts = [self.server_config.get('command', '')] + self.server_config.get('args', [])
for part in parts:
if not os.path.isabs(part):
continue
if os.path.exists(part):
directory = os.path.dirname(part)
directory = self.unwrap_venv_path(directory)
candidates.append(os.path.realpath(directory))
if not candidates:
return None
common = os.path.commonpath(candidates)
return common if common != '/' else None
return infer_workspace_host_path(self.server_config.get('command', ''), self.server_config.get('args', []))
@staticmethod
def unwrap_venv_path(directory: str) -> str:
parts = directory.replace('\\', '/').split('/')
for i in range(len(parts) - 1, 0, -1):
if parts[i] in _VENV_BIN_DIRS and i >= 1:
venv_dir = parts[i - 1]
if venv_dir in _VENV_DIRS:
project_root = '/'.join(parts[: i - 1])
return project_root if project_root else '/'
return directory
return unwrap_venv_path(directory)
def resolve_host_path(self) -> str | None:
return self.config.host_path or self.infer_host_path()
@staticmethod
def detect_install_command(host_path: str) -> str | None:
copy_and_install = (
'mkdir -p /opt/_mcp_src'
' && tar -C /workspace'
' --exclude=.venv --exclude=.git --exclude=__pycache__'
' --exclude=node_modules --exclude=.tox --exclude=.nox'
' --exclude="*.egg-info" --exclude=.uv-cache'
' -cf - .'
' | tar -C /opt/_mcp_src -xf -'
' && pip install --no-cache-dir /opt/_mcp_src'
' && rm -rf /opt/_mcp_src'
)
install_requirements = 'pip install --no-cache-dir -r /workspace/requirements.txt'
if os.path.isfile(os.path.join(host_path, 'pyproject.toml')):
return copy_and_install
if os.path.isfile(os.path.join(host_path, 'setup.py')):
return copy_and_install
if os.path.isfile(os.path.join(host_path, 'requirements.txt')):
return install_requirements
workspace_kind = classify_python_workspace(host_path)
if workspace_kind == 'package':
return (
'mkdir -p /opt/_lb_src'
' && tar -C /workspace'
' --exclude=.venv --exclude=.git --exclude=__pycache__'
' --exclude=node_modules --exclude=.tox --exclude=.nox'
' --exclude="*.egg-info" --exclude=.uv-cache'
' -cf - .'
' | tar -C /opt/_lb_src -xf -'
' && pip install --no-cache-dir /opt/_lb_src'
' && rm -rf /opt/_lb_src'
)
if workspace_kind == 'requirements':
return 'pip install --no-cache-dir -r /workspace/requirements.txt'
return None
def build_box_session_payload(self, session_id: str, host_path: str | None = None) -> dict[str, Any]:
if host_path is None:
host_path = self.resolve_host_path()
payload: dict[str, Any] = {
'session_id': session_id,
'workdir': '/workspace',
'env': self.config.env,
'network': self.config.network,
'read_only_rootfs': self.config.read_only_rootfs if self.config.read_only_rootfs is not None else False,
}
if host_path:
payload['host_path'] = host_path
payload['host_path_mode'] = self.config.host_path_mode
for key in ('image', 'cpus', 'memory_mb', 'pids_limit'):
value = getattr(self.config, key)
if value is not None:
payload[key] = value if not isinstance(value, enum.Enum) else value.value
return payload
workspace = self._build_workspace()
workspace.session_id = session_id
if host_path is not None:
workspace.host_path = host_path
return workspace.build_session_payload()
def build_box_process_payload(self, host_path: str | None = None) -> dict[str, Any]:
if host_path is None:
host_path = self.resolve_host_path()
command = self.server_config['command']
args = self.server_config.get('args', [])
cwd = '/workspace'
if host_path:
command = self.rewrite_venv_command(command, host_path)
args = [self.rewrite_path(arg, host_path) for arg in args]
cwd = self.rewrite_path(cwd, host_path)
return {
'command': command,
'args': args,
'env': self.server_config.get('env', {}),
'cwd': cwd,
}
workspace = self._build_workspace()
if host_path is not None:
workspace.host_path = host_path
return workspace.build_process_payload(
self.server_config['command'],
self.server_config.get('args', []),
env=self.server_config.get('env', {}),
)
def rewrite_venv_command(self, command: str, host_path: str) -> str:
if not host_path or not command:
return command
normalized_host = os.path.realpath(host_path)
if not command.startswith(normalized_host + '/'):
return command
rel = command[len(normalized_host) + 1 :]
parts = rel.replace('\\', '/').split('/')
if len(parts) >= 3 and parts[0] in _VENV_DIRS and parts[1] in _VENV_BIN_DIRS and parts[2].startswith('python'):
return 'python'
return self.rewrite_path(command, host_path)
return rewrite_venv_command(command, host_path)
@@ -6,6 +6,7 @@ import os
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from langbot_plugin.api.entities.events import pipeline_query
from ....box.workspace import BoxWorkspaceSession
from .. import loader
from . import skill as skill_loader
@@ -94,18 +95,19 @@ class NativeToolLoader(loader.ToolLoader):
if skill_loader.should_prepare_skill_python_env(package_root):
rewritten_command = skill_loader.wrap_skill_command_with_python_env(rewritten_command)
spec_payload: dict = {
'cmd': rewritten_command,
'workdir': rewritten_workdir,
'host_path': package_root,
'host_path_mode': 'rw',
'session_id': skill_loader.build_skill_session_id(selected_skill, query),
}
for key in ('timeout_sec', 'env'):
if key in parameters:
spec_payload[key] = parameters[key]
result = await self.ap.box_service.execute_spec_payload(spec_payload, query)
workspace = BoxWorkspaceSession(
self.ap.box_service,
skill_loader.build_skill_session_id(selected_skill, query),
host_path=package_root,
host_path_mode='rw',
)
result = await workspace.execute_for_query(
query,
rewritten_command,
workdir=rewritten_workdir,
timeout_sec=parameters.get('timeout_sec'),
env=parameters.get('env'),
)
self._refresh_skill_from_disk(selected_skill)
return result
+4 -133
View File
@@ -2,9 +2,10 @@ from __future__ import annotations
import os
import re
import textwrap
import typing
from ....box import workspace as box_workspace
if typing.TYPE_CHECKING:
from ....core import app
from langbot_plugin.api.entities.events import pipeline_query
@@ -13,23 +14,6 @@ ACTIVATED_SKILLS_KEY = '_activated_skills'
PIPELINE_BOUND_SKILLS_KEY = '_pipeline_bound_skills'
SKILL_MOUNT_PREFIX = '/workspace/.skills'
_SKILL_MOUNT_PATTERN = re.compile(r'/workspace/\.skills/([A-Za-z0-9_-]+)')
_PYTHON_SKILL_MANIFESTS = (
'requirements.txt',
'pyproject.toml',
'setup.py',
'setup.cfg',
)
def _normalize_host_path(path: str | None) -> str:
if path is None:
return ''
stripped = str(path).strip()
if not stripped:
return ''
return os.path.realpath(os.path.abspath(stripped))
def get_virtual_skill_mount_path(skill_name: str) -> str:
return f'{SKILL_MOUNT_PREFIX}/{skill_name}'
@@ -165,121 +149,8 @@ def build_skill_session_id(skill_data: dict, query: pipeline_query.Query) -> str
def should_prepare_skill_python_env(package_root: str | None) -> bool:
normalized_root = _normalize_host_path(package_root)
if not normalized_root:
return False
if os.path.isdir(os.path.join(normalized_root, '.venv')):
return True
return any(os.path.isfile(os.path.join(normalized_root, filename)) for filename in _PYTHON_SKILL_MANIFESTS)
return box_workspace.should_prepare_python_env(package_root)
def wrap_skill_command_with_python_env(command: str) -> str:
bootstrap = textwrap.dedent(
"""
set -e
_LB_VENV_DIR="/workspace/.venv"
_LB_META_DIR="/workspace/.langbot"
_LB_META_FILE="$_LB_META_DIR/python-env.json"
_LB_LOCK_DIR="$_LB_META_DIR/python-env.lock"
_LB_TMP_DIR="/workspace/.tmp"
_LB_PIP_CACHE_DIR="/workspace/.cache/pip"
mkdir -p "$_LB_META_DIR" "$_LB_TMP_DIR" "$_LB_PIP_CACHE_DIR"
export TMPDIR="$_LB_TMP_DIR"
export TEMP="$_LB_TMP_DIR"
export TMP="$_LB_TMP_DIR"
export PIP_CACHE_DIR="$_LB_PIP_CACHE_DIR"
_lb_python_meta() {
python - <<'PY'
import hashlib
import json
import os
import sys
root = "/workspace"
digest = hashlib.sha256()
manifest_files = []
for rel in ("requirements.txt", "pyproject.toml", "setup.py", "setup.cfg"):
path = os.path.join(root, rel)
if not os.path.isfile(path):
continue
manifest_files.append(rel)
with open(path, "rb") as handle:
digest.update(rel.encode("utf-8"))
digest.update(b"\0")
digest.update(handle.read())
digest.update(b"\0")
print(
json.dumps(
{
"python_executable": sys.executable,
"python_version": list(sys.version_info[:3]),
"manifest_files": manifest_files,
"manifest_sha256": digest.hexdigest(),
},
sort_keys=True,
)
)
PY
}
_LB_CURRENT_META="$(_lb_python_meta)"
_LB_NEEDS_BOOTSTRAP=0
if [ ! -x "$_LB_VENV_DIR/bin/python" ]; then
_LB_NEEDS_BOOTSTRAP=1
elif [ ! -f "$_LB_META_FILE" ]; then
_LB_NEEDS_BOOTSTRAP=1
elif [ "$(cat "$_LB_META_FILE")" != "$_LB_CURRENT_META" ]; then
_LB_NEEDS_BOOTSTRAP=1
fi
if [ "$_LB_NEEDS_BOOTSTRAP" -eq 1 ]; then
_LB_LOCK_WAIT=0
while ! mkdir "$_LB_LOCK_DIR" 2>/dev/null; do
if [ "$_LB_LOCK_WAIT" -ge 120 ]; then
echo "Timed out waiting for Python environment lock: $_LB_LOCK_DIR" >&2
exit 1
fi
sleep 1
_LB_LOCK_WAIT=$((_LB_LOCK_WAIT + 1))
done
_lb_cleanup_lock() {
rmdir "$_LB_LOCK_DIR" >/dev/null 2>&1 || true
}
trap _lb_cleanup_lock EXIT INT TERM
_LB_CURRENT_META="$(_lb_python_meta)"
_LB_NEEDS_BOOTSTRAP=0
if [ ! -x "$_LB_VENV_DIR/bin/python" ]; then
_LB_NEEDS_BOOTSTRAP=1
elif [ ! -f "$_LB_META_FILE" ]; then
_LB_NEEDS_BOOTSTRAP=1
elif [ "$(cat "$_LB_META_FILE")" != "$_LB_CURRENT_META" ]; then
_LB_NEEDS_BOOTSTRAP=1
fi
if [ "$_LB_NEEDS_BOOTSTRAP" -eq 1 ]; then
rm -rf "$_LB_VENV_DIR"
python -m venv "$_LB_VENV_DIR"
if [ -f /workspace/requirements.txt ]; then
"$_LB_VENV_DIR/bin/python" -m pip install -r /workspace/requirements.txt
elif [ -f /workspace/pyproject.toml ] || [ -f /workspace/setup.py ] || [ -f /workspace/setup.cfg ]; then
"$_LB_VENV_DIR/bin/python" -m pip install -e /workspace
fi
printf '%s' "$_LB_CURRENT_META" > "$_LB_META_FILE"
fi
fi
export VIRTUAL_ENV="$_LB_VENV_DIR"
export PATH="$_LB_VENV_DIR/bin:$PATH"
"""
).strip()
return f'{bootstrap}\n\n{command}'
return box_workspace.wrap_python_command_with_env(command).rstrip()