From 14057d17222544d1efe895280ee62a795588dcf7 Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Sat, 21 Mar 2026 10:28:03 +0000 Subject: [PATCH] refactor: use rpc --- src/langbot/pkg/box/actions.py | 21 + src/langbot/pkg/box/client.py | 217 ++++----- src/langbot/pkg/box/connector.py | 186 +++++--- src/langbot/pkg/box/server.py | 316 +++++++------ src/langbot/pkg/box/service.py | 7 +- src/langbot/pkg/plugin/connector.py | 34 +- src/langbot/pkg/provider/tools/loaders/mcp.py | 35 +- src/langbot/pkg/utils/managed_runtime.py | 89 ++++ .../box/test_box_integration.py | 125 +++--- .../box/test_box_mcp_integration.py | 95 +++- tests/unit_tests/box/test_box_connector.py | 100 ++--- tests/unit_tests/box/test_box_service.py | 423 +++++------------- 12 files changed, 791 insertions(+), 857 deletions(-) create mode 100644 src/langbot/pkg/box/actions.py create mode 100644 src/langbot/pkg/utils/managed_runtime.py diff --git a/src/langbot/pkg/box/actions.py b/src/langbot/pkg/box/actions.py new file mode 100644 index 00000000..54ebb7b0 --- /dev/null +++ b/src/langbot/pkg/box/actions.py @@ -0,0 +1,21 @@ +"""Box-specific action types for the action RPC protocol.""" + +from __future__ import annotations + +from langbot_plugin.entities.io.actions.enums import ActionType + + +class LangBotToBoxAction(ActionType): + """Actions sent from LangBot to the Box runtime.""" + + HEALTH = "box_health" + STATUS = "box_status" + EXEC = "box_exec" + CREATE_SESSION = "box_create_session" + GET_SESSION = "box_get_session" + GET_SESSIONS = "box_get_sessions" + DELETE_SESSION = "box_delete_session" + START_MANAGED_PROCESS = "box_start_managed_process" + GET_MANAGED_PROCESS = "box_get_managed_process" + GET_BACKEND_INFO = "box_get_backend_info" + SHUTDOWN = "box_shutdown" diff --git a/src/langbot/pkg/box/client.py b/src/langbot/pkg/box/client.py index 03c0839f..964b451b 100644 --- a/src/langbot/pkg/box/client.py +++ b/src/langbot/pkg/box/client.py @@ -1,23 +1,15 @@ -"""BoxRuntimeClient abstraction for remote Box Runtime access.""" +"""BoxRuntimeClient abstraction for Box Runtime access.""" from __future__ import annotations import abc import logging -from typing import TYPE_CHECKING +from typing import Any, TYPE_CHECKING -import aiohttp +from langbot_plugin.runtime.io.handler import Handler -from .errors import ( - BoxBackendUnavailableError, - BoxError, - BoxManagedProcessConflictError, - BoxManagedProcessNotFoundError, - BoxRuntimeUnavailableError, - BoxSessionConflictError, - BoxSessionNotFoundError, - BoxValidationError, -) +from .actions import LangBotToBoxAction +from .errors import BoxError, BoxRuntimeUnavailableError from .models import ( BoxExecutionResult, BoxExecutionStatus, @@ -31,19 +23,9 @@ from ..utils import platform if TYPE_CHECKING: from ..core import app as core_app -_ERROR_CODE_MAP: dict[str, type[BoxError]] = { - 'validation_error': BoxValidationError, - 'session_not_found': BoxSessionNotFoundError, - 'session_conflict': BoxSessionConflictError, - 'managed_process_not_found': BoxManagedProcessNotFoundError, - 'managed_process_conflict': BoxManagedProcessConflictError, - 'backend_unavailable': BoxBackendUnavailableError, - 'runtime_unavailable': BoxRuntimeUnavailableError, - 'internal_error': BoxError, -} - -def resolve_box_runtime_url(ap: 'core_app.Application') -> str: +def resolve_box_ws_relay_url(ap: 'core_app.Application') -> str: + """Derive the ws relay base URL used for managed-process attach.""" runtime_url = str(get_box_config(ap).get('runtime_url', '')).strip() if runtime_url: return runtime_url @@ -90,54 +72,64 @@ class BoxRuntimeClient(abc.ABC): async def get_session(self, session_id: str) -> dict: ... -class RemoteBoxRuntimeClient(BoxRuntimeClient): - """HTTP client that talks to a standalone Box Runtime service.""" +def _translate_action_error(exc: Exception) -> BoxError: + """Convert an ActionCallError message back into the appropriate BoxError subclass.""" + from .errors import ( + BoxBackendUnavailableError, + BoxManagedProcessConflictError, + BoxManagedProcessNotFoundError, + BoxSessionConflictError, + BoxSessionNotFoundError, + BoxValidationError, + ) + msg = str(exc) + _ERROR_PREFIX_MAP: list[tuple[str, type[BoxError]]] = [ + ('BoxValidationError:', BoxValidationError), + ('BoxSessionNotFoundError:', BoxSessionNotFoundError), + ('BoxSessionConflictError:', BoxSessionConflictError), + ('BoxManagedProcessNotFoundError:', BoxManagedProcessNotFoundError), + ('BoxManagedProcessConflictError:', BoxManagedProcessConflictError), + ('BoxBackendUnavailableError:', BoxBackendUnavailableError), + ] + for prefix, cls in _ERROR_PREFIX_MAP: + if prefix in msg: + return cls(msg) + return BoxError(msg) - def __init__(self, base_url: str, logger: logging.Logger): - self._base_url = base_url.rstrip('/') + +class ActionRPCBoxClient(BoxRuntimeClient): + """Client that talks to BoxRuntime via the action RPC protocol.""" + + def __init__(self, logger: logging.Logger): self._logger = logger - self._session: aiohttp.ClientSession | None = None + self._handler: Handler | None = None - def _get_session(self) -> aiohttp.ClientSession: - if self._session is None or self._session.closed: - self._session = aiohttp.ClientSession() - return self._session + @property + def handler(self) -> Handler: + if self._handler is None: + raise BoxRuntimeUnavailableError('box runtime not connected') + return self._handler - async def _check_response(self, resp: aiohttp.ClientResponse) -> None: - if resp.status < 400: - return + def set_handler(self, handler: Handler) -> None: + self._handler = handler + + async def _call(self, action: LangBotToBoxAction, data: dict[str, Any], timeout: float = 15.0) -> dict[str, Any]: try: - body = await resp.json() - error_info = body.get('error', {}) - code = error_info.get('code', '') - message = error_info.get('message', '') - except Exception: - resp.raise_for_status() - return - exc_class = _ERROR_CODE_MAP.get(code, BoxError) - raise exc_class(message) + return await self.handler.call_action(action, data, timeout=timeout) + except BoxRuntimeUnavailableError: + raise + except Exception as exc: + raise _translate_action_error(exc) from exc async def initialize(self) -> None: - session = self._get_session() try: - async with session.get(f'{self._base_url}/v1/health') as resp: - await self._check_response(resp) - self._logger.info(f'LangBot Box runtime connected: {self._base_url}') - except aiohttp.ClientError as exc: + await self._call(LangBotToBoxAction.HEALTH, {}) + self._logger.info('LangBot Box runtime connected via action RPC.') + except Exception as exc: raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc async def execute(self, spec: BoxSpec) -> BoxExecutionResult: - session = self._get_session() - payload = spec.model_dump(mode='json') - try: - async with session.post( - f'{self._base_url}/v1/sessions/{spec.session_id}/exec', - json=payload, - ) as resp: - await self._check_response(resp) - data = await resp.json() - except aiohttp.ClientError as exc: - raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + data = await self._call(LangBotToBoxAction.EXEC, spec.model_dump(mode='json'), timeout=300.0) return BoxExecutionResult( session_id=data['session_id'], backend_name=data['backend_name'], @@ -149,103 +141,52 @@ class RemoteBoxRuntimeClient(BoxRuntimeClient): ) async def shutdown(self) -> None: - if self._session and not self._session.closed: - await self._session.close() - self._session = None + if self._handler is not None: + try: + await self._call(LangBotToBoxAction.SHUTDOWN, {}) + except Exception: + pass + self._handler = None async def get_status(self) -> dict: - session = self._get_session() - try: - async with session.get(f'{self._base_url}/v1/status') as resp: - await self._check_response(resp) - return await resp.json() - except aiohttp.ClientError as exc: - raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + return await self._call(LangBotToBoxAction.STATUS, {}) async def get_sessions(self) -> list[dict]: - session = self._get_session() - try: - async with session.get(f'{self._base_url}/v1/sessions') as resp: - await self._check_response(resp) - return await resp.json() - except aiohttp.ClientError as exc: - raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + data = await self._call(LangBotToBoxAction.GET_SESSIONS, {}) + return data['sessions'] async def get_session(self, session_id: str) -> dict: - session = self._get_session() - try: - async with session.get(f'{self._base_url}/v1/sessions/{session_id}') as resp: - await self._check_response(resp) - return await resp.json() - except aiohttp.ClientError as exc: - raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + return await self._call(LangBotToBoxAction.GET_SESSION, {'session_id': session_id}) async def get_backend_info(self) -> dict: - session = self._get_session() - try: - async with session.get(f'{self._base_url}/v1/health') as resp: - await self._check_response(resp) - return await resp.json() - except aiohttp.ClientError as exc: - raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + return await self._call(LangBotToBoxAction.GET_BACKEND_INFO, {}) async def delete_session(self, session_id: str) -> None: - session = self._get_session() - try: - async with session.delete( - f'{self._base_url}/v1/sessions/{session_id}', - ) as resp: - await self._check_response(resp) - except aiohttp.ClientError as exc: - raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + await self._call(LangBotToBoxAction.DELETE_SESSION, {'session_id': session_id}) async def create_session(self, spec: BoxSpec) -> dict: - session = self._get_session() - payload = spec.model_dump(mode='json') - try: - async with session.post( - f'{self._base_url}/v1/sessions/{spec.session_id}', - json=payload, - ) as resp: - await self._check_response(resp) - return await resp.json() - except aiohttp.ClientError as exc: - raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + return await self._call(LangBotToBoxAction.CREATE_SESSION, spec.model_dump(mode='json')) async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo: - session = self._get_session() - payload = spec.model_dump(mode='json') - try: - async with session.post( - f'{self._base_url}/v1/sessions/{session_id}/managed-process', - json=payload, - ) as resp: - await self._check_response(resp) - data = await resp.json() - except aiohttp.ClientError as exc: - raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + data = await self._call( + LangBotToBoxAction.START_MANAGED_PROCESS, + {'session_id': session_id, 'spec': spec.model_dump(mode='json')}, + ) return BoxManagedProcessInfo.model_validate(data) async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo: - session = self._get_session() - try: - async with session.get( - f'{self._base_url}/v1/sessions/{session_id}/managed-process', - ) as resp: - await self._check_response(resp) - data = await resp.json() - except aiohttp.ClientError as exc: - raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + data = await self._call(LangBotToBoxAction.GET_MANAGED_PROCESS, {'session_id': session_id}) return BoxManagedProcessInfo.model_validate(data) - def get_managed_process_websocket_url(self, session_id: str) -> str: - if self._base_url.startswith('https://'): + def get_managed_process_websocket_url(self, session_id: str, ws_relay_base_url: str) -> str: + base = ws_relay_base_url + if base.startswith('https://'): scheme = 'wss://' - suffix = self._base_url[len('https://'):] - elif self._base_url.startswith('http://'): + suffix = base[len('https://'):] + elif base.startswith('http://'): scheme = 'ws://' - suffix = self._base_url[len('http://'):] + suffix = base[len('http://'):] else: scheme = 'ws://' - suffix = self._base_url + suffix = base return f'{scheme}{suffix}/v1/sessions/{session_id}/managed-process/ws' diff --git a/src/langbot/pkg/box/connector.py b/src/langbot/pkg/box/connector.py index e1b83f9e..5c39353b 100644 --- a/src/langbot/pkg/box/connector.py +++ b/src/langbot/pkg/box/connector.py @@ -5,8 +5,12 @@ import os import sys from typing import TYPE_CHECKING +from langbot_plugin.entities.io.actions.enums import CommonAction +from langbot_plugin.runtime.io.handler import Handler +from langbot_plugin.runtime.io.connection import Connection + +from .client import ActionRPCBoxClient, resolve_box_ws_relay_url from .errors import BoxRuntimeUnavailableError -from .client import RemoteBoxRuntimeClient, resolve_box_runtime_url from .models import get_box_config from ..utils import platform @@ -15,44 +19,129 @@ if TYPE_CHECKING: class BoxRuntimeConnector: - """Build and initialize the Box runtime-facing service for the app.""" - - _HEALTH_CHECK_RETRY_COUNT = 40 - _HEALTH_CHECK_RETRY_INTERVAL_SEC = 0.25 + """Connect to the Box runtime via action RPC (stdio or ws).""" def __init__(self, ap: 'core_app.Application'): self.ap = ap self.configured_runtime_url = self._load_configured_runtime_url() - self.runtime_url = self.configured_runtime_url or resolve_box_runtime_url(ap) self.manages_local_runtime = self._should_manage_local_runtime() - self.client = RemoteBoxRuntimeClient(base_url=self.runtime_url, logger=ap.logger) - self.runtime_subprocess: asyncio.subprocess.Process | None = None - self.runtime_subprocess_task: asyncio.Task | None = None + self.ws_relay_base_url = resolve_box_ws_relay_url(ap) + self.client = ActionRPCBoxClient(logger=ap.logger) + + self._handler: Handler | None = None + self._handler_task: asyncio.Task | None = None + self._ctrl_task: asyncio.Task | None = None + self._subprocess: asyncio.subprocess.Process | None = None + self._subprocess_wait_task: asyncio.Task | None = None async def initialize(self) -> None: - if not self.manages_local_runtime: - await self.client.initialize() - return + if self.manages_local_runtime: + await self._start_local_stdio() + else: + await self._connect_remote_ws() + + async def _start_local_stdio(self) -> None: + """Launch box server as subprocess and connect via stdio.""" + from langbot_plugin.runtime.io.controllers.stdio.client import StdioClientController + + python_path = sys.executable + env = os.environ.copy() + + connected = asyncio.Event() + connect_error: list[Exception] = [] + + async def new_connection_callback(connection: Connection) -> None: + handler = Handler.__new__(Handler) + Handler.__init__(handler, connection) + self._handler = handler + self.client.set_handler(handler) + self._handler_task = asyncio.create_task(handler.run()) + try: + await handler.call_action(CommonAction.PING, {}) + self.ap.logger.info('Connected to Box runtime via stdio.') + connected.set() + await self._handler_task + except Exception as exc: + if not connected.is_set(): + connect_error.append(exc) + connected.set() + + ctrl = StdioClientController( + command=python_path, + args=['-m', 'langbot.pkg.box.server', '--port', str(self._get_ws_relay_port())], + env=env, + ) + self._subprocess = None # StdioClientController manages the subprocess + self._ctrl_task = asyncio.create_task(ctrl.run(new_connection_callback)) + + # Wait for connection or failure + try: + await asyncio.wait_for(connected.wait(), timeout=30.0) + except asyncio.TimeoutError: + raise BoxRuntimeUnavailableError('box runtime subprocess did not connect in time') + + if connect_error: + raise BoxRuntimeUnavailableError(f'box runtime connection failed: {connect_error[0]}') + + # Store subprocess reference for dispose + self._subprocess = ctrl.process + + async def _connect_remote_ws(self) -> None: + """Connect to a remote box server via WebSocket.""" + from langbot_plugin.runtime.io.controllers.ws.client import WebSocketClientController + + ws_url = self._get_rpc_ws_url() + + connected = asyncio.Event() + connect_error: list[Exception] = [] + + async def new_connection_callback(connection: Connection) -> None: + handler = Handler.__new__(Handler) + Handler.__init__(handler, connection) + self._handler = handler + self.client.set_handler(handler) + self._handler_task = asyncio.create_task(handler.run()) + try: + await handler.call_action(CommonAction.PING, {}) + self.ap.logger.info('Connected to Box runtime via WebSocket.') + connected.set() + await self._handler_task + except Exception as exc: + if not connected.is_set(): + connect_error.append(exc) + connected.set() + + async def on_connect_failed(ctrl, exc): + connect_error.append(exc or BoxRuntimeUnavailableError('ws connection failed')) + connected.set() + + ctrl = WebSocketClientController(ws_url=ws_url, make_connection_failed_callback=on_connect_failed) + self._ctrl_task = asyncio.create_task(ctrl.run(new_connection_callback)) try: - await self.client.initialize() - return - except BoxRuntimeUnavailableError: - self.ap.logger.info( - 'Local Box runtime is not running, starting an embedded Box runtime server...' - ) + await asyncio.wait_for(connected.wait(), timeout=30.0) + except asyncio.TimeoutError: + raise BoxRuntimeUnavailableError('box runtime ws connection timed out') - await self._start_local_runtime_process() - await self._wait_until_runtime_ready() + if connect_error: + raise BoxRuntimeUnavailableError(f'box runtime connection failed: {connect_error[0]}') def dispose(self) -> None: - if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is None: - self.ap.logger.info('Terminating local Box runtime process...') - self.runtime_subprocess.terminate() + if self._handler_task is not None: + self._handler_task.cancel() + self._handler_task = None - if self.runtime_subprocess_task is not None: - self.runtime_subprocess_task.cancel() - self.runtime_subprocess_task = None + if self._ctrl_task is not None: + self._ctrl_task.cancel() + self._ctrl_task = None + + if self._subprocess is not None and self._subprocess.returncode is None: + self.ap.logger.info('Terminating managed box runtime process...') + self._subprocess.terminate() + + if self._subprocess_wait_task is not None: + self._subprocess_wait_task.cancel() + self._subprocess_wait_task = None def _load_configured_runtime_url(self) -> str: return str(get_box_config(self.ap).get('runtime_url', '')).strip() @@ -60,36 +149,19 @@ class BoxRuntimeConnector: def _should_manage_local_runtime(self) -> bool: return not self.configured_runtime_url and platform.get_platform() != 'docker' - async def _start_local_runtime_process(self) -> None: - if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is None: - return + def _get_ws_relay_port(self) -> int: + """Extract the port for ws relay from ws_relay_base_url.""" + from urllib.parse import urlparse + parsed = urlparse(self.ws_relay_base_url) + return parsed.port or 5410 - python_path = sys.executable - env = os.environ.copy() - self.runtime_subprocess = await asyncio.create_subprocess_exec( - python_path, - '-m', - 'langbot.pkg.box.server', - env=env, - ) - self.runtime_subprocess_task = asyncio.create_task(self.runtime_subprocess.wait()) + def _get_rpc_ws_url(self) -> str: + """Derive the action RPC ws URL from the configured runtime URL. - async def _wait_until_runtime_ready(self) -> None: - last_exc: BoxRuntimeUnavailableError | None = None - for _ in range(self._HEALTH_CHECK_RETRY_COUNT): - if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is not None: - raise BoxRuntimeUnavailableError( - f'local box runtime exited before becoming ready (code {self.runtime_subprocess.returncode})' - ) - - try: - await self.client.initialize() - self.ap.logger.info(f'Local Box runtime is ready at {self.runtime_url}.') - return - except BoxRuntimeUnavailableError as exc: - last_exc = exc - await asyncio.sleep(self._HEALTH_CHECK_RETRY_INTERVAL_SEC) - - if last_exc is not None: - raise last_exc - raise BoxRuntimeUnavailableError('local box runtime did not become ready') + The RPC endpoint is on port+1 relative to the ws relay port. + """ + from urllib.parse import urlparse + parsed = urlparse(self.ws_relay_base_url) + host = parsed.hostname or '127.0.0.1' + port = (parsed.port or 5410) + 1 + return f'ws://{host}:{port}' diff --git a/src/langbot/pkg/box/server.py b/src/langbot/pkg/box/server.py index 070417c9..c056695f 100644 --- a/src/langbot/pkg/box/server.py +++ b/src/langbot/pkg/box/server.py @@ -1,7 +1,10 @@ -"""Standalone HTTP service exposing BoxRuntime as a REST API. +"""Standalone Box Runtime service exposing BoxRuntime via action RPC. -Usage: - python -m langbot.pkg.box.server [--host 0.0.0.0] [--port 5410] +Usage (stdio, launched by LangBot as subprocess): + python -m langbot.pkg.box.server + +Usage (ws + ws relay, for remote/docker mode): + python -m langbot.pkg.box.server --port 5410 """ from __future__ import annotations @@ -10,46 +13,29 @@ import argparse import asyncio import datetime as dt import logging +import sys +from typing import Any import pydantic from aiohttp import web +from langbot_plugin.entities.io.actions.enums import CommonAction +from langbot_plugin.entities.io.resp import ActionResponse +from langbot_plugin.runtime.io.connection import Connection +from langbot_plugin.runtime.io.handler import Handler + +from .actions import LangBotToBoxAction from .errors import ( - BoxBackendUnavailableError, BoxError, BoxManagedProcessConflictError, BoxManagedProcessNotFoundError, - BoxSessionConflictError, BoxSessionNotFoundError, - BoxValidationError, ) from .models import BoxExecutionResult, BoxManagedProcessSpec, BoxSpec from .runtime import BoxRuntime logger = logging.getLogger('langbot.box.server') -_ERROR_MAP: dict[type, tuple[int, str]] = { - BoxValidationError: (400, 'validation_error'), - BoxSessionNotFoundError: (404, 'session_not_found'), - BoxSessionConflictError: (409, 'session_conflict'), - BoxManagedProcessNotFoundError: (404, 'managed_process_not_found'), - BoxManagedProcessConflictError: (409, 'managed_process_conflict'), - BoxBackendUnavailableError: (503, 'backend_unavailable'), -} - - -def _error_response(exc: Exception) -> web.Response: - for exc_type, (status, code) in _ERROR_MAP.items(): - if isinstance(exc, exc_type): - return web.json_response( - {'error': {'code': code, 'message': str(exc)}}, - status=status, - ) - return web.json_response( - {'error': {'code': 'internal_error', 'message': str(exc)}}, - status=500, - ) - def _result_to_dict(result: BoxExecutionResult) -> dict: return { @@ -63,111 +49,98 @@ def _result_to_dict(result: BoxExecutionResult) -> dict: } -async def handle_exec(request: web.Request) -> web.Response: - runtime: BoxRuntime = request.app['runtime'] - try: - body = await request.json() - session_id = request.match_info['session_id'] - body['session_id'] = session_id - spec = BoxSpec.model_validate(body) - result = await runtime.execute(spec) - return web.json_response(_result_to_dict(result)) - except pydantic.ValidationError as exc: - return web.json_response( - {'error': {'code': 'validation_error', 'message': str(exc)}}, - status=400, - ) - except BoxError as exc: - return _error_response(exc) +class BoxServerHandler(Handler): + """Server-side handler that registers box actions backed by BoxRuntime.""" + + name = 'BoxServerHandler' + + def __init__(self, connection: Connection, runtime: BoxRuntime): + super().__init__(connection) + self._runtime = runtime + self._register_actions() + + def _register_actions(self) -> None: + + @self.action(CommonAction.PING) + async def ping(data: dict[str, Any]) -> ActionResponse: + return ActionResponse.success({}) + + @self.action(LangBotToBoxAction.HEALTH) + async def health(data: dict[str, Any]) -> ActionResponse: + info = await self._runtime.get_backend_info() + return ActionResponse.success(info) + + @self.action(LangBotToBoxAction.STATUS) + async def status(data: dict[str, Any]) -> ActionResponse: + result = await self._runtime.get_status() + return ActionResponse.success(result) + + @self.action(LangBotToBoxAction.EXEC) + async def exec_cmd(data: dict[str, Any]) -> ActionResponse: + try: + spec = BoxSpec.model_validate(data) + except pydantic.ValidationError as exc: + return ActionResponse.error(f'BoxValidationError: {exc}') + result = await self._runtime.execute(spec) + return ActionResponse.success(_result_to_dict(result)) + + @self.action(LangBotToBoxAction.CREATE_SESSION) + async def create_session(data: dict[str, Any]) -> ActionResponse: + try: + spec = BoxSpec.model_validate(data) + except pydantic.ValidationError as exc: + return ActionResponse.error(f'BoxValidationError: {exc}') + info = await self._runtime.create_session(spec) + return ActionResponse.success(info) + + @self.action(LangBotToBoxAction.GET_SESSION) + async def get_session(data: dict[str, Any]) -> ActionResponse: + return ActionResponse.success(self._runtime.get_session(data['session_id'])) + + @self.action(LangBotToBoxAction.GET_SESSIONS) + async def get_sessions(data: dict[str, Any]) -> ActionResponse: + return ActionResponse.success({'sessions': self._runtime.get_sessions()}) + + @self.action(LangBotToBoxAction.DELETE_SESSION) + async def delete_session(data: dict[str, Any]) -> ActionResponse: + await self._runtime.delete_session(data['session_id']) + return ActionResponse.success({'deleted': data['session_id']}) + + @self.action(LangBotToBoxAction.START_MANAGED_PROCESS) + async def start_managed_process(data: dict[str, Any]) -> ActionResponse: + session_id = data['session_id'] + try: + spec = BoxManagedProcessSpec.model_validate(data['spec']) + except pydantic.ValidationError as exc: + return ActionResponse.error(f'BoxValidationError: {exc}') + info = await self._runtime.start_managed_process(session_id, spec) + return ActionResponse.success(info) + + @self.action(LangBotToBoxAction.GET_MANAGED_PROCESS) + async def get_managed_process(data: dict[str, Any]) -> ActionResponse: + return ActionResponse.success( + self._runtime.get_managed_process(data['session_id']) + ) + + @self.action(LangBotToBoxAction.GET_BACKEND_INFO) + async def get_backend_info(data: dict[str, Any]) -> ActionResponse: + info = await self._runtime.get_backend_info() + return ActionResponse.success(info) + + @self.action(LangBotToBoxAction.SHUTDOWN) + async def shutdown(data: dict[str, Any]) -> ActionResponse: + await self._runtime.shutdown() + return ActionResponse.success({}) -async def handle_create_session(request: web.Request) -> web.Response: - runtime: BoxRuntime = request.app['runtime'] - try: - body = await request.json() - session_id = request.match_info['session_id'] - body['session_id'] = session_id - spec = BoxSpec.model_validate(body) - session_info = await runtime.create_session(spec) - return web.json_response(session_info, status=201) - except pydantic.ValidationError as exc: - return web.json_response( - {'error': {'code': 'validation_error', 'message': str(exc)}}, - status=400, - ) - except BoxError as exc: - return _error_response(exc) +# ── Managed process WebSocket relay (aiohttp) ──────────────────────── -async def handle_get_sessions(request: web.Request) -> web.Response: - runtime: BoxRuntime = request.app['runtime'] - try: - return web.json_response(runtime.get_sessions()) - except BoxError as exc: - return _error_response(exc) - - -async def handle_delete_session(request: web.Request) -> web.Response: - runtime: BoxRuntime = request.app['runtime'] - session_id = request.match_info['session_id'] - try: - await runtime.delete_session(session_id) - return web.json_response({'deleted': session_id}) - except BoxError as exc: - return _error_response(exc) - - -async def handle_get_session(request: web.Request) -> web.Response: - runtime: BoxRuntime = request.app['runtime'] - session_id = request.match_info['session_id'] - try: - return web.json_response(runtime.get_session(session_id)) - except BoxError as exc: - return _error_response(exc) - - -async def handle_status(request: web.Request) -> web.Response: - runtime: BoxRuntime = request.app['runtime'] - try: - status = await runtime.get_status() - return web.json_response(status) - except BoxError as exc: - return _error_response(exc) - - -async def handle_health(request: web.Request) -> web.Response: - runtime: BoxRuntime = request.app['runtime'] - try: - info = await runtime.get_backend_info() - return web.json_response(info) - except BoxError as exc: - return _error_response(exc) - - -async def handle_start_managed_process(request: web.Request) -> web.Response: - runtime: BoxRuntime = request.app['runtime'] - session_id = request.match_info['session_id'] - try: - body = await request.json() - spec = BoxManagedProcessSpec.model_validate(body) - process_info = await runtime.start_managed_process(session_id, spec) - return web.json_response(process_info, status=201) - except pydantic.ValidationError as exc: - return web.json_response( - {'error': {'code': 'validation_error', 'message': str(exc)}}, - status=400, - ) - except BoxError as exc: - return _error_response(exc) - - -async def handle_get_managed_process(request: web.Request) -> web.Response: - runtime: BoxRuntime = request.app['runtime'] - session_id = request.match_info['session_id'] - try: - return web.json_response(runtime.get_managed_process(session_id)) - except BoxError as exc: - return _error_response(exc) +def _error_response(exc: Exception) -> web.Response: + return web.json_response( + {'error': {'code': type(exc).__name__, 'message': str(exc)}}, + status=400, + ) async def handle_managed_process_ws(request: web.Request) -> web.StreamResponse: @@ -229,50 +202,67 @@ async def handle_managed_process_ws(request: web.Request) -> web.StreamResponse: return ws -def create_app(runtime: BoxRuntime | None = None) -> web.Application: - """Create the aiohttp Application with all routes. - - If *runtime* is ``None`` a new ``BoxRuntime`` is created using the module - logger. - """ - if runtime is None: - runtime = BoxRuntime(logger=logger) - +def create_ws_relay_app(runtime: BoxRuntime) -> web.Application: + """Create a minimal aiohttp app that only serves the managed-process ws relay.""" app = web.Application() app['runtime'] = runtime - - app.router.add_post('/v1/sessions/{session_id}/exec', handle_exec) - app.router.add_post('/v1/sessions/{session_id}', handle_create_session) - app.router.add_get('/v1/sessions/{session_id}', handle_get_session) - app.router.add_get('/v1/sessions', handle_get_sessions) - app.router.add_delete('/v1/sessions/{session_id}', handle_delete_session) - app.router.add_post('/v1/sessions/{session_id}/managed-process', handle_start_managed_process) - app.router.add_get('/v1/sessions/{session_id}/managed-process', handle_get_managed_process) app.router.add_get('/v1/sessions/{session_id}/managed-process/ws', handle_managed_process_ws) - app.router.add_get('/v1/status', handle_status) - app.router.add_get('/v1/health', handle_health) - - async def on_startup(_app: web.Application) -> None: - await _app['runtime'].initialize() - - async def on_shutdown(_app: web.Application) -> None: - await _app['runtime'].shutdown() - - app.on_startup.append(on_startup) - app.on_shutdown.append(on_shutdown) - return app +# ── Entry point ────────────────────────────────────────────────────── + + +async def _run_server(host: str, port: int, mode: str) -> None: + runtime = BoxRuntime(logger=logger) + await runtime.initialize() + + # Start aiohttp for ws relay (non-fatal — managed process attach + # degrades gracefully if the port is unavailable). + runner: web.AppRunner | None = None + try: + ws_app = create_ws_relay_app(runtime) + runner = web.AppRunner(ws_app) + await runner.setup() + site = web.TCPSite(runner, host, port) + await site.start() + logger.info(f'Box ws relay listening on {host}:{port}') + except OSError as exc: + logger.warning(f'Box ws relay failed to bind {host}:{port}: {exc}') + logger.warning('Managed process WebSocket attach will be unavailable.') + + async def new_connection_callback(connection: Connection) -> None: + handler = BoxServerHandler(connection, runtime) + await handler.run() + + try: + if mode == 'stdio': + from langbot_plugin.runtime.io.controllers.stdio.server import StdioServerController + ctrl = StdioServerController() + await ctrl.run(new_connection_callback) + else: + from langbot_plugin.runtime.io.controllers.ws.server import WebSocketServerController + # Action RPC uses port+1 to avoid conflict with ws relay + rpc_port = port + 1 + logger.info(f'Box action RPC (ws) listening on {host}:{rpc_port}') + ctrl = WebSocketServerController(rpc_port) + await ctrl.run(new_connection_callback) + finally: + await runtime.shutdown() + if runner is not None: + await runner.cleanup() + + def main() -> None: - parser = argparse.ArgumentParser(description='LangBot Box Runtime HTTP Service') + parser = argparse.ArgumentParser(description='LangBot Box Runtime Service') parser.add_argument('--host', default='0.0.0.0', help='Bind address') - parser.add_argument('--port', type=int, default=5410, help='Bind port') + parser.add_argument('--port', type=int, default=5410, help='Bind port (ws relay)') + parser.add_argument('--mode', choices=['stdio', 'ws'], default='stdio', + help='Control channel transport (default: stdio)') args = parser.parse_args() - logging.basicConfig(level=logging.INFO) - app = create_app() - web.run_app(app, host=args.host, port=args.port) + logging.basicConfig(level=logging.INFO, stream=sys.stderr) + asyncio.run(_run_server(args.host, args.port, args.mode)) if __name__ == '__main__': diff --git a/src/langbot/pkg/box/service.py b/src/langbot/pkg/box/service.py index 26bb72a7..32c87292 100644 --- a/src/langbot/pkg/box/service.py +++ b/src/langbot/pkg/box/service.py @@ -147,7 +147,12 @@ class BoxService: getter = getattr(self.client, 'get_managed_process_websocket_url', None) if getter is None: raise BoxValidationError('box runtime client does not support managed process websocket attach') - return getter(session_id) + ws_relay_base_url = ( + self._runtime_connector.ws_relay_base_url + if self._runtime_connector is not None + else 'http://127.0.0.1:5410' + ) + return getter(session_id, ws_relay_base_url) def _serialize_result(self, result: BoxExecutionResult) -> dict: stdout, stdout_truncated = self._truncate(result.stdout) diff --git a/src/langbot/pkg/plugin/connector.py b/src/langbot/pkg/plugin/connector.py index 5d3236e0..a02037ec 100644 --- a/src/langbot/pkg/plugin/connector.py +++ b/src/langbot/pkg/plugin/connector.py @@ -17,6 +17,7 @@ from langbot_plugin.api.entities.builtin.pipeline.query import provider_session from ..core import app from . import handler from ..utils import platform +from ..utils.managed_runtime import ManagedRuntimeConnector from langbot_plugin.runtime.io.controllers.stdio import ( client as stdio_client_controller, ) @@ -34,11 +35,9 @@ from ..core import taskmgr from ..entity.persistence import plugin as persistence_plugin -class PluginRuntimeConnector: +class PluginRuntimeConnector(ManagedRuntimeConnector): """Plugin runtime connector""" - ap: app.Application - handler: handler.RuntimeConnectionHandler handler_task: asyncio.Task @@ -49,10 +48,6 @@ class PluginRuntimeConnector: ctrl: stdio_client_controller.StdioClientController | ws_client_controller.WebSocketClientController - runtime_subprocess_on_windows: asyncio.subprocess.Process | None = None - - runtime_subprocess_on_windows_task: asyncio.Task | None = None - runtime_disconnect_callback: typing.Callable[ [PluginRuntimeConnector], typing.Coroutine[typing.Any, typing.Any, None] ] @@ -67,7 +62,7 @@ class PluginRuntimeConnector: [PluginRuntimeConnector], typing.Coroutine[typing.Any, typing.Any, None] ], ): - self.ap = ap + super().__init__(ap) self.runtime_disconnect_callback = runtime_disconnect_callback self.is_enable_plugin = self.ap.instance_config.data.get('plugin', {}).get('enable', True) @@ -135,19 +130,7 @@ class PluginRuntimeConnector: # We have to launch runtime via cmd but communicate via ws. self.ap.logger.info('(windows) use cmd to launch plugin runtime and communicate via ws') - if self.runtime_subprocess_on_windows is None: # only launch once - python_path = sys.executable - env = os.environ.copy() - self.runtime_subprocess_on_windows = await asyncio.create_subprocess_exec( - python_path, - '-m', - 'langbot_plugin.cli.__init__', - 'rt', - env=env, - ) - - # hold the process - self.runtime_subprocess_on_windows_task = asyncio.create_task(self.runtime_subprocess_on_windows.wait()) + await self._start_runtime_subprocess('-m', 'langbot_plugin.cli.__init__', 'rt') ws_url = 'ws://localhost:5400/control/ws' @@ -523,13 +506,14 @@ class PluginRuntimeConnector: return await self.handler.retrieve_knowledge(plugin_author, plugin_name, retriever_name, retrieval_context) def dispose(self): - # No need to consider the shutdown on Windows - # for Windows can kill processes and subprocesses chainly - - if self.is_enable_plugin and isinstance(self.ctrl, stdio_client_controller.StdioClientController): + # On non-Windows stdio mode, terminate via the controller's process handle. + # On Windows, the managed subprocess is cleaned up by the base class. + if self.is_enable_plugin and hasattr(self, 'ctrl') and isinstance(self.ctrl, stdio_client_controller.StdioClientController): self.ap.logger.info('Terminating plugin runtime process...') self.ctrl.process.terminate() + self._dispose_subprocess() + if self.heartbeat_task is not None: self.heartbeat_task.cancel() self.heartbeat_task = None diff --git a/src/langbot/pkg/provider/tools/loaders/mcp.py b/src/langbot/pkg/provider/tools/loaders/mcp.py index c239ded3..e58a6c90 100644 --- a/src/langbot/pkg/provider/tools/loaders/mcp.py +++ b/src/langbot/pkg/provider/tools/loaders/mcp.py @@ -328,11 +328,15 @@ class RuntimeMCPSession: self.error_phase = None await asyncio.sleep(delay) + _MONITOR_POLL_INTERVAL = 5 + _MONITOR_MAX_CONSECUTIVE_ERRORS = 3 + async def _monitor_box_process_health(self): """Poll managed process status; return when process exits.""" from ...box.models import BoxManagedProcessStatus session_id = self._build_box_session_id() + consecutive_errors = 0 while not self._shutdown_event.is_set(): try: info = await self.ap.box_service.client.get_managed_process(session_id) @@ -341,10 +345,21 @@ class RuntimeMCPSession: else: status = getattr(info, 'status', '') if status == BoxManagedProcessStatus.EXITED.value or status == BoxManagedProcessStatus.EXITED: + self.ap.logger.info( + f'MCP monitor for {self.server_name}: process exited' + ) return - except Exception: - return # Process or session gone - await asyncio.sleep(5) + consecutive_errors = 0 + except Exception as exc: + consecutive_errors += 1 + self.ap.logger.warning( + f'MCP monitor for {self.server_name}: get_managed_process failed ' + f'({consecutive_errors}/{self._MONITOR_MAX_CONSECUTIVE_ERRORS}): ' + f'{type(exc).__name__}: {exc}' + ) + if consecutive_errors >= self._MONITOR_MAX_CONSECUTIVE_ERRORS: + return + await asyncio.sleep(self._MONITOR_POLL_INTERVAL) async def start(self): if not self.enable: @@ -541,10 +556,18 @@ class RuntimeMCPSession: because /workspace may be mounted read-only and pip needs to write build artifacts in the source tree. """ + # Use /opt instead of /tmp — /tmp is often a small tmpfs (64 MB) + # and cannot hold the copied source tree plus pip build artifacts. _COPY_AND_INSTALL = ( - 'cp -r /workspace /tmp/_mcp_src' - ' && pip install --no-cache-dir /tmp/_mcp_src' - ' && rm -rf /tmp/_mcp_src' + 'mkdir -p /opt/_mcp_src' + ' && tar -C /workspace' + ' --exclude=.venv --exclude=.git --exclude=__pycache__' + ' --exclude=node_modules --exclude=.tox --exclude=.nox' + ' --exclude="*.egg-info" --exclude=.uv-cache' + ' -cf - .' + ' | tar -C /opt/_mcp_src -xf -' + ' && pip install --no-cache-dir /opt/_mcp_src' + ' && rm -rf /opt/_mcp_src' ) _INSTALL_REQUIREMENTS = 'pip install --no-cache-dir -r /workspace/requirements.txt' diff --git a/src/langbot/pkg/utils/managed_runtime.py b/src/langbot/pkg/utils/managed_runtime.py new file mode 100644 index 00000000..50f90df3 --- /dev/null +++ b/src/langbot/pkg/utils/managed_runtime.py @@ -0,0 +1,89 @@ +"""Base class for connectors that may manage a local runtime subprocess.""" + +from __future__ import annotations + +import asyncio +import os +import sys +from typing import TYPE_CHECKING, Awaitable, Callable + +if TYPE_CHECKING: + from ..core import app as core_app + + +class ManagedRuntimeConnector: + """Base class for connectors that may manage a local runtime subprocess. + + Provides shared lifecycle helpers: subprocess launch, health-check retry, + and graceful termination. Concrete connectors (plugin, box, …) inherit + this and add their own protocol-specific logic. + """ + + ap: 'core_app.Application' + runtime_subprocess: asyncio.subprocess.Process | None + runtime_subprocess_task: asyncio.Task | None + + def __init__(self, ap: 'core_app.Application'): + self.ap = ap + self.runtime_subprocess = None + self.runtime_subprocess_task = None + + async def _start_runtime_subprocess(self, *args: str) -> None: + """Launch a local runtime as a subprocess of the current Python interpreter. + + If a subprocess is already running (no *returncode* yet), this is a no-op. + """ + if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is None: + return + + python_path = sys.executable + env = os.environ.copy() + self.runtime_subprocess = await asyncio.create_subprocess_exec( + python_path, + *args, + env=env, + ) + self.runtime_subprocess_task = asyncio.create_task(self.runtime_subprocess.wait()) + + async def _wait_until_ready( + self, + check: Callable[[], Awaitable[None]], + retries: int = 40, + interval: float = 0.25, + runtime_name: str = 'runtime', + ) -> None: + """Repeatedly call *check* until it succeeds or retries are exhausted. + + Between attempts the method sleeps for *interval* seconds. If the + managed subprocess exits before readiness is confirmed, a + ``RuntimeError`` is raised immediately. + """ + last_exc: Exception | None = None + for _ in range(retries): + # Fast-fail if the process already died. + if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is not None: + raise RuntimeError( + f'local {runtime_name} exited before becoming ready ' + f'(code {self.runtime_subprocess.returncode})' + ) + + try: + await check() + return + except Exception as exc: + last_exc = exc + await asyncio.sleep(interval) + + if last_exc is not None: + raise last_exc + raise RuntimeError(f'local {runtime_name} did not become ready') + + def _dispose_subprocess(self) -> None: + """Terminate the managed subprocess and cancel its wait task.""" + if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is None: + self.ap.logger.info('Terminating managed runtime process...') + self.runtime_subprocess.terminate() + + if self.runtime_subprocess_task is not None: + self.runtime_subprocess_task.cancel() + self.runtime_subprocess_task = None diff --git a/tests/integration_tests/box/test_box_integration.py b/tests/integration_tests/box/test_box_integration.py index 5adf6245..1d970b72 100644 --- a/tests/integration_tests/box/test_box_integration.py +++ b/tests/integration_tests/box/test_box_integration.py @@ -12,21 +12,22 @@ CI pipeline. Run them locally with:: from __future__ import annotations +import asyncio import logging import shutil import socket import subprocess from types import SimpleNamespace +from unittest.mock import Mock import pytest -from aiohttp.test_utils import TestServer from langbot.pkg.box.backend import BaseSandboxBackend -from langbot.pkg.box.client import RemoteBoxRuntimeClient +from langbot.pkg.box.client import ActionRPCBoxClient from langbot.pkg.box.errors import BoxBackendUnavailableError, BoxRuntimeUnavailableError from langbot.pkg.box.models import BoxExecutionStatus, BoxNetworkMode, BoxSpec from langbot.pkg.box.runtime import BoxRuntime -from langbot.pkg.box.server import create_app as create_server_app +from langbot.pkg.box.server import BoxServerHandler from langbot.pkg.box.service import BoxService import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query @@ -77,23 +78,61 @@ requires_socket = pytest.mark.skipif( ) +# ── Helpers ────────────────────────────────────────────────────────── + + +class _QueueConnection: + """In-process Connection backed by asyncio Queues — no real IO.""" + + def __init__(self, rx: asyncio.Queue[str], tx: asyncio.Queue[str]): + self._rx = rx + self._tx = tx + + async def send(self, message: str) -> None: + await self._tx.put(message) + + async def receive(self) -> str: + return await self._rx.get() + + async def close(self) -> None: + pass + + +async def _make_rpc_pair(runtime: BoxRuntime): + """Create an in-process (ActionRPCBoxClient, server_task, client_task) connected via queues.""" + from langbot_plugin.runtime.io.handler import Handler + + c2s: asyncio.Queue[str] = asyncio.Queue() + s2c: asyncio.Queue[str] = asyncio.Queue() + client_conn = _QueueConnection(rx=s2c, tx=c2s) + server_conn = _QueueConnection(rx=c2s, tx=s2c) + + server_handler = BoxServerHandler(server_conn, runtime) + server_task = asyncio.create_task(server_handler.run()) + + client_handler = Handler.__new__(Handler) + Handler.__init__(client_handler, client_conn) + client_task = asyncio.create_task(client_handler.run()) + + client = ActionRPCBoxClient(logger=_logger) + client.set_handler(client_handler) + + return client, server_task, client_task + + # ── Fixtures ────────────────────────────────────────────────────────── @pytest.fixture async def box_client(): - """Yield a RemoteBoxRuntimeClient backed by a real BoxRuntime HTTP server.""" + """Yield an ActionRPCBoxClient backed by a real BoxRuntime via in-process RPC.""" runtime = BoxRuntime(logger=_logger) - app = create_server_app(runtime) - server = TestServer(app) - await server.start_server() - client = RemoteBoxRuntimeClient( - base_url=str(server.make_url('')), - logger=_logger, - ) + await runtime.initialize() + client, server_task, client_task = await _make_rpc_pair(runtime) yield client - await client.shutdown() - await server.close() + server_task.cancel() + client_task.cancel() + await runtime.shutdown() # ── 1. Simple command execution ─────────────────────────────────────── @@ -102,7 +141,7 @@ async def box_client(): @requires_container @requires_socket @pytest.mark.asyncio -async def test_exec_simple_command(box_client: RemoteBoxRuntimeClient): +async def test_exec_simple_command(box_client: ActionRPCBoxClient): """Box starts a simple command and returns stdout.""" spec = BoxSpec( cmd='echo hello-box', @@ -123,7 +162,7 @@ async def test_exec_simple_command(box_client: RemoteBoxRuntimeClient): @requires_container @requires_socket @pytest.mark.asyncio -async def test_session_persists_files(box_client: RemoteBoxRuntimeClient): +async def test_session_persists_files(box_client: ActionRPCBoxClient): """Write a file in one exec, read it back in a second exec on the same session.""" sid = 'int-persist' @@ -151,7 +190,7 @@ async def test_session_persists_files(box_client: RemoteBoxRuntimeClient): @requires_container @requires_socket @pytest.mark.asyncio -async def test_timeout_kills_command(box_client: RemoteBoxRuntimeClient): +async def test_timeout_kills_command(box_client: ActionRPCBoxClient): """A long-running command is killed after timeout_sec.""" session_id = 'int-timeout' spec = BoxSpec( @@ -176,7 +215,7 @@ async def test_timeout_kills_command(box_client: RemoteBoxRuntimeClient): @requires_container @requires_socket @pytest.mark.asyncio -async def test_offline_cannot_reach_network(box_client: RemoteBoxRuntimeClient): +async def test_offline_cannot_reach_network(box_client: ActionRPCBoxClient): """With network=OFF the sandbox cannot reach the internet.""" spec = BoxSpec( cmd='wget -q -O /dev/null --timeout=3 http://1.1.1.1 2>&1; exit $?', @@ -217,16 +256,11 @@ class _UnavailableBackend(BaseSandboxBackend): @requires_socket @pytest.mark.asyncio async def test_backend_unavailable_returns_error(): - """When no backend is available the full HTTP path returns BoxBackendUnavailableError.""" + """When no backend is available the full RPC path returns BoxBackendUnavailableError.""" runtime = BoxRuntime(logger=_logger, backends=[_UnavailableBackend()]) - app = create_server_app(runtime) - server = TestServer(app) - await server.start_server() + await runtime.initialize() + client, server_task, client_task = await _make_rpc_pair(runtime) try: - client = RemoteBoxRuntimeClient( - base_url=str(server.make_url('')), - logger=_logger, - ) spec = BoxSpec( cmd='echo hello', session_id='int-no-backend', @@ -234,46 +268,24 @@ async def test_backend_unavailable_returns_error(): ) with pytest.raises(BoxBackendUnavailableError): await client.execute(spec) - await client.shutdown() finally: - await server.close() + server_task.cancel() + client_task.cancel() + await runtime.shutdown() -# ── 6. Runtime unreachable ──────────────────────────────────────────── - - -@requires_socket -@pytest.mark.asyncio -async def test_runtime_unreachable_returns_error(): - """Connecting to a non-existent runtime raises BoxRuntimeUnavailableError.""" - client = RemoteBoxRuntimeClient( - base_url='http://127.0.0.1:19999', - logger=_logger, - ) - try: - with pytest.raises(BoxRuntimeUnavailableError): - await client.initialize() - finally: - await client.shutdown() - - -# ── 7. Full service-to-runtime path ────────────────────────────────── +# ── 6. Full service-to-runtime path ────────────────────────────────── @requires_container @requires_socket @pytest.mark.asyncio async def test_full_service_to_remote_runtime(tmp_path): - """BoxService -> RemoteBoxRuntimeClient -> HTTP -> BoxRuntime -> real backend.""" + """BoxService -> ActionRPCBoxClient -> RPC -> BoxRuntime -> real backend.""" runtime = BoxRuntime(logger=_logger) - app = create_server_app(runtime) - server = TestServer(app) - await server.start_server() + await runtime.initialize() + client, server_task, client_task = await _make_rpc_pair(runtime) try: - client = RemoteBoxRuntimeClient( - base_url=str(server.make_url('')), - logger=_logger, - ) host_dir = tmp_path / 'workspace' host_dir.mkdir() @@ -303,6 +315,7 @@ async def test_full_service_to_remote_runtime(tmp_path): assert result['status'] == 'completed' assert 'service-path' in result['stdout'] assert result['session_id'] == '42' - await client.shutdown() finally: - await server.close() + server_task.cancel() + client_task.cancel() + await runtime.shutdown() diff --git a/tests/integration_tests/box/test_box_mcp_integration.py b/tests/integration_tests/box/test_box_mcp_integration.py index b984e74d..9f84b1c2 100644 --- a/tests/integration_tests/box/test_box_mcp_integration.py +++ b/tests/integration_tests/box/test_box_mcp_integration.py @@ -20,13 +20,14 @@ import subprocess import aiohttp import pytest +from aiohttp import web from aiohttp.test_utils import TestServer -from langbot.pkg.box.client import RemoteBoxRuntimeClient +from langbot.pkg.box.client import ActionRPCBoxClient from langbot.pkg.box.errors import BoxSessionNotFoundError from langbot.pkg.box.models import BoxManagedProcessSpec, BoxManagedProcessStatus, BoxSpec from langbot.pkg.box.runtime import BoxRuntime -from langbot.pkg.box.server import create_app as create_server_app +from langbot.pkg.box.server import BoxServerHandler, create_ws_relay_app _logger = logging.getLogger('test.box.mcp_integration') @@ -69,23 +70,71 @@ requires_socket = pytest.mark.skipif( ) +# ── Helpers ────────────────────────────────────────────────────────── + + +class _QueueConnection: + """In-process Connection backed by asyncio Queues — no real IO.""" + + def __init__(self, rx: asyncio.Queue[str], tx: asyncio.Queue[str]): + self._rx = rx + self._tx = tx + + async def send(self, message: str) -> None: + await self._tx.put(message) + + async def receive(self) -> str: + return await self._rx.get() + + async def close(self) -> None: + pass + + +async def _make_rpc_pair(runtime: BoxRuntime): + """Create an in-process RPC pair connected via queues.""" + from langbot_plugin.runtime.io.handler import Handler + + c2s: asyncio.Queue[str] = asyncio.Queue() + s2c: asyncio.Queue[str] = asyncio.Queue() + client_conn = _QueueConnection(rx=s2c, tx=c2s) + server_conn = _QueueConnection(rx=c2s, tx=s2c) + + server_handler = BoxServerHandler(server_conn, runtime) + server_task = asyncio.create_task(server_handler.run()) + + client_handler = Handler.__new__(Handler) + Handler.__init__(client_handler, client_conn) + client_task = asyncio.create_task(client_handler.run()) + + client = ActionRPCBoxClient(logger=_logger) + client.set_handler(client_handler) + + return client, server_task, client_task + + # ── Fixtures ────────────────────────────────────────────────────────── @pytest.fixture async def box_server(): - """Yield a (TestServer, RemoteBoxRuntimeClient) backed by a real BoxRuntime.""" + """Yield a (ws_relay_url, ActionRPCBoxClient) backed by a real BoxRuntime.""" runtime = BoxRuntime(logger=_logger) - app = create_server_app(runtime) - server = TestServer(app) - await server.start_server() - client = RemoteBoxRuntimeClient( - base_url=str(server.make_url('')), - logger=_logger, - ) - yield server, client - await client.shutdown() - await server.close() + await runtime.initialize() + + # Start ws relay for managed process attach + ws_app = create_ws_relay_app(runtime) + ws_server = TestServer(ws_app) + await ws_server.start_server() + + client, server_task, client_task = await _make_rpc_pair(runtime) + + ws_relay_url = str(ws_server.make_url('')) + yield ws_relay_url, client + + server_task.cancel() + client_task.cancel() + await runtime.shutdown() + await ws_server.close() # ── 1. Managed process lifecycle ───────────────────────────────────── @@ -96,7 +145,7 @@ async def box_server(): @pytest.mark.asyncio async def test_managed_process_start_and_query(box_server): """Start a managed process and query its status.""" - server, client = box_server + ws_relay_url, client = box_server # Create session spec = BoxSpec( @@ -133,7 +182,7 @@ async def test_managed_process_start_and_query(box_server): @pytest.mark.asyncio async def test_ws_stdio_attach_echo(box_server): """Attach to a managed process via WebSocket and verify bidirectional IO.""" - server, client = box_server + ws_relay_url, client = box_server spec = BoxSpec( cmd='', @@ -151,8 +200,8 @@ async def test_ws_stdio_attach_echo(box_server): ) await client.start_managed_process('mcp-int-ws', proc_spec) - # Connect via WebSocket - ws_url = client.get_managed_process_websocket_url('mcp-int-ws') + # Connect via WebSocket (ws relay) + ws_url = client.get_managed_process_websocket_url('mcp-int-ws', ws_relay_url) session = aiohttp.ClientSession() try: async with session.ws_connect(ws_url) as ws: @@ -177,7 +226,7 @@ async def test_ws_stdio_attach_echo(box_server): @pytest.mark.asyncio async def test_delete_session_cleans_up(box_server): """After deleting a session, it should no longer exist.""" - server, client = box_server + ws_relay_url, client = box_server spec = BoxSpec( cmd='', @@ -203,15 +252,15 @@ async def test_delete_session_cleans_up(box_server): await client.get_session('mcp-int-cleanup') -# ── 4. GET /v1/sessions/{id} ──────────────────────────────────────── +# ── 4. GET session details ──────────────────────────────────────── @requires_container @requires_socket @pytest.mark.asyncio async def test_get_session_returns_details(box_server): - """GET single session returns session details and managed process info.""" - server, client = box_server + """Get single session returns session details and managed process info.""" + ws_relay_url, client = box_server spec = BoxSpec( cmd='', @@ -251,7 +300,7 @@ async def test_get_session_returns_details(box_server): @pytest.mark.asyncio async def test_process_exit_detected(box_server): """When a managed process exits, its status should reflect EXITED.""" - server, client = box_server + ws_relay_url, client = box_server spec = BoxSpec( cmd='', @@ -287,7 +336,7 @@ async def test_process_exit_detected(box_server): @pytest.mark.asyncio async def test_orphan_cleanup_preserves_own_containers(box_server): """Orphan cleanup should not remove containers belonging to the current instance.""" - server, client = box_server + ws_relay_url, client = box_server # Create a session (container gets current instance ID label) spec = BoxSpec( diff --git a/tests/unit_tests/box/test_box_connector.py b/tests/unit_tests/box/test_box_connector.py index 8b741bed..0740c53b 100644 --- a/tests/unit_tests/box/test_box_connector.py +++ b/tests/unit_tests/box/test_box_connector.py @@ -1,11 +1,11 @@ from __future__ import annotations from types import SimpleNamespace -from unittest.mock import AsyncMock, Mock +from unittest.mock import AsyncMock, Mock, patch import pytest -from langbot.pkg.box.client import RemoteBoxRuntimeClient +from langbot.pkg.box.client import ActionRPCBoxClient from langbot.pkg.box.connector import BoxRuntimeConnector from langbot.pkg.box.errors import BoxRuntimeUnavailableError @@ -31,95 +31,57 @@ def patch_platform(monkeypatch: pytest.MonkeyPatch, value: str): monkeypatch.setattr('langbot.pkg.box.connector.platform.get_platform', lambda: value) -def test_box_runtime_connector_uses_explicit_runtime_url(): +def test_box_runtime_connector_manages_local_when_no_url(monkeypatch: pytest.MonkeyPatch): + patch_platform(monkeypatch, 'linux') + connector = BoxRuntimeConnector(make_app(Mock())) + + assert connector.manages_local_runtime is True + assert isinstance(connector.client, ActionRPCBoxClient) + + +def test_box_runtime_connector_remote_when_url_configured(): logger = Mock() connector = BoxRuntimeConnector(make_app(logger, runtime_url='http://box-runtime:5410')) - assert connector.runtime_url == 'http://box-runtime:5410' assert connector.manages_local_runtime is False - assert isinstance(connector.client, RemoteBoxRuntimeClient) - assert connector.client._base_url == 'http://box-runtime:5410' + assert isinstance(connector.client, ActionRPCBoxClient) -def test_box_runtime_connector_uses_local_default_runtime_url(monkeypatch: pytest.MonkeyPatch): - patch_platform(monkeypatch, 'linux') - - connector = BoxRuntimeConnector(make_app(Mock())) - - assert connector.runtime_url == 'http://127.0.0.1:5410' - assert connector.manages_local_runtime is True - assert connector.client._base_url == 'http://127.0.0.1:5410' - - -def test_box_runtime_connector_uses_docker_default_runtime_url(monkeypatch: pytest.MonkeyPatch): +def test_box_runtime_connector_remote_when_docker(monkeypatch: pytest.MonkeyPatch): patch_platform(monkeypatch, 'docker') - connector = BoxRuntimeConnector(make_app(Mock())) - assert connector.runtime_url == 'http://langbot_box_runtime:5410' assert connector.manages_local_runtime is False - assert connector.client._base_url == 'http://langbot_box_runtime:5410' + assert connector.ws_relay_base_url == 'http://langbot_box_runtime:5410' -@pytest.mark.asyncio -async def test_box_runtime_connector_initialize_delegates_to_client_when_runtime_is_healthy( - monkeypatch: pytest.MonkeyPatch, -): +def test_box_runtime_connector_ws_relay_url_default(monkeypatch: pytest.MonkeyPatch): patch_platform(monkeypatch, 'linux') connector = BoxRuntimeConnector(make_app(Mock())) - connector.client.initialize = AsyncMock() - connector._start_local_runtime_process = AsyncMock() - connector._wait_until_runtime_ready = AsyncMock() - await connector.initialize() - - connector.client.initialize.assert_awaited_once() - connector._start_local_runtime_process.assert_not_awaited() - connector._wait_until_runtime_ready.assert_not_awaited() + assert connector.ws_relay_base_url == 'http://127.0.0.1:5410' -@pytest.mark.asyncio -async def test_box_runtime_connector_initialize_autostarts_local_runtime_when_unavailable( - monkeypatch: pytest.MonkeyPatch, -): - patch_platform(monkeypatch, 'linux') - connector = BoxRuntimeConnector(make_app(Mock())) - connector.client.initialize = AsyncMock(side_effect=BoxRuntimeUnavailableError('down')) - connector._start_local_runtime_process = AsyncMock() - connector._wait_until_runtime_ready = AsyncMock() - - await connector.initialize() - - connector.client.initialize.assert_awaited_once() - connector._start_local_runtime_process.assert_awaited_once() - connector._wait_until_runtime_ready.assert_awaited_once() - - -@pytest.mark.asyncio -async def test_box_runtime_connector_initialize_remote_runtime_does_not_autostart(): +def test_box_runtime_connector_ws_relay_url_explicit(): connector = BoxRuntimeConnector(make_app(Mock(), runtime_url='http://box-runtime:5410')) - connector.client.initialize = AsyncMock() - connector._start_local_runtime_process = AsyncMock() - connector._wait_until_runtime_ready = AsyncMock() - - await connector.initialize() - - connector.client.initialize.assert_awaited_once() - connector._start_local_runtime_process.assert_not_awaited() - connector._wait_until_runtime_ready.assert_not_awaited() + assert connector.ws_relay_base_url == 'http://box-runtime:5410' -def test_box_runtime_connector_dispose_terminates_local_runtime_process(): +def test_box_runtime_connector_dispose_terminates_subprocess(): logger = Mock() connector = BoxRuntimeConnector(make_app(logger)) - runtime_process = Mock() - runtime_process.returncode = None - runtime_task = Mock() - connector.runtime_subprocess = runtime_process - connector.runtime_subprocess_task = runtime_task + subprocess = Mock() + subprocess.returncode = None + handler_task = Mock() + ctrl_task = Mock() + connector._subprocess = subprocess + connector._handler_task = handler_task + connector._ctrl_task = ctrl_task connector.dispose() - runtime_process.terminate.assert_called_once() - runtime_task.cancel.assert_called_once() - assert connector.runtime_subprocess_task is None + subprocess.terminate.assert_called_once() + handler_task.cancel.assert_called_once() + ctrl_task.cancel.assert_called_once() + assert connector._handler_task is None + assert connector._ctrl_task is None diff --git a/tests/unit_tests/box/test_box_service.py b/tests/unit_tests/box/test_box_service.py index 61f6530e..62951b84 100644 --- a/tests/unit_tests/box/test_box_service.py +++ b/tests/unit_tests/box/test_box_service.py @@ -3,7 +3,6 @@ from __future__ import annotations import asyncio import datetime as dt import os -import socket from types import SimpleNamespace from unittest.mock import AsyncMock, Mock @@ -12,7 +11,7 @@ import pytest import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query from langbot.pkg.box.backend import BaseSandboxBackend -from langbot.pkg.box.client import BoxRuntimeClient, RemoteBoxRuntimeClient +from langbot.pkg.box.client import BoxRuntimeClient, ActionRPCBoxClient from langbot.pkg.box.errors import BoxBackendUnavailableError, BoxSessionConflictError, BoxSessionNotFoundError, BoxValidationError from langbot.pkg.box.models import ( BUILTIN_PROFILES, @@ -71,20 +70,6 @@ class _InProcessBoxRuntimeClient(BoxRuntimeClient): return self._runtime.get_session(session_id) -def _can_open_test_socket() -> bool: - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - except OSError: - return False - sock.close() - return True - - -requires_socket = pytest.mark.skipif( - not _can_open_test_socket(), - reason='local test environment does not permit opening TCP sockets', -) - class FakeBackend(BaseSandboxBackend): def __init__(self, logger: Mock, available: bool = True): @@ -787,27 +772,65 @@ async def test_service_get_status_aggregates_runtime_and_profile(): assert status['recent_error_count'] == 0 -# ── RemoteBoxRuntimeClient tests ───────────────────────────────────── +# ── In-process RPC client/server tests ───────────────────────────────── + + +class _QueueConnection: + """In-process Connection backed by asyncio Queues — no real IO.""" + + def __init__(self, rx: asyncio.Queue[str], tx: asyncio.Queue[str]): + self._rx = rx + self._tx = tx + + async def send(self, message: str) -> None: + await self._tx.put(message) + + async def receive(self) -> str: + return await self._rx.get() + + async def close(self) -> None: + pass + + +def _make_queue_connection_pair(): + """Return (client_conn, server_conn) linked by queues.""" + c2s: asyncio.Queue[str] = asyncio.Queue() + s2c: asyncio.Queue[str] = asyncio.Queue() + client_conn = _QueueConnection(rx=s2c, tx=c2s) + server_conn = _QueueConnection(rx=c2s, tx=s2c) + return client_conn, server_conn + + +async def _make_rpc_pair(runtime: BoxRuntime): + """Create an in-process (ActionRPCBoxClient, server_task, client_task) connected via queues.""" + from langbot.pkg.box.server import BoxServerHandler + from langbot_plugin.runtime.io.handler import Handler + + client_conn, server_conn = _make_queue_connection_pair() + + server_handler = BoxServerHandler(server_conn, runtime) + server_task = asyncio.create_task(server_handler.run()) + + client_handler = Handler.__new__(Handler) + Handler.__init__(client_handler, client_conn) + client_task = asyncio.create_task(client_handler.run()) + + client = ActionRPCBoxClient(logger=Mock()) + client.set_handler(client_handler) + + return client, server_task, client_task -@requires_socket @pytest.mark.asyncio -async def test_remote_client_execute(): - """RemoteBoxRuntimeClient correctly posts to server and parses result.""" - from aiohttp.test_utils import TestServer - - from langbot.pkg.box.server import create_app as create_server_app - +async def test_rpc_client_execute(): + """ActionRPCBoxClient correctly calls server and parses result.""" logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - await server.start_server() - try: - client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) - await client.initialize() + await runtime.initialize() + client, server_task, client_task = await _make_rpc_pair(runtime) + try: spec = BoxSpec.model_validate({'cmd': 'echo remote', 'session_id': 'r-1'}) result = await client.execute(spec) @@ -815,353 +838,122 @@ async def test_remote_client_execute(): assert result.status == BoxExecutionStatus.COMPLETED assert result.exit_code == 0 assert result.stdout == 'executed: echo remote' - await client.shutdown() finally: - await server.close() + server_task.cancel() + client_task.cancel() + await runtime.shutdown() -@requires_socket @pytest.mark.asyncio -async def test_remote_client_get_sessions(): - from aiohttp.test_utils import TestServer - - from langbot.pkg.box.server import create_app as create_server_app - +async def test_rpc_client_get_sessions(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - await server.start_server() - try: - client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) + await runtime.initialize() + client, server_task, client_task = await _make_rpc_pair(runtime) + try: spec = BoxSpec.model_validate({'cmd': 'echo hi', 'session_id': 'r-2'}) await client.execute(spec) sessions = await client.get_sessions() assert len(sessions) == 1 assert sessions[0]['session_id'] == 'r-2' - await client.shutdown() finally: - await server.close() + server_task.cancel() + client_task.cancel() + await runtime.shutdown() -@requires_socket @pytest.mark.asyncio -async def test_remote_client_get_status(): - from aiohttp.test_utils import TestServer - - from langbot.pkg.box.server import create_app as create_server_app - +async def test_rpc_client_get_status(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - await server.start_server() + await runtime.initialize() + + client, server_task, client_task = await _make_rpc_pair(runtime) try: - client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) status = await client.get_status() assert 'backend' in status assert 'active_sessions' in status - await client.shutdown() finally: - await server.close() + server_task.cancel() + client_task.cancel() + await runtime.shutdown() -@requires_socket @pytest.mark.asyncio -async def test_remote_client_get_backend_info(): - from aiohttp.test_utils import TestServer - - from langbot.pkg.box.server import create_app as create_server_app - +async def test_rpc_client_get_backend_info(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - await server.start_server() + await runtime.initialize() + + client, server_task, client_task = await _make_rpc_pair(runtime) try: - client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) info = await client.get_backend_info() assert info['name'] == 'fake' assert info['available'] is True - await client.shutdown() finally: - await server.close() + server_task.cancel() + client_task.cancel() + await runtime.shutdown() -# ── Server endpoint tests ──────────────────────────────────────────── - - -@requires_socket -@pytest.mark.asyncio -async def test_server_delete_session(): - from aiohttp.test_utils import TestClient, TestServer - - from langbot.pkg.box.server import create_app as create_server_app - - logger = Mock() - backend = FakeBackend(logger) - runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - test_client = TestClient(server) - await test_client.start_server() - try: - # Create a session via exec - resp = await test_client.post('/v1/sessions/del-1/exec', json={'cmd': 'echo hi'}) - assert resp.status == 200 - - # Delete it - resp = await test_client.delete('/v1/sessions/del-1') - assert resp.status == 200 - data = await resp.json() - assert data['deleted'] == 'del-1' - - # Verify session is gone - resp = await test_client.get('/v1/sessions') - sessions = await resp.json() - assert len(sessions) == 0 - finally: - await test_client.close() - - -# ── Runtime delete_session / create_session tests ──────────────────── +# ── RPC-based delete/create/conflict tests ──────────────────────────── @pytest.mark.asyncio -async def test_runtime_delete_session(): +async def test_rpc_client_delete_session(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) await runtime.initialize() - await runtime.execute(BoxSpec.model_validate({'cmd': 'echo', 'session_id': 'del-test'})) - assert len(runtime.get_sessions()) == 1 - - await runtime.delete_session('del-test') - assert len(runtime.get_sessions()) == 0 - assert backend.stop_calls == ['del-test'] - - -@pytest.mark.asyncio -async def test_runtime_delete_session_not_found(): - logger = Mock() - backend = FakeBackend(logger) - runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - await runtime.initialize() - - with pytest.raises(BoxSessionNotFoundError): - await runtime.delete_session('nonexistent') - - -@pytest.mark.asyncio -async def test_runtime_create_session(): - logger = Mock() - backend = FakeBackend(logger) - runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - await runtime.initialize() - - spec = BoxSpec.model_validate({'cmd': 'placeholder', 'session_id': 'create-1'}) - info = await runtime.create_session(spec) - assert info['session_id'] == 'create-1' - assert info['backend_name'] == 'fake' - - sessions = runtime.get_sessions() - assert len(sessions) == 1 - assert sessions[0]['session_id'] == 'create-1' - - -# ── Server structured error tests ──────────────────────────────────── - - -@requires_socket -@pytest.mark.asyncio -async def test_server_delete_nonexistent_session(): - from aiohttp.test_utils import TestClient, TestServer - - from langbot.pkg.box.server import create_app as create_server_app - - logger = Mock() - backend = FakeBackend(logger) - runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - test_client = TestClient(server) - await test_client.start_server() + client, server_task, client_task = await _make_rpc_pair(runtime) try: - resp = await test_client.delete('/v1/sessions/nonexistent') - assert resp.status == 404 - data = await resp.json() - assert data['error']['code'] == 'session_not_found' - finally: - await test_client.close() - - -@requires_socket -@pytest.mark.asyncio -async def test_server_exec_returns_structured_error_on_conflict(): - from aiohttp.test_utils import TestClient, TestServer - - from langbot.pkg.box.server import create_app as create_server_app - - logger = Mock() - backend = FakeBackend(logger) - runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - test_client = TestClient(server) - await test_client.start_server() - try: - # Create session with network=off - resp = await test_client.post('/v1/sessions/conflict-1/exec', json={'cmd': 'echo hi', 'network': 'off'}) - assert resp.status == 200 - - # Try to use same session with network=on -> conflict - resp = await test_client.post('/v1/sessions/conflict-1/exec', json={'cmd': 'echo hi', 'network': 'on'}) - assert resp.status == 409 - data = await resp.json() - assert data['error']['code'] == 'session_conflict' - finally: - await test_client.close() - - -@requires_socket -@pytest.mark.asyncio -async def test_server_create_session(): - from aiohttp.test_utils import TestClient, TestServer - - from langbot.pkg.box.server import create_app as create_server_app - - logger = Mock() - backend = FakeBackend(logger) - runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - test_client = TestClient(server) - await test_client.start_server() - try: - resp = await test_client.post('/v1/sessions/new-1', json={'image': 'python:3.11-slim'}) - assert resp.status == 201 - data = await resp.json() - assert data['session_id'] == 'new-1' - assert data['backend_name'] == 'fake' - assert 'created_at' in data - - # Session should appear in list - resp = await test_client.get('/v1/sessions') - sessions = await resp.json() - assert len(sessions) == 1 - assert sessions[0]['session_id'] == 'new-1' - finally: - await test_client.close() - - -@requires_socket -@pytest.mark.asyncio -async def test_server_create_session_conflict(): - from aiohttp.test_utils import TestClient, TestServer - - from langbot.pkg.box.server import create_app as create_server_app - - logger = Mock() - backend = FakeBackend(logger) - runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - test_client = TestClient(server) - await test_client.start_server() - try: - resp = await test_client.post('/v1/sessions/dup-1', json={'network': 'off'}) - assert resp.status == 201 - - # Conflicting create with different network - resp = await test_client.post('/v1/sessions/dup-1', json={'network': 'on'}) - assert resp.status == 409 - data = await resp.json() - assert data['error']['code'] == 'session_conflict' - finally: - await test_client.close() - - -# ── Remote client error translation tests ───────────────────────────── - - -@requires_socket -@pytest.mark.asyncio -async def test_remote_client_delete_session(): - from aiohttp.test_utils import TestServer - - from langbot.pkg.box.server import create_app as create_server_app - - logger = Mock() - backend = FakeBackend(logger) - runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - await server.start_server() - try: - client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) - - # Create session via exec spec = BoxSpec.model_validate({'cmd': 'echo hi', 'session_id': 'r-del-1'}) await client.execute(spec) - # Delete it await client.delete_session('r-del-1') - # Verify empty sessions = await client.get_sessions() assert len(sessions) == 0 - await client.shutdown() finally: - await server.close() + server_task.cancel() + client_task.cancel() + await runtime.shutdown() -@requires_socket @pytest.mark.asyncio -async def test_remote_client_delete_session_raises_not_found(): - from aiohttp.test_utils import TestServer - - from langbot.pkg.box.server import create_app as create_server_app - +async def test_rpc_client_delete_session_raises_not_found(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - await server.start_server() - try: - client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) + await runtime.initialize() + client, server_task, client_task = await _make_rpc_pair(runtime) + try: with pytest.raises(BoxSessionNotFoundError): await client.delete_session('nonexistent') - await client.shutdown() finally: - await server.close() + server_task.cancel() + client_task.cancel() + await runtime.shutdown() -@requires_socket @pytest.mark.asyncio -async def test_remote_client_create_session(): - from aiohttp.test_utils import TestServer - - from langbot.pkg.box.server import create_app as create_server_app - +async def test_rpc_client_create_session(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - await server.start_server() - try: - client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) + await runtime.initialize() + client, server_task, client_task = await _make_rpc_pair(runtime) + try: spec = BoxSpec.model_validate({'cmd': 'placeholder', 'session_id': 'r-create-1'}) info = await client.create_session(spec) assert info['session_id'] == 'r-create-1' @@ -1169,38 +961,31 @@ async def test_remote_client_create_session(): sessions = await client.get_sessions() assert len(sessions) == 1 - await client.shutdown() finally: - await server.close() + server_task.cancel() + client_task.cancel() + await runtime.shutdown() -@requires_socket @pytest.mark.asyncio -async def test_remote_client_exec_raises_conflict_error(): - from aiohttp.test_utils import TestServer - - from langbot.pkg.box.server import create_app as create_server_app - +async def test_rpc_client_exec_raises_conflict_error(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - app = create_server_app(runtime) - server = TestServer(app) - await server.start_server() - try: - client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) + await runtime.initialize() - # Create session with network=off + client, server_task, client_task = await _make_rpc_pair(runtime) + try: spec1 = BoxSpec.model_validate({'cmd': 'echo first', 'session_id': 'r-conflict-1', 'network': 'off'}) await client.execute(spec1) - # Conflicting exec with network=on spec2 = BoxSpec.model_validate({'cmd': 'echo second', 'session_id': 'r-conflict-1', 'network': 'on'}) with pytest.raises(BoxSessionConflictError): await client.execute(spec2) - await client.shutdown() finally: - await server.close() + server_task.cancel() + client_task.cancel() + await runtime.shutdown() # ── BoxHostMountMode.NONE tests ─────────────────────────────────────