refactor(box): move box runtime to langbot-plugin-sdk

Extract self-contained box runtime modules (actions, backend, client,
  errors, models, runtime, security, server) to langbot-plugin-sdk and
  update all imports to use `langbot_plugin.box.*`. Keep only service
  and
  connector in LangBot core as they depend on the Application context.

  - Update docker-compose to use `langbot_plugin.box.server` entry
  point
  - Update pyproject.toml to use local SDK via `tool.uv.sources`
  - Remove migrated source files and their unit/integration tests
  - Update remaining test imports to match new module paths
This commit is contained in:
youhuanghe
2026-03-22 07:24:47 +00:00
committed by WangCham
parent c095e830c7
commit b64a23f9ac
19 changed files with 42 additions and 1824 deletions

View File

@@ -7,7 +7,7 @@ services:
langbot_box_runtime:
image: rockchin/langbot:latest
container_name: langbot_box_runtime
command: ["uv", "run", "--no-sync", "-m", "langbot.pkg.box.server"]
command: ["uv", "run", "--no-sync", "-m", "langbot_plugin.box.server"]
volumes:
# Mount the container runtime socket from the host.
# Uncomment the one that matches your container runtime:

View File

@@ -1,21 +0,0 @@
"""Box-specific action types for the action RPC protocol."""
from __future__ import annotations
from langbot_plugin.entities.io.actions.enums import ActionType
class LangBotToBoxAction(ActionType):
"""Actions sent from LangBot to the Box runtime."""
HEALTH = 'box_health'
STATUS = 'box_status'
EXEC = 'box_exec'
CREATE_SESSION = 'box_create_session'
GET_SESSION = 'box_get_session'
GET_SESSIONS = 'box_get_sessions'
DELETE_SESSION = 'box_delete_session'
START_MANAGED_PROCESS = 'box_start_managed_process'
GET_MANAGED_PROCESS = 'box_get_managed_process'
GET_BACKEND_INFO = 'box_get_backend_info'
SHUTDOWN = 'box_shutdown'

View File

@@ -1,388 +0,0 @@
from __future__ import annotations
import abc
import asyncio
import dataclasses
import datetime as dt
import logging
import re
import shlex
import shutil
import uuid
from .errors import BoxError
from .models import (
DEFAULT_BOX_MOUNT_PATH,
BoxExecutionResult,
BoxExecutionStatus,
BoxHostMountMode,
BoxNetworkMode,
BoxSessionInfo,
BoxSpec,
)
from .security import validate_sandbox_security
# Hard cap on raw subprocess output to prevent unbounded memory usage.
# Container timeout already bounds duration, but fast commands can still
# produce large output within the time limit. After this many bytes the
# remaining output is discarded before decoding.
_MAX_RAW_OUTPUT_BYTES = 1_048_576 # 1 MB per stream
@dataclasses.dataclass(slots=True)
class _CommandResult:
return_code: int
stdout: str
stderr: str
timed_out: bool = False
class BaseSandboxBackend(abc.ABC):
name: str
instance_id: str = ''
def __init__(self, logger: logging.Logger):
self.logger = logger
async def initialize(self):
return None
@abc.abstractmethod
async def is_available(self) -> bool:
pass
@abc.abstractmethod
async def start_session(self, spec: BoxSpec) -> BoxSessionInfo:
pass
@abc.abstractmethod
async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult:
pass
@abc.abstractmethod
async def stop_session(self, session: BoxSessionInfo):
pass
async def start_managed_process(self, session: BoxSessionInfo, spec):
raise BoxError(f'{self.name} backend does not support managed processes')
async def cleanup_orphaned_containers(self, current_instance_id: str = ''):
"""Remove lingering containers from previous runs. No-op by default."""
pass
class CLISandboxBackend(BaseSandboxBackend):
command: str
def __init__(self, logger: logging.Logger, command: str, backend_name: str):
super().__init__(logger)
self.command = command
self.name = backend_name
async def is_available(self) -> bool:
if shutil.which(self.command) is None:
return False
result = await self._run_command([self.command, 'info'], timeout_sec=5, check=False)
return result.return_code == 0 and not result.timed_out
async def start_session(self, spec: BoxSpec) -> BoxSessionInfo:
validate_sandbox_security(spec)
now = dt.datetime.now(dt.UTC)
container_name = self._build_container_name(spec.session_id)
args = [
self.command,
'run',
'-d',
'--rm',
'--name',
container_name,
'--label',
'langbot.box=true',
'--label',
f'langbot.session_id={spec.session_id}',
'--label',
f'langbot.box.instance_id={self.instance_id}',
]
if spec.network == BoxNetworkMode.OFF:
args.extend(['--network', 'none'])
# Resource limits
args.extend(['--cpus', str(spec.cpus)])
args.extend(['--memory', f'{spec.memory_mb}m'])
args.extend(['--pids-limit', str(spec.pids_limit)])
if spec.read_only_rootfs:
args.append('--read-only')
args.extend(['--tmpfs', '/tmp:size=64m'])
if spec.host_path is not None and spec.host_path_mode != BoxHostMountMode.NONE:
mount_spec = f'{spec.host_path}:{DEFAULT_BOX_MOUNT_PATH}:{spec.host_path_mode.value}'
args.extend(['-v', mount_spec])
args.extend([spec.image, 'sh', '-lc', 'while true; do sleep 3600; done'])
self.logger.info(
f'LangBot Box backend start_session: backend={self.name} '
f'session_id={spec.session_id} container_name={container_name} '
f'image={spec.image} network={spec.network.value} '
f'host_path={spec.host_path} host_path_mode={spec.host_path_mode.value} '
f'cpus={spec.cpus} memory_mb={spec.memory_mb} pids_limit={spec.pids_limit} '
f'read_only_rootfs={spec.read_only_rootfs}'
)
await self._run_command(args, timeout_sec=30, check=True)
return BoxSessionInfo(
session_id=spec.session_id,
backend_name=self.name,
backend_session_id=container_name,
image=spec.image,
network=spec.network,
host_path=spec.host_path,
host_path_mode=spec.host_path_mode,
cpus=spec.cpus,
memory_mb=spec.memory_mb,
pids_limit=spec.pids_limit,
read_only_rootfs=spec.read_only_rootfs,
created_at=now,
last_used_at=now,
)
async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult:
start = dt.datetime.now(dt.UTC)
args = [self.command, 'exec']
for key, value in spec.env.items():
args.extend(['-e', f'{key}={value}'])
args.extend(
[
session.backend_session_id,
'sh',
'-lc',
self._build_exec_command(spec.workdir, spec.cmd),
]
)
cmd_preview = spec.cmd.strip()
if len(cmd_preview) > 400:
cmd_preview = f'{cmd_preview[:397]}...'
self.logger.info(
f'LangBot Box backend exec: backend={self.name} '
f'session_id={session.session_id} container_name={session.backend_session_id} '
f'workdir={spec.workdir} timeout_sec={spec.timeout_sec} '
f'env_keys={sorted(spec.env.keys())} cmd={cmd_preview}'
)
result = await self._run_command(args, timeout_sec=spec.timeout_sec, check=False)
duration_ms = int((dt.datetime.now(dt.UTC) - start).total_seconds() * 1000)
if result.timed_out:
return BoxExecutionResult(
session_id=session.session_id,
backend_name=self.name,
status=BoxExecutionStatus.TIMED_OUT,
exit_code=None,
stdout=result.stdout,
stderr=result.stderr or f'Command timed out after {spec.timeout_sec} seconds.',
duration_ms=duration_ms,
)
return BoxExecutionResult(
session_id=session.session_id,
backend_name=self.name,
status=BoxExecutionStatus.COMPLETED,
exit_code=result.return_code,
stdout=result.stdout,
stderr=result.stderr,
duration_ms=duration_ms,
)
async def stop_session(self, session: BoxSessionInfo):
self.logger.info(
f'LangBot Box backend stop_session: backend={self.name} '
f'session_id={session.session_id} container_name={session.backend_session_id}'
)
await self._run_command(
[self.command, 'rm', '-f', session.backend_session_id],
timeout_sec=20,
check=False,
)
async def cleanup_orphaned_containers(self, current_instance_id: str = ''):
"""Remove langbot.box containers from previous instances.
Only removes containers whose ``langbot.box.instance_id`` label does
NOT match *current_instance_id*. Containers without the label (from
older versions) are also removed.
"""
result = await self._run_command(
[
self.command,
'ps',
'-a',
'--filter',
'label=langbot.box=true',
'--format',
'{{.ID}}\t{{.Label "langbot.box.instance_id"}}',
],
timeout_sec=10,
check=False,
)
if result.return_code != 0 or not result.stdout.strip():
return
orphan_ids = []
for line in result.stdout.strip().split('\n'):
line = line.strip()
if not line:
continue
parts = line.split('\t', 1)
cid = parts[0].strip()
label_instance = parts[1].strip() if len(parts) > 1 else ''
if label_instance != current_instance_id:
orphan_ids.append(cid)
if not orphan_ids:
return
for cid in orphan_ids:
self.logger.info(f'Cleaning up orphaned Box container: {cid}')
await self._run_command(
[self.command, 'rm', '-f', *orphan_ids],
timeout_sec=30,
check=False,
)
async def start_managed_process(self, session: BoxSessionInfo, spec) -> asyncio.subprocess.Process:
args = [self.command, 'exec', '-i']
for key, value in spec.env.items():
args.extend(['-e', f'{key}={value}'])
args.extend(
[
session.backend_session_id,
'sh',
'-lc',
self._build_spawn_command(spec.cwd, spec.command, spec.args),
]
)
self.logger.info(
f'LangBot Box backend start_managed_process: backend={self.name} '
f'session_id={session.session_id} container_name={session.backend_session_id} '
f'cwd={spec.cwd} env_keys={sorted(spec.env.keys())} command={spec.command} args={spec.args}'
)
return await asyncio.create_subprocess_exec(
*args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
def _build_container_name(self, session_id: str) -> str:
normalized = re.sub(r'[^a-zA-Z0-9_.-]+', '-', session_id).strip('-').lower() or 'session'
suffix = uuid.uuid4().hex[:8]
return f'langbot-box-{normalized[:32]}-{suffix}'
def _build_exec_command(self, workdir: str, cmd: str) -> str:
quoted_workdir = shlex.quote(workdir)
return f'mkdir -p {quoted_workdir} && cd {quoted_workdir} && {cmd}'
def _build_spawn_command(self, cwd: str, command: str, args: list[str]) -> str:
quoted_cwd = shlex.quote(cwd)
command_parts = [shlex.quote(command), *[shlex.quote(arg) for arg in args]]
return f'mkdir -p {quoted_cwd} && cd {quoted_cwd} && exec {" ".join(command_parts)}'
async def _run_command(
self,
args: list[str],
timeout_sec: int,
check: bool,
) -> _CommandResult:
process = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout_task = asyncio.create_task(self._read_stream(process.stdout))
stderr_task = asyncio.create_task(self._read_stream(process.stderr))
timed_out = False
try:
await asyncio.wait_for(process.wait(), timeout=timeout_sec)
except asyncio.TimeoutError:
process.kill()
timed_out = True
await process.wait()
stdout_bytes, stdout_total = await stdout_task
stderr_bytes, stderr_total = await stderr_task
if timed_out:
return _CommandResult(
return_code=-1,
stdout=self._clip_captured_bytes(stdout_bytes, stdout_total),
stderr=self._clip_captured_bytes(stderr_bytes, stderr_total),
timed_out=True,
)
stdout = self._clip_captured_bytes(stdout_bytes, stdout_total)
stderr = self._clip_captured_bytes(stderr_bytes, stderr_total)
if check and process.returncode != 0:
raise BoxError(self._format_cli_error(stderr or stdout or 'unknown backend error'))
return _CommandResult(
return_code=process.returncode,
stdout=stdout,
stderr=stderr,
timed_out=False,
)
@staticmethod
def _clip_captured_bytes(data: bytes, total_size: int, limit: int = _MAX_RAW_OUTPUT_BYTES) -> str:
text = data.decode('utf-8', errors='replace').strip()
if total_size > limit:
text += f'\n... [raw output clipped at {limit} bytes, {total_size - limit} bytes discarded]'
return text
@staticmethod
async def _read_stream(
stream: asyncio.StreamReader | None,
limit: int = _MAX_RAW_OUTPUT_BYTES,
) -> tuple[bytes, int]:
if stream is None:
return b'', 0
chunks = bytearray()
total_size = 0
while True:
chunk = await stream.read(65536)
if not chunk:
break
total_size += len(chunk)
remaining = limit - len(chunks)
if remaining > 0:
chunks.extend(chunk[:remaining])
return bytes(chunks), total_size
def _format_cli_error(self, message: str) -> str:
message = ' '.join(message.split())
if len(message) > 300:
message = f'{message[:297]}...'
return f'{self.name} backend error: {message}'
class PodmanBackend(CLISandboxBackend):
def __init__(self, logger: logging.Logger):
super().__init__(logger=logger, command='podman', backend_name='podman')
class DockerBackend(CLISandboxBackend):
def __init__(self, logger: logging.Logger):
super().__init__(logger=logger, command='docker', backend_name='docker')

View File

@@ -1,193 +0,0 @@
"""BoxRuntimeClient abstraction for Box Runtime access."""
from __future__ import annotations
import abc
import logging
from typing import Any, TYPE_CHECKING
from langbot_plugin.runtime.io.handler import Handler
from .actions import LangBotToBoxAction
from .errors import BoxError, BoxRuntimeUnavailableError
from .models import (
BoxExecutionResult,
BoxExecutionStatus,
BoxManagedProcessInfo,
BoxManagedProcessSpec,
BoxSpec,
get_box_config,
)
from ..utils import platform
if TYPE_CHECKING:
from ..core import app as core_app
def resolve_box_ws_relay_url(ap: 'core_app.Application') -> str:
"""Derive the ws relay base URL used for managed-process attach."""
runtime_url = str(get_box_config(ap).get('runtime_url', '')).strip()
if runtime_url:
return runtime_url
if platform.get_platform() == 'docker':
return 'http://langbot_box_runtime:5410'
return 'http://127.0.0.1:5410'
class BoxRuntimeClient(abc.ABC):
"""Abstract interface that BoxService uses to talk to a Box Runtime."""
@abc.abstractmethod
async def initialize(self) -> None: ...
@abc.abstractmethod
async def execute(self, spec: BoxSpec) -> BoxExecutionResult: ...
@abc.abstractmethod
async def shutdown(self) -> None: ...
@abc.abstractmethod
async def get_status(self) -> dict: ...
@abc.abstractmethod
async def get_sessions(self) -> list[dict]: ...
@abc.abstractmethod
async def get_backend_info(self) -> dict: ...
@abc.abstractmethod
async def delete_session(self, session_id: str) -> None: ...
@abc.abstractmethod
async def create_session(self, spec: BoxSpec) -> dict: ...
@abc.abstractmethod
async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo: ...
@abc.abstractmethod
async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo: ...
@abc.abstractmethod
async def get_session(self, session_id: str) -> dict: ...
def _translate_action_error(exc: Exception) -> BoxError:
"""Convert an ActionCallError message back into the appropriate BoxError subclass."""
from .errors import (
BoxBackendUnavailableError,
BoxManagedProcessConflictError,
BoxManagedProcessNotFoundError,
BoxSessionConflictError,
BoxSessionNotFoundError,
BoxValidationError,
)
msg = str(exc)
_ERROR_PREFIX_MAP: list[tuple[str, type[BoxError]]] = [
('BoxValidationError:', BoxValidationError),
('BoxSessionNotFoundError:', BoxSessionNotFoundError),
('BoxSessionConflictError:', BoxSessionConflictError),
('BoxManagedProcessNotFoundError:', BoxManagedProcessNotFoundError),
('BoxManagedProcessConflictError:', BoxManagedProcessConflictError),
('BoxBackendUnavailableError:', BoxBackendUnavailableError),
]
for prefix, cls in _ERROR_PREFIX_MAP:
if prefix in msg:
return cls(msg)
return BoxError(msg)
class ActionRPCBoxClient(BoxRuntimeClient):
"""Client that talks to BoxRuntime via the action RPC protocol."""
def __init__(self, logger: logging.Logger):
self._logger = logger
self._handler: Handler | None = None
@property
def handler(self) -> Handler:
if self._handler is None:
raise BoxRuntimeUnavailableError('box runtime not connected')
return self._handler
def set_handler(self, handler: Handler) -> None:
self._handler = handler
async def _call(self, action: LangBotToBoxAction, data: dict[str, Any], timeout: float = 15.0) -> dict[str, Any]:
try:
return await self.handler.call_action(action, data, timeout=timeout)
except BoxRuntimeUnavailableError:
raise
except Exception as exc:
raise _translate_action_error(exc) from exc
async def initialize(self) -> None:
try:
await self._call(LangBotToBoxAction.HEALTH, {})
self._logger.info('LangBot Box runtime connected via action RPC.')
except Exception as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
async def execute(self, spec: BoxSpec) -> BoxExecutionResult:
data = await self._call(LangBotToBoxAction.EXEC, spec.model_dump(mode='json'), timeout=300.0)
return BoxExecutionResult(
session_id=data['session_id'],
backend_name=data['backend_name'],
status=BoxExecutionStatus(data['status']),
exit_code=data.get('exit_code'),
stdout=data.get('stdout', ''),
stderr=data.get('stderr', ''),
duration_ms=data['duration_ms'],
)
async def shutdown(self) -> None:
if self._handler is not None:
try:
await self._call(LangBotToBoxAction.SHUTDOWN, {})
except Exception:
pass
self._handler = None
async def get_status(self) -> dict:
return await self._call(LangBotToBoxAction.STATUS, {})
async def get_sessions(self) -> list[dict]:
data = await self._call(LangBotToBoxAction.GET_SESSIONS, {})
return data['sessions']
async def get_session(self, session_id: str) -> dict:
return await self._call(LangBotToBoxAction.GET_SESSION, {'session_id': session_id})
async def get_backend_info(self) -> dict:
return await self._call(LangBotToBoxAction.GET_BACKEND_INFO, {})
async def delete_session(self, session_id: str) -> None:
await self._call(LangBotToBoxAction.DELETE_SESSION, {'session_id': session_id})
async def create_session(self, spec: BoxSpec) -> dict:
return await self._call(LangBotToBoxAction.CREATE_SESSION, spec.model_dump(mode='json'))
async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo:
data = await self._call(
LangBotToBoxAction.START_MANAGED_PROCESS,
{'session_id': session_id, 'spec': spec.model_dump(mode='json')},
)
return BoxManagedProcessInfo.model_validate(data)
async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo:
data = await self._call(LangBotToBoxAction.GET_MANAGED_PROCESS, {'session_id': session_id})
return BoxManagedProcessInfo.model_validate(data)
def get_managed_process_websocket_url(self, session_id: str, ws_relay_base_url: str) -> str:
base = ws_relay_base_url
if base.startswith('https://'):
scheme = 'wss://'
suffix = base[len('https://') :]
elif base.startswith('http://'):
scheme = 'ws://'
suffix = base[len('http://') :]
else:
scheme = 'ws://'
suffix = base
return f'{scheme}{suffix}/v1/sessions/{session_id}/managed-process/ws'

View File

@@ -10,15 +10,32 @@ from langbot_plugin.entities.io.actions.enums import CommonAction
from langbot_plugin.runtime.io.handler import Handler
from langbot_plugin.runtime.io.connection import Connection
from .client import ActionRPCBoxClient, resolve_box_ws_relay_url
from .errors import BoxRuntimeUnavailableError
from .models import get_box_config
from langbot_plugin.box.client import ActionRPCBoxClient
from langbot_plugin.box.errors import BoxRuntimeUnavailableError
from ..utils import platform
if TYPE_CHECKING:
from ..core import app as core_app
def _get_box_config(ap) -> dict:
"""Return the 'box' section from instance config, with safe fallbacks."""
instance_config = getattr(ap, 'instance_config', None)
config_data = getattr(instance_config, 'data', {}) if instance_config is not None else {}
return config_data.get('box', {})
def resolve_box_ws_relay_url(ap: 'core_app.Application') -> str:
"""Derive the ws relay base URL used for managed-process attach."""
runtime_url = str(_get_box_config(ap).get('runtime_url', '')).strip()
if runtime_url:
return runtime_url
if platform.get_platform() == 'docker':
return 'http://langbot_box_runtime:5410'
return 'http://127.0.0.1:5410'
class BoxRuntimeConnector:
"""Connect to the Box runtime via action RPC (stdio or ws)."""
@@ -80,7 +97,7 @@ class BoxRuntimeConnector:
ctrl = StdioClientController(
command=python_path,
args=['-m', 'langbot.pkg.box.server', '--port', str(self._relay_port)],
args=['-m', 'langbot_plugin.box.server', '--port', str(self._relay_port)],
env=env,
)
self._subprocess = None # StdioClientController manages the subprocess
@@ -140,7 +157,7 @@ class BoxRuntimeConnector:
self._subprocess.terminate()
def _load_configured_runtime_url(self) -> str:
return str(get_box_config(self.ap).get('runtime_url', '')).strip()
return str(_get_box_config(self.ap).get('runtime_url', '')).strip()
def _should_manage_local_runtime(self) -> bool:
return not self.configured_runtime_url and platform.get_platform() != 'docker'

View File

@@ -1,33 +0,0 @@
from __future__ import annotations
class BoxError(RuntimeError):
"""Base error for LangBot Box failures."""
class BoxValidationError(BoxError):
"""Raised when sandbox_exec arguments are invalid."""
class BoxBackendUnavailableError(BoxError):
"""Raised when no supported container backend is available."""
class BoxRuntimeUnavailableError(BoxError):
"""Raised when the standalone Box Runtime service is unavailable."""
class BoxSessionConflictError(BoxError):
"""Raised when an existing session cannot satisfy a new request."""
class BoxSessionNotFoundError(BoxError):
"""Raised when a referenced session does not exist."""
class BoxManagedProcessConflictError(BoxError):
"""Raised when a session already has an active managed process."""
class BoxManagedProcessNotFoundError(BoxError):
"""Raised when a referenced managed process does not exist."""

View File

@@ -1,274 +0,0 @@
from __future__ import annotations
import datetime as dt
import enum
import pydantic
DEFAULT_BOX_IMAGE = 'python:3.11-slim'
DEFAULT_BOX_MOUNT_PATH = '/workspace'
def get_box_config(ap) -> dict:
"""Return the 'box' section from instance config, with safe fallbacks."""
instance_config = getattr(ap, 'instance_config', None)
config_data = getattr(instance_config, 'data', {}) if instance_config is not None else {}
return config_data.get('box', {})
class BoxNetworkMode(str, enum.Enum):
OFF = 'off'
ON = 'on'
class BoxExecutionStatus(str, enum.Enum):
COMPLETED = 'completed'
TIMED_OUT = 'timed_out'
class BoxHostMountMode(str, enum.Enum):
NONE = 'none'
READ_ONLY = 'ro'
READ_WRITE = 'rw'
class BoxManagedProcessStatus(str, enum.Enum):
RUNNING = 'running'
EXITED = 'exited'
class BoxSpec(pydantic.BaseModel):
cmd: str = ''
workdir: str = '/workspace'
timeout_sec: int = 30
network: BoxNetworkMode = BoxNetworkMode.OFF
session_id: str
env: dict[str, str] = pydantic.Field(default_factory=dict)
image: str = DEFAULT_BOX_IMAGE
host_path: str | None = None
host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE
# Resource limits
cpus: float = 1.0
memory_mb: int = 512
pids_limit: int = 128
read_only_rootfs: bool = True
@pydantic.field_validator('cmd')
@classmethod
def validate_cmd(cls, value: str) -> str:
return value.strip()
@pydantic.field_validator('workdir')
@classmethod
def validate_workdir(cls, value: str) -> str:
value = value.strip()
if not value.startswith('/'):
raise ValueError('workdir must be an absolute path inside the sandbox')
return value
@pydantic.field_validator('timeout_sec')
@classmethod
def validate_timeout_sec(cls, value: int) -> int:
if value <= 0:
raise ValueError('timeout_sec must be greater than 0')
return value
@pydantic.field_validator('cpus')
@classmethod
def validate_cpus(cls, value: float) -> float:
if value <= 0:
raise ValueError('cpus must be greater than 0')
return value
@pydantic.field_validator('memory_mb')
@classmethod
def validate_memory_mb(cls, value: int) -> int:
if value < 32:
raise ValueError('memory_mb must be at least 32')
return value
@pydantic.field_validator('pids_limit')
@classmethod
def validate_pids_limit(cls, value: int) -> int:
if value < 1:
raise ValueError('pids_limit must be at least 1')
return value
@pydantic.field_validator('session_id')
@classmethod
def validate_session_id(cls, value: str) -> str:
value = value.strip()
if not value:
raise ValueError('session_id must not be empty')
return value
@pydantic.field_validator('env')
@classmethod
def validate_env(cls, value: dict[str, str]) -> dict[str, str]:
return {str(k): str(v) for k, v in value.items()}
@pydantic.field_validator('host_path')
@classmethod
def validate_host_path(cls, value: str | None) -> str | None:
if value is None:
return None
value = value.strip()
if not value.startswith('/'):
raise ValueError('host_path must be an absolute host path')
return value
@pydantic.model_validator(mode='after')
def validate_host_mount_consistency(self) -> 'BoxSpec':
if self.host_path is None:
return self
if self.host_path_mode == BoxHostMountMode.NONE:
return self
if not self.workdir.startswith(DEFAULT_BOX_MOUNT_PATH):
raise ValueError('workdir must stay under /workspace when host_path is provided')
return self
class BoxProfile(pydantic.BaseModel):
"""Preset sandbox configuration.
Provides default values for BoxSpec fields and optionally locks fields
so that tool-call parameters cannot override them.
"""
name: str
image: str = DEFAULT_BOX_IMAGE
network: BoxNetworkMode = BoxNetworkMode.OFF
timeout_sec: int = 30
host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE
max_timeout_sec: int = 120
# Resource limits
cpus: float = 1.0
memory_mb: int = 512
pids_limit: int = 128
read_only_rootfs: bool = True
locked: frozenset[str] = frozenset()
model_config = pydantic.ConfigDict(frozen=True)
BUILTIN_PROFILES: dict[str, BoxProfile] = {
'default': BoxProfile(
name='default',
network=BoxNetworkMode.OFF,
host_path_mode=BoxHostMountMode.READ_WRITE,
cpus=1.0,
memory_mb=512,
pids_limit=128,
read_only_rootfs=True,
max_timeout_sec=120,
),
'offline_readonly': BoxProfile(
name='offline_readonly',
network=BoxNetworkMode.OFF,
host_path_mode=BoxHostMountMode.READ_ONLY,
cpus=0.5,
memory_mb=256,
pids_limit=64,
read_only_rootfs=True,
max_timeout_sec=60,
locked=frozenset({'network', 'host_path_mode', 'read_only_rootfs'}),
),
'network_basic': BoxProfile(
name='network_basic',
network=BoxNetworkMode.ON,
host_path_mode=BoxHostMountMode.READ_WRITE,
cpus=1.0,
memory_mb=512,
pids_limit=128,
read_only_rootfs=True,
max_timeout_sec=120,
),
'network_extended': BoxProfile(
name='network_extended',
network=BoxNetworkMode.ON,
host_path_mode=BoxHostMountMode.READ_WRITE,
cpus=2.0,
memory_mb=1024,
pids_limit=256,
read_only_rootfs=False,
max_timeout_sec=300,
),
}
class BoxSessionInfo(pydantic.BaseModel):
session_id: str
backend_name: str
backend_session_id: str
image: str
network: BoxNetworkMode
host_path: str | None = None
host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE
cpus: float = 1.0
memory_mb: int = 512
pids_limit: int = 128
read_only_rootfs: bool = True
created_at: dt.datetime
last_used_at: dt.datetime
class BoxManagedProcessSpec(pydantic.BaseModel):
command: str
args: list[str] = pydantic.Field(default_factory=list)
env: dict[str, str] = pydantic.Field(default_factory=dict)
cwd: str = '/workspace'
@pydantic.field_validator('command')
@classmethod
def validate_command(cls, value: str) -> str:
value = value.strip()
if not value:
raise ValueError('command must not be empty')
return value
@pydantic.field_validator('args')
@classmethod
def validate_args(cls, value: list[str]) -> list[str]:
return [str(item) for item in value]
@pydantic.field_validator('env')
@classmethod
def validate_env(cls, value: dict[str, str]) -> dict[str, str]:
return {str(k): str(v) for k, v in value.items()}
@pydantic.field_validator('cwd')
@classmethod
def validate_cwd(cls, value: str) -> str:
value = value.strip()
if not value.startswith('/'):
raise ValueError('cwd must be an absolute path inside the sandbox')
return value
class BoxManagedProcessInfo(pydantic.BaseModel):
session_id: str
status: BoxManagedProcessStatus
command: str
args: list[str]
cwd: str
env_keys: list[str]
attached: bool = False
started_at: dt.datetime
exited_at: dt.datetime | None = None
exit_code: int | None = None
stderr_preview: str = ''
class BoxExecutionResult(pydantic.BaseModel):
session_id: str
backend_name: str
status: BoxExecutionStatus
exit_code: int | None
stdout: str = ''
stderr: str = ''
duration_ms: int
@property
def ok(self) -> bool:
return self.status == BoxExecutionStatus.COMPLETED and self.exit_code == 0

View File

@@ -1,386 +0,0 @@
from __future__ import annotations
import asyncio
import collections
import dataclasses
import datetime as dt
import logging
import uuid
from .backend import BaseSandboxBackend, DockerBackend, PodmanBackend
from .errors import (
BoxBackendUnavailableError,
BoxManagedProcessConflictError,
BoxManagedProcessNotFoundError,
BoxSessionConflictError,
BoxSessionNotFoundError,
BoxValidationError,
)
from .models import (
BoxExecutionResult,
BoxExecutionStatus,
BoxManagedProcessInfo,
BoxManagedProcessSpec,
BoxManagedProcessStatus,
BoxSessionInfo,
BoxSpec,
)
_UTC = dt.timezone.utc
_MANAGED_PROCESS_STDERR_PREVIEW_LIMIT = 4000
@dataclasses.dataclass(slots=True)
class _ManagedProcess:
spec: BoxManagedProcessSpec
process: asyncio.subprocess.Process
started_at: dt.datetime
attach_lock: asyncio.Lock
stderr_chunks: collections.deque[str]
stderr_total_len: int = 0
exit_code: int | None = None
exited_at: dt.datetime | None = None
@property
def is_running(self) -> bool:
return self.exit_code is None and self.process.returncode is None
@dataclasses.dataclass(slots=True)
class _RuntimeSession:
info: BoxSessionInfo
lock: asyncio.Lock
managed_process: _ManagedProcess | None = None
class BoxRuntime:
def __init__(
self,
logger: logging.Logger,
backends: list[BaseSandboxBackend] | None = None,
session_ttl_sec: int = 300,
):
self.logger = logger
self.backends = backends or [PodmanBackend(logger), DockerBackend(logger)]
self.session_ttl_sec = session_ttl_sec
self._backend: BaseSandboxBackend | None = None
self._sessions: dict[str, _RuntimeSession] = {}
self._lock = asyncio.Lock()
self.instance_id = uuid.uuid4().hex[:12]
async def initialize(self):
self._backend = await self._select_backend()
if self._backend is not None:
self._backend.instance_id = self.instance_id
try:
await self._backend.cleanup_orphaned_containers(self.instance_id)
except Exception as exc:
self.logger.warning(f'LangBot Box orphan container cleanup failed: {exc}')
async def execute(self, spec: BoxSpec) -> BoxExecutionResult:
if not spec.cmd:
raise BoxValidationError('cmd must not be empty')
session = await self._get_or_create_session(spec)
async with session.lock:
self.logger.info(
'LangBot Box execute: '
f'session_id={spec.session_id} '
f'backend_session_id={session.info.backend_session_id} '
f'backend={session.info.backend_name} '
f'workdir={spec.workdir} '
f'timeout_sec={spec.timeout_sec}'
)
result = await (await self._get_backend()).exec(session.info, spec)
async with self._lock:
now = dt.datetime.now(_UTC)
if spec.session_id in self._sessions:
self._sessions[spec.session_id].info.last_used_at = now
if result.status == BoxExecutionStatus.TIMED_OUT:
await self._drop_session_locked(spec.session_id)
return result
async def shutdown(self):
async with self._lock:
session_ids = list(self._sessions.keys())
for session_id in session_ids:
await self._drop_session_locked(session_id)
async def create_session(self, spec: BoxSpec) -> dict:
session = await self._get_or_create_session(spec)
return self._session_to_dict(session.info)
async def delete_session(self, session_id: str) -> None:
async with self._lock:
if session_id not in self._sessions:
raise BoxSessionNotFoundError(f'session {session_id} not found')
await self._drop_session_locked(session_id)
async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> dict:
async with self._lock:
runtime_session = self._sessions.get(session_id)
if runtime_session is None:
raise BoxSessionNotFoundError(f'session {session_id} not found')
async with runtime_session.lock:
existing = runtime_session.managed_process
if existing is not None and existing.is_running:
raise BoxManagedProcessConflictError(f'session {session_id} already has a managed process')
backend = await self._get_backend()
process = await backend.start_managed_process(runtime_session.info, spec)
managed_process = _ManagedProcess(
spec=spec,
process=process,
started_at=dt.datetime.now(_UTC),
attach_lock=asyncio.Lock(),
stderr_chunks=collections.deque(),
)
runtime_session.managed_process = managed_process
runtime_session.info.last_used_at = dt.datetime.now(_UTC)
asyncio.create_task(self._drain_managed_process_stderr(runtime_session.info.session_id, managed_process))
asyncio.create_task(self._watch_managed_process(runtime_session.info.session_id, managed_process))
return self._managed_process_to_dict(runtime_session.info.session_id, managed_process)
def get_managed_process(self, session_id: str) -> dict:
runtime_session = self._sessions.get(session_id)
if runtime_session is None:
raise BoxSessionNotFoundError(f'session {session_id} not found')
if runtime_session.managed_process is None:
raise BoxManagedProcessNotFoundError(f'session {session_id} has no managed process')
return self._managed_process_to_dict(session_id, runtime_session.managed_process)
# ── Observability ─────────────────────────────────────────────────
async def get_backend_info(self) -> dict:
backend = self._backend
if backend is None:
return {'name': None, 'available': False}
try:
available = await backend.is_available()
except Exception:
available = False
return {'name': backend.name, 'available': available}
def get_sessions(self) -> list[dict]:
return [self._session_to_dict(s.info) for s in self._sessions.values()]
def get_session(self, session_id: str) -> dict:
runtime_session = self._sessions.get(session_id)
if runtime_session is None:
raise BoxSessionNotFoundError(f'session {session_id} not found')
result = self._session_to_dict(runtime_session.info)
if runtime_session.managed_process is not None:
result['managed_process'] = self._managed_process_to_dict(session_id, runtime_session.managed_process)
return result
async def get_status(self) -> dict:
backend_info = await self.get_backend_info()
return {
'backend': backend_info,
'active_sessions': len(self._sessions),
'managed_processes': sum(
1
for runtime_session in self._sessions.values()
if runtime_session.managed_process is not None and runtime_session.managed_process.is_running
),
'session_ttl_sec': self.session_ttl_sec,
}
async def _get_or_create_session(self, spec: BoxSpec) -> _RuntimeSession:
async with self._lock:
await self._reap_expired_sessions_locked()
existing = self._sessions.get(spec.session_id)
if existing is not None:
self._assert_session_compatible(existing.info, spec)
existing.info.last_used_at = dt.datetime.now(_UTC)
self.logger.info(
'LangBot Box session reused: '
f'session_id={spec.session_id} '
f'backend_session_id={existing.info.backend_session_id} '
f'backend={existing.info.backend_name}'
)
return existing
backend = await self._get_backend()
info = await backend.start_session(spec)
runtime_session = _RuntimeSession(info=info, lock=asyncio.Lock())
self._sessions[spec.session_id] = runtime_session
self.logger.info(
'LangBot Box session created: '
f'session_id={spec.session_id} '
f'backend_session_id={info.backend_session_id} '
f'backend={info.backend_name} '
f'image={info.image} '
f'network={info.network.value} '
f'host_path={info.host_path} '
f'host_path_mode={info.host_path_mode.value}'
)
return runtime_session
async def _get_backend(self) -> BaseSandboxBackend:
if self._backend is None:
self._backend = await self._select_backend()
if self._backend is None:
raise BoxBackendUnavailableError(
'LangBot Box backend unavailable. Install and start Podman or Docker before using sandbox_exec.'
)
return self._backend
async def _select_backend(self) -> BaseSandboxBackend | None:
for backend in self.backends:
try:
await backend.initialize()
if await backend.is_available():
self.logger.info(f'LangBot Box using backend: {backend.name}')
return backend
except Exception as exc:
self.logger.warning(f'LangBot Box backend {backend.name} probe failed: {exc}')
self.logger.warning('LangBot Box backend unavailable: neither Podman nor Docker is ready')
return None
async def _reap_expired_sessions_locked(self):
if self.session_ttl_sec <= 0:
return
deadline = dt.datetime.now(_UTC) - dt.timedelta(seconds=self.session_ttl_sec)
expired_session_ids = [
session_id
for session_id, session in self._sessions.items()
if session.info.last_used_at < deadline
and not (session.managed_process is not None and session.managed_process.is_running)
]
for session_id in expired_session_ids:
await self._drop_session_locked(session_id)
async def _drop_session_locked(self, session_id: str):
runtime_session = self._sessions.pop(session_id, None)
if runtime_session is None or self._backend is None:
return
await self._terminate_managed_process(runtime_session)
try:
self.logger.info(
'LangBot Box session cleanup: '
f'session_id={session_id} '
f'backend_session_id={runtime_session.info.backend_session_id} '
f'backend={runtime_session.info.backend_name}'
)
await self._backend.stop_session(runtime_session.info)
except Exception as exc:
self.logger.warning(f'Failed to clean up box session {session_id}: {exc}')
def _assert_session_compatible(self, session: BoxSessionInfo, spec: BoxSpec):
_COMPAT_FIELDS = (
'network',
'image',
'host_path',
'host_path_mode',
'cpus',
'memory_mb',
'pids_limit',
'read_only_rootfs',
)
for field in _COMPAT_FIELDS:
session_val = getattr(session, field)
spec_val = getattr(spec, field)
if session_val != spec_val:
display = session_val.value if hasattr(session_val, 'value') else session_val
raise BoxSessionConflictError(
f'sandbox_exec session {spec.session_id} already exists with {field}={display}'
)
async def _drain_managed_process_stderr(self, session_id: str, managed_process: _ManagedProcess) -> None:
stream = managed_process.process.stderr
if stream is None:
return
try:
while True:
chunk = await stream.readline()
if not chunk:
break
text = chunk.decode('utf-8', errors='replace').rstrip()
if not text:
continue
managed_process.stderr_chunks.append(text)
managed_process.stderr_total_len += len(text) + 1 # +1 for '\n' separator
while (
managed_process.stderr_total_len > _MANAGED_PROCESS_STDERR_PREVIEW_LIMIT
and managed_process.stderr_chunks
):
removed = managed_process.stderr_chunks.popleft()
managed_process.stderr_total_len -= len(removed) + 1
self.logger.info(f'LangBot Box managed process stderr: session_id={session_id} {text}')
except Exception as exc:
self.logger.warning(f'Failed to drain managed process stderr for {session_id}: {exc}')
async def _watch_managed_process(self, session_id: str, managed_process: _ManagedProcess) -> None:
return_code = await managed_process.process.wait()
managed_process.exit_code = return_code
managed_process.exited_at = dt.datetime.now(_UTC)
runtime_session = self._sessions.get(session_id)
if runtime_session is not None:
runtime_session.info.last_used_at = managed_process.exited_at
self.logger.info(f'LangBot Box managed process exited: session_id={session_id} return_code={return_code}')
async def _terminate_managed_process(self, runtime_session: _RuntimeSession) -> None:
managed_process = runtime_session.managed_process
if managed_process is None or not managed_process.is_running:
return
process = managed_process.process
try:
if process.stdin is not None:
process.stdin.close()
except Exception:
pass
try:
await asyncio.wait_for(asyncio.shield(process.wait()), timeout=5)
except asyncio.TimeoutError:
if process.returncode is None:
try:
process.terminate()
except ProcessLookupError:
pass
try:
await asyncio.wait_for(asyncio.shield(process.wait()), timeout=5)
except asyncio.TimeoutError:
if process.returncode is None:
try:
process.kill()
except ProcessLookupError:
pass
await process.wait()
finally:
managed_process.exit_code = process.returncode
managed_process.exited_at = dt.datetime.now(_UTC)
def _managed_process_to_dict(self, session_id: str, managed_process: _ManagedProcess) -> dict:
stderr_preview = '\n'.join(managed_process.stderr_chunks)
status = BoxManagedProcessStatus.RUNNING if managed_process.is_running else BoxManagedProcessStatus.EXITED
return BoxManagedProcessInfo(
session_id=session_id,
status=status,
command=managed_process.spec.command,
args=managed_process.spec.args,
cwd=managed_process.spec.cwd,
env_keys=sorted(managed_process.spec.env.keys()),
attached=managed_process.attach_lock.locked(),
started_at=managed_process.started_at,
exited_at=managed_process.exited_at,
exit_code=managed_process.exit_code,
stderr_preview=stderr_preview,
).model_dump(mode='json')
@staticmethod
def _session_to_dict(info: BoxSessionInfo) -> dict:
return info.model_dump(mode='json')

View File

@@ -1,35 +0,0 @@
from __future__ import annotations
import os
from .errors import BoxValidationError
from .models import BoxSpec
BLOCKED_HOST_PATHS = frozenset(
{
'/etc',
'/proc',
'/sys',
'/dev',
'/root',
'/boot',
'/run',
'/var/run',
'/run/docker.sock',
'/var/run/docker.sock',
'/run/podman',
'/var/run/podman',
}
)
def validate_sandbox_security(spec: BoxSpec) -> None:
"""Validate that a BoxSpec does not request dangerous container config.
Raises BoxValidationError when the spec contains a blocked host_path.
"""
if spec.host_path:
real = os.path.realpath(spec.host_path)
for blocked in BLOCKED_HOST_PATHS:
if real == blocked or real.startswith(blocked + '/'):
raise BoxValidationError(f'host_path {spec.host_path} is blocked for security')

View File

@@ -1,267 +0,0 @@
"""Standalone Box Runtime service exposing BoxRuntime via action RPC.
Usage (stdio, launched by LangBot as subprocess):
python -m langbot.pkg.box.server
Usage (ws + ws relay, for remote/docker mode):
python -m langbot.pkg.box.server --port 5410
"""
from __future__ import annotations
import argparse
import asyncio
import datetime as dt
import logging
import sys
from typing import Any
import pydantic
from aiohttp import web
from langbot_plugin.entities.io.actions.enums import CommonAction
from langbot_plugin.entities.io.resp import ActionResponse
from langbot_plugin.runtime.io.connection import Connection
from langbot_plugin.runtime.io.handler import Handler
from .actions import LangBotToBoxAction
from .errors import (
BoxManagedProcessConflictError,
BoxManagedProcessNotFoundError,
BoxSessionNotFoundError,
)
from .models import BoxExecutionResult, BoxManagedProcessSpec, BoxSpec
from .runtime import BoxRuntime
logger = logging.getLogger('langbot.box.server')
def _result_to_dict(result: BoxExecutionResult) -> dict:
return result.model_dump(mode='json')
class BoxServerHandler(Handler):
"""Server-side handler that registers box actions backed by BoxRuntime."""
name = 'BoxServerHandler'
def __init__(self, connection: Connection, runtime: BoxRuntime):
super().__init__(connection)
self._runtime = runtime
self._register_actions()
def _register_actions(self) -> None:
@self.action(CommonAction.PING)
async def ping(data: dict[str, Any]) -> ActionResponse:
return ActionResponse.success({})
@self.action(LangBotToBoxAction.HEALTH)
async def health(data: dict[str, Any]) -> ActionResponse:
info = await self._runtime.get_backend_info()
return ActionResponse.success(info)
@self.action(LangBotToBoxAction.STATUS)
async def status(data: dict[str, Any]) -> ActionResponse:
result = await self._runtime.get_status()
return ActionResponse.success(result)
@self.action(LangBotToBoxAction.EXEC)
async def exec_cmd(data: dict[str, Any]) -> ActionResponse:
try:
spec = BoxSpec.model_validate(data)
except pydantic.ValidationError as exc:
return ActionResponse.error(f'BoxValidationError: {exc}')
result = await self._runtime.execute(spec)
return ActionResponse.success(_result_to_dict(result))
@self.action(LangBotToBoxAction.CREATE_SESSION)
async def create_session(data: dict[str, Any]) -> ActionResponse:
try:
spec = BoxSpec.model_validate(data)
except pydantic.ValidationError as exc:
return ActionResponse.error(f'BoxValidationError: {exc}')
info = await self._runtime.create_session(spec)
return ActionResponse.success(info)
@self.action(LangBotToBoxAction.GET_SESSION)
async def get_session(data: dict[str, Any]) -> ActionResponse:
return ActionResponse.success(self._runtime.get_session(data['session_id']))
@self.action(LangBotToBoxAction.GET_SESSIONS)
async def get_sessions(data: dict[str, Any]) -> ActionResponse:
return ActionResponse.success({'sessions': self._runtime.get_sessions()})
@self.action(LangBotToBoxAction.DELETE_SESSION)
async def delete_session(data: dict[str, Any]) -> ActionResponse:
await self._runtime.delete_session(data['session_id'])
return ActionResponse.success({'deleted': data['session_id']})
@self.action(LangBotToBoxAction.START_MANAGED_PROCESS)
async def start_managed_process(data: dict[str, Any]) -> ActionResponse:
session_id = data['session_id']
try:
spec = BoxManagedProcessSpec.model_validate(data['spec'])
except pydantic.ValidationError as exc:
return ActionResponse.error(f'BoxValidationError: {exc}')
info = await self._runtime.start_managed_process(session_id, spec)
return ActionResponse.success(info)
@self.action(LangBotToBoxAction.GET_MANAGED_PROCESS)
async def get_managed_process(data: dict[str, Any]) -> ActionResponse:
return ActionResponse.success(self._runtime.get_managed_process(data['session_id']))
@self.action(LangBotToBoxAction.GET_BACKEND_INFO)
async def get_backend_info(data: dict[str, Any]) -> ActionResponse:
info = await self._runtime.get_backend_info()
return ActionResponse.success(info)
@self.action(LangBotToBoxAction.SHUTDOWN)
async def shutdown(data: dict[str, Any]) -> ActionResponse:
await self._runtime.shutdown()
return ActionResponse.success({})
# ── Managed process WebSocket relay (aiohttp) ────────────────────────
def _error_response(exc: Exception) -> web.Response:
return web.json_response(
{'error': {'code': type(exc).__name__, 'message': str(exc)}},
status=400,
)
async def handle_managed_process_ws(request: web.Request) -> web.StreamResponse:
runtime: BoxRuntime = request.app['runtime']
session_id = request.match_info['session_id']
runtime_session = runtime._sessions.get(session_id)
if runtime_session is None:
return _error_response(BoxSessionNotFoundError(f'session {session_id} not found'))
managed_process = runtime_session.managed_process
if managed_process is None:
return _error_response(BoxManagedProcessNotFoundError(f'session {session_id} has no managed process'))
if not managed_process.is_running:
return _error_response(
BoxManagedProcessConflictError(f'managed process in session {session_id} is not running')
)
ws = web.WebSocketResponse(protocols=('mcp',))
await ws.prepare(request)
async with managed_process.attach_lock:
process = managed_process.process
stdout = process.stdout
stdin = process.stdin
if stdout is None or stdin is None:
await ws.close(message=b'managed process stdio unavailable')
return ws
async def _stdout_to_ws() -> None:
while True:
line = await stdout.readline()
if not line:
break
await ws.send_str(line.decode('utf-8', errors='replace').rstrip('\n'))
runtime_session.info.last_used_at = dt.datetime.now(dt.timezone.utc)
async def _ws_to_stdin() -> None:
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
stdin.write((msg.data + '\n').encode('utf-8'))
await stdin.drain()
runtime_session.info.last_used_at = dt.datetime.now(dt.timezone.utc)
elif msg.type in (
web.WSMsgType.CLOSE,
web.WSMsgType.CLOSING,
web.WSMsgType.CLOSED,
web.WSMsgType.ERROR,
):
break
stdout_task = asyncio.create_task(_stdout_to_ws())
stdin_task = asyncio.create_task(_ws_to_stdin())
try:
done, pending = await asyncio.wait(
[stdout_task, stdin_task],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
for task in done:
task.result()
finally:
await ws.close()
return ws
def create_ws_relay_app(runtime: BoxRuntime) -> web.Application:
"""Create a minimal aiohttp app that only serves the managed-process ws relay."""
app = web.Application()
app['runtime'] = runtime
app.router.add_get('/v1/sessions/{session_id}/managed-process/ws', handle_managed_process_ws)
return app
# ── Entry point ──────────────────────────────────────────────────────
async def _run_server(host: str, port: int, mode: str) -> None:
runtime = BoxRuntime(logger=logger)
await runtime.initialize()
# Start aiohttp for ws relay (non-fatal — managed process attach
# degrades gracefully if the port is unavailable).
runner: web.AppRunner | None = None
try:
ws_app = create_ws_relay_app(runtime)
runner = web.AppRunner(ws_app)
await runner.setup()
site = web.TCPSite(runner, host, port)
await site.start()
logger.info(f'Box ws relay listening on {host}:{port}')
except OSError as exc:
logger.warning(f'Box ws relay failed to bind {host}:{port}: {exc}')
logger.warning('Managed process WebSocket attach will be unavailable.')
async def new_connection_callback(connection: Connection) -> None:
handler = BoxServerHandler(connection, runtime)
await handler.run()
try:
if mode == 'stdio':
from langbot_plugin.runtime.io.controllers.stdio.server import StdioServerController
ctrl = StdioServerController()
await ctrl.run(new_connection_callback)
else:
from langbot_plugin.runtime.io.controllers.ws.server import WebSocketServerController
# Action RPC uses port+1 to avoid conflict with ws relay
rpc_port = port + 1
logger.info(f'Box action RPC (ws) listening on {host}:{rpc_port}')
ctrl = WebSocketServerController(rpc_port)
await ctrl.run(new_connection_callback)
finally:
await runtime.shutdown()
if runner is not None:
await runner.cleanup()
def main() -> None:
parser = argparse.ArgumentParser(description='LangBot Box Runtime Service')
parser.add_argument('--host', default='0.0.0.0', help='Bind address')
parser.add_argument('--port', type=int, default=5410, help='Bind port (ws relay)')
parser.add_argument(
'--mode', choices=['stdio', 'ws'], default='stdio', help='Control channel transport (default: stdio)'
)
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
asyncio.run(_run_server(args.host, args.port, args.mode))
if __name__ == '__main__':
main()

View File

@@ -9,17 +9,16 @@ from typing import TYPE_CHECKING
import pydantic
from .client import BoxRuntimeClient
from .connector import BoxRuntimeConnector
from .errors import BoxError, BoxValidationError
from .models import (
from langbot_plugin.box.client import BoxRuntimeClient
from .connector import BoxRuntimeConnector, _get_box_config
from langbot_plugin.box.errors import BoxError, BoxValidationError
from langbot_plugin.box.models import (
BUILTIN_PROFILES,
BoxExecutionResult,
BoxManagedProcessInfo,
BoxManagedProcessSpec,
BoxProfile,
BoxSpec,
get_box_config,
)
_INT_ADAPTER = pydantic.TypeAdapter(int)
@@ -241,7 +240,7 @@ class BoxService:
}
def _load_allowed_host_mount_roots(self) -> list[str]:
configured_roots = get_box_config(self.ap).get('allowed_host_mount_roots', [])
configured_roots = _get_box_config(self.ap).get('allowed_host_mount_roots', [])
normalized_roots: list[str] = []
for root in configured_roots:
@@ -253,7 +252,7 @@ class BoxService:
return normalized_roots
def _load_default_host_workspace(self) -> str | None:
default_host_workspace = str(get_box_config(self.ap).get('default_host_workspace', '')).strip()
default_host_workspace = str(_get_box_config(self.ap).get('default_host_workspace', '')).strip()
if not default_host_workspace:
return None
return os.path.realpath(os.path.abspath(default_host_workspace))
@@ -302,7 +301,7 @@ class BoxService:
raise BoxValidationError(f'host_path is outside allowed_host_mount_roots: {allowed_roots}')
def _load_profile(self) -> BoxProfile:
profile_name = str(get_box_config(self.ap).get('profile', 'default')).strip() or 'default'
profile_name = str(_get_box_config(self.ap).get('profile', 'default')).strip() or 'default'
profile = BUILTIN_PROFILES.get(profile_name)
if profile is None:

View File

@@ -327,7 +327,7 @@ class RuntimeMCPSession:
async def _monitor_box_process_health(self):
"""Poll managed process status; return when process exits."""
from langbot.pkg.box.models import BoxManagedProcessStatus
from langbot_plugin.box.models import BoxManagedProcessStatus
session_id = self._build_box_session_id()
consecutive_errors = 0

View File

@@ -5,7 +5,7 @@ import json
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from langbot_plugin.api.entities.events import pipeline_query
from langbot.pkg.box.models import BoxNetworkMode
from langbot_plugin.box.models import BoxNetworkMode
from .. import loader
SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec'

View File

@@ -1,38 +0,0 @@
from __future__ import annotations
import pytest
from langbot.pkg.box.backend import CLISandboxBackend, _MAX_RAW_OUTPUT_BYTES
class TestClipCapturedBytes:
def test_within_limit_unchanged(self):
data = b'hello world'
result = CLISandboxBackend._clip_captured_bytes(data, total_size=len(data), limit=1024)
assert result == 'hello world'
def test_exceeding_limit_clips_and_appends_notice(self):
captured = b'A' * 100
total_size = 200
result = CLISandboxBackend._clip_captured_bytes(captured, total_size=total_size, limit=100)
assert result.startswith('A' * 100)
assert 'raw output clipped at 100 bytes' in result
assert '100 bytes discarded' in result
def test_exact_limit_not_clipped(self):
data = b'B' * 100
result = CLISandboxBackend._clip_captured_bytes(data, total_size=100, limit=100)
assert result == 'B' * 100
assert 'clipped' not in result
def test_default_limit_is_module_constant(self):
data = b'x' * 10
result = CLISandboxBackend._clip_captured_bytes(data, total_size=10)
assert result == 'x' * 10
assert _MAX_RAW_OUTPUT_BYTES == 1_048_576
def test_invalid_utf8_replaced(self):
data = b'ok\xff\xfetail'
result = CLISandboxBackend._clip_captured_bytes(data, total_size=len(data), limit=1024)
assert 'ok' in result
assert 'tail' in result

View File

@@ -5,9 +5,9 @@ from unittest.mock import AsyncMock, Mock, patch
import pytest
from langbot.pkg.box.client import ActionRPCBoxClient
from langbot_plugin.box.client import ActionRPCBoxClient
from langbot.pkg.box.connector import BoxRuntimeConnector
from langbot.pkg.box.errors import BoxRuntimeUnavailableError
from langbot_plugin.box.errors import BoxRuntimeUnavailableError
def make_app(logger: Mock, runtime_url: str = ''):
@@ -27,7 +27,6 @@ def make_app(logger: Mock, runtime_url: str = ''):
def patch_platform(monkeypatch: pytest.MonkeyPatch, value: str):
monkeypatch.setattr('langbot.pkg.box.client.platform.get_platform', lambda: value)
monkeypatch.setattr('langbot.pkg.box.connector.platform.get_platform', lambda: value)

View File

@@ -1,103 +0,0 @@
from __future__ import annotations
import asyncio
import datetime as dt
from unittest.mock import Mock
import pytest
from langbot.pkg.box.backend import BaseSandboxBackend
from langbot.pkg.box.models import BoxManagedProcessSpec, BoxManagedProcessStatus, BoxSessionInfo, BoxSpec
from langbot.pkg.box.runtime import BoxRuntime
_UTC = dt.timezone.utc
class FakeManagedProcessBackend(BaseSandboxBackend):
name = 'fake-managed'
def __init__(self, logger: Mock):
super().__init__(logger)
async def is_available(self) -> bool:
return True
async def start_session(self, spec: BoxSpec) -> BoxSessionInfo:
now = dt.datetime.now(_UTC)
return BoxSessionInfo(
session_id=spec.session_id,
backend_name=self.name,
backend_session_id=f'backend-{spec.session_id}',
image=spec.image,
network=spec.network,
host_path=spec.host_path,
host_path_mode=spec.host_path_mode,
cpus=spec.cpus,
memory_mb=spec.memory_mb,
pids_limit=spec.pids_limit,
read_only_rootfs=spec.read_only_rootfs,
created_at=now,
last_used_at=now,
)
async def exec(self, session: BoxSessionInfo, spec: BoxSpec):
raise NotImplementedError
async def stop_session(self, session: BoxSessionInfo):
return None
async def start_managed_process(self, session: BoxSessionInfo, spec: BoxManagedProcessSpec) -> asyncio.subprocess.Process:
return await asyncio.create_subprocess_exec(
'sh',
'-lc',
'cat',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
@pytest.mark.asyncio
async def test_runtime_start_managed_process_tracks_status():
logger = Mock()
runtime = BoxRuntime(logger=logger, backends=[FakeManagedProcessBackend(logger)], session_ttl_sec=300)
await runtime.initialize()
session_spec = BoxSpec.model_validate({'cmd': 'echo bootstrap', 'session_id': 'mcp-session'})
await runtime.create_session(session_spec)
process_info = await runtime.start_managed_process(
'mcp-session',
BoxManagedProcessSpec(command='python', args=['-m', 'demo'], cwd='/workspace'),
)
assert process_info['session_id'] == 'mcp-session'
assert process_info['status'] == BoxManagedProcessStatus.RUNNING.value
assert process_info['command'] == 'python'
assert process_info['args'] == ['-m', 'demo']
queried = runtime.get_managed_process('mcp-session')
assert queried['status'] == BoxManagedProcessStatus.RUNNING.value
await runtime.shutdown()
@pytest.mark.asyncio
async def test_runtime_does_not_reap_session_with_running_managed_process():
logger = Mock()
runtime = BoxRuntime(logger=logger, backends=[FakeManagedProcessBackend(logger)], session_ttl_sec=1)
await runtime.initialize()
session_spec = BoxSpec.model_validate({'cmd': 'echo bootstrap', 'session_id': 'mcp-session'})
await runtime.create_session(session_spec)
await runtime.start_managed_process(
'mcp-session',
BoxManagedProcessSpec(command='python', args=['-m', 'demo'], cwd='/workspace'),
)
runtime._sessions['mcp-session'].info.last_used_at = dt.datetime.now(_UTC) - dt.timedelta(seconds=120)
await runtime._reap_expired_sessions_locked()
assert 'mcp-session' in runtime._sessions
await runtime.shutdown()

View File

@@ -1,59 +0,0 @@
from __future__ import annotations
import pytest
from langbot.pkg.box.errors import BoxValidationError
from langbot.pkg.box.models import BoxHostMountMode, BoxNetworkMode, BoxSpec
from langbot.pkg.box.security import BLOCKED_HOST_PATHS, validate_sandbox_security
def _make_spec(**overrides) -> BoxSpec:
defaults = {
'session_id': 'test-session',
'cmd': 'echo hi',
'image': 'python:3.11-slim',
}
defaults.update(overrides)
return BoxSpec(**defaults)
class TestValidateSandboxSecurity:
def test_no_host_path_passes(self):
spec = _make_spec(host_path=None)
validate_sandbox_security(spec) # should not raise
def test_safe_host_path_passes(self):
spec = _make_spec(host_path='/home/user/my-project')
validate_sandbox_security(spec) # should not raise
@pytest.mark.parametrize('blocked', [
'/etc',
'/proc',
'/sys',
'/dev',
'/root',
'/boot',
'/run',
'/var/run',
'/run/docker.sock',
'/var/run/docker.sock',
'/run/podman',
'/var/run/podman',
])
def test_blocked_paths_rejected(self, blocked):
spec = _make_spec(host_path=blocked)
with pytest.raises(BoxValidationError, match='blocked for security'):
validate_sandbox_security(spec)
def test_blocked_subpath_rejected(self):
spec = _make_spec(host_path='/etc/nginx')
with pytest.raises(BoxValidationError, match='blocked for security'):
validate_sandbox_security(spec)
def test_path_starting_with_blocked_prefix_but_different_dir_passes(self):
# /etcetera is NOT /etc
spec = _make_spec(host_path='/etcetera/data')
validate_sandbox_security(spec) # should not raise
def test_blocked_host_paths_is_frozenset(self):
assert isinstance(BLOCKED_HOST_PATHS, frozenset)

View File

@@ -10,10 +10,10 @@ import pytest
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
from langbot.pkg.box.backend import BaseSandboxBackend
from langbot.pkg.box.client import BoxRuntimeClient, ActionRPCBoxClient
from langbot.pkg.box.errors import BoxBackendUnavailableError, BoxSessionConflictError, BoxSessionNotFoundError, BoxValidationError
from langbot.pkg.box.models import (
from langbot_plugin.box.backend import BaseSandboxBackend
from langbot_plugin.box.client import BoxRuntimeClient, ActionRPCBoxClient
from langbot_plugin.box.errors import BoxBackendUnavailableError, BoxSessionConflictError, BoxSessionNotFoundError, BoxValidationError
from langbot_plugin.box.models import (
BUILTIN_PROFILES,
BoxExecutionResult,
BoxExecutionStatus,
@@ -24,7 +24,7 @@ from langbot.pkg.box.models import (
BoxSessionInfo,
BoxSpec,
)
from langbot.pkg.box.runtime import BoxRuntime
from langbot_plugin.box.runtime import BoxRuntime
from langbot.pkg.box.service import BoxService
_UTC = dt.timezone.utc
@@ -803,7 +803,7 @@ def _make_queue_connection_pair():
async def _make_rpc_pair(runtime: BoxRuntime):
"""Create an in-process (ActionRPCBoxClient, server_task, client_task) connected via queues."""
from langbot.pkg.box.server import BoxServerHandler
from langbot_plugin.box.server import BoxServerHandler
from langbot_plugin.runtime.io.handler import Handler
client_conn, server_conn = _make_queue_connection_pair()

View File

@@ -93,8 +93,8 @@ def mcp_module():
class _BPS(str, _enum.Enum):
RUNNING = 'running'
EXITED = 'exited'
_save_and_stub('langbot.pkg.box', is_package=True)
_save_and_stub('langbot.pkg.box.models', {'BoxManagedProcessStatus': _BPS})
_save_and_stub('langbot_plugin.box', is_package=True)
_save_and_stub('langbot_plugin.box.models', {'BoxManagedProcessStatus': _BPS})
# Now load mcp.py via spec_from_file_location
mod_fqn = 'langbot.pkg.provider.tools.loaders.mcp'