refactor: use rpc

This commit is contained in:
youhuanghe
2026-03-21 10:28:03 +00:00
committed by WangCham
parent 791d052687
commit 14057d1722
12 changed files with 791 additions and 857 deletions

View File

@@ -0,0 +1,21 @@
"""Box-specific action types for the action RPC protocol."""
from __future__ import annotations
from langbot_plugin.entities.io.actions.enums import ActionType
class LangBotToBoxAction(ActionType):
    """Identifiers of the actions LangBot sends to the Box runtime.

    Each member's value is the action name string; all of them share the
    ``box_`` prefix.
    """

    # Runtime probes
    HEALTH = "box_health"
    STATUS = "box_status"

    # One-shot command execution
    EXEC = "box_exec"

    # Session lifecycle
    CREATE_SESSION = "box_create_session"
    GET_SESSION = "box_get_session"
    GET_SESSIONS = "box_get_sessions"
    DELETE_SESSION = "box_delete_session"

    # Managed processes attached to a session
    START_MANAGED_PROCESS = "box_start_managed_process"
    GET_MANAGED_PROCESS = "box_get_managed_process"

    # Backend description and teardown
    GET_BACKEND_INFO = "box_get_backend_info"
    SHUTDOWN = "box_shutdown"

View File

@@ -1,23 +1,15 @@
"""BoxRuntimeClient abstraction for remote Box Runtime access."""
"""BoxRuntimeClient abstraction for Box Runtime access."""
from __future__ import annotations
import abc
import logging
from typing import TYPE_CHECKING
from typing import Any, TYPE_CHECKING
import aiohttp
from langbot_plugin.runtime.io.handler import Handler
from .errors import (
BoxBackendUnavailableError,
BoxError,
BoxManagedProcessConflictError,
BoxManagedProcessNotFoundError,
BoxRuntimeUnavailableError,
BoxSessionConflictError,
BoxSessionNotFoundError,
BoxValidationError,
)
from .actions import LangBotToBoxAction
from .errors import BoxError, BoxRuntimeUnavailableError
from .models import (
BoxExecutionResult,
BoxExecutionStatus,
@@ -31,19 +23,9 @@ from ..utils import platform
if TYPE_CHECKING:
from ..core import app as core_app
_ERROR_CODE_MAP: dict[str, type[BoxError]] = {
'validation_error': BoxValidationError,
'session_not_found': BoxSessionNotFoundError,
'session_conflict': BoxSessionConflictError,
'managed_process_not_found': BoxManagedProcessNotFoundError,
'managed_process_conflict': BoxManagedProcessConflictError,
'backend_unavailable': BoxBackendUnavailableError,
'runtime_unavailable': BoxRuntimeUnavailableError,
'internal_error': BoxError,
}
def resolve_box_runtime_url(ap: 'core_app.Application') -> str:
def resolve_box_ws_relay_url(ap: 'core_app.Application') -> str:
"""Derive the ws relay base URL used for managed-process attach."""
runtime_url = str(get_box_config(ap).get('runtime_url', '')).strip()
if runtime_url:
return runtime_url
@@ -90,54 +72,64 @@ class BoxRuntimeClient(abc.ABC):
async def get_session(self, session_id: str) -> dict: ...
class RemoteBoxRuntimeClient(BoxRuntimeClient):
"""HTTP client that talks to a standalone Box Runtime service."""
def _translate_action_error(exc: Exception) -> BoxError:
    """Rebuild a typed ``BoxError`` from the text of a failed action call.

    The stringified exception is searched for each known ``<ClassName>:``
    marker in a fixed order; the first marker found anywhere in the text
    selects the error class. If no marker is present a plain ``BoxError`` is
    returned. The returned error always carries the full original text.
    """
    from .errors import (
        BoxBackendUnavailableError,
        BoxManagedProcessConflictError,
        BoxManagedProcessNotFoundError,
        BoxSessionConflictError,
        BoxSessionNotFoundError,
        BoxValidationError,
    )

    text = str(exc)
    # Checked top to bottom; the first marker contained in the text wins.
    known_errors: tuple[tuple[str, type[BoxError]], ...] = (
        ('BoxValidationError:', BoxValidationError),
        ('BoxSessionNotFoundError:', BoxSessionNotFoundError),
        ('BoxSessionConflictError:', BoxSessionConflictError),
        ('BoxManagedProcessNotFoundError:', BoxManagedProcessNotFoundError),
        ('BoxManagedProcessConflictError:', BoxManagedProcessConflictError),
        ('BoxBackendUnavailableError:', BoxBackendUnavailableError),
    )
    error_cls = next(
        (candidate for marker, candidate in known_errors if marker in text),
        BoxError,
    )
    return error_cls(text)
def __init__(self, base_url: str, logger: logging.Logger):
self._base_url = base_url.rstrip('/')
class ActionRPCBoxClient(BoxRuntimeClient):
"""Client that talks to BoxRuntime via the action RPC protocol."""
def __init__(self, logger: logging.Logger):
self._logger = logger
self._session: aiohttp.ClientSession | None = None
self._handler: Handler | None = None
def _get_session(self) -> aiohttp.ClientSession:
    """Return the cached aiohttp session, creating a new one if it is missing or closed."""
    current = self._session
    if current is not None and not current.closed:
        return current
    self._session = aiohttp.ClientSession()
    return self._session
@property
def handler(self) -> Handler:
    """The installed RPC handler.

    Raises:
        BoxRuntimeUnavailableError: If no handler has been set yet.
    """
    active = self._handler
    if active is not None:
        return active
    raise BoxRuntimeUnavailableError('box runtime not connected')
async def _check_response(self, resp: aiohttp.ClientResponse) -> None:
if resp.status < 400:
return
def set_handler(self, handler: Handler) -> None:
    """Install *handler* as the channel used for subsequent action calls."""
    self._handler = handler
async def _call(self, action: LangBotToBoxAction, data: dict[str, Any], timeout: float = 15.0) -> dict[str, Any]:
try:
body = await resp.json()
error_info = body.get('error', {})
code = error_info.get('code', '')
message = error_info.get('message', '')
except Exception:
resp.raise_for_status()
return
exc_class = _ERROR_CODE_MAP.get(code, BoxError)
raise exc_class(message)
return await self.handler.call_action(action, data, timeout=timeout)
except BoxRuntimeUnavailableError:
raise
except Exception as exc:
raise _translate_action_error(exc) from exc
async def initialize(self) -> None:
session = self._get_session()
try:
async with session.get(f'{self._base_url}/v1/health') as resp:
await self._check_response(resp)
self._logger.info(f'LangBot Box runtime connected: {self._base_url}')
except aiohttp.ClientError as exc:
await self._call(LangBotToBoxAction.HEALTH, {})
self._logger.info('LangBot Box runtime connected via action RPC.')
except Exception as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
async def execute(self, spec: BoxSpec) -> BoxExecutionResult:
session = self._get_session()
payload = spec.model_dump(mode='json')
try:
async with session.post(
f'{self._base_url}/v1/sessions/{spec.session_id}/exec',
json=payload,
) as resp:
await self._check_response(resp)
data = await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
data = await self._call(LangBotToBoxAction.EXEC, spec.model_dump(mode='json'), timeout=300.0)
return BoxExecutionResult(
session_id=data['session_id'],
backend_name=data['backend_name'],
@@ -149,103 +141,52 @@ class RemoteBoxRuntimeClient(BoxRuntimeClient):
)
async def shutdown(self) -> None:
if self._session and not self._session.closed:
await self._session.close()
self._session = None
if self._handler is not None:
try:
await self._call(LangBotToBoxAction.SHUTDOWN, {})
except Exception:
pass
self._handler = None
async def get_status(self) -> dict:
session = self._get_session()
try:
async with session.get(f'{self._base_url}/v1/status') as resp:
await self._check_response(resp)
return await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
return await self._call(LangBotToBoxAction.STATUS, {})
async def get_sessions(self) -> list[dict]:
session = self._get_session()
try:
async with session.get(f'{self._base_url}/v1/sessions') as resp:
await self._check_response(resp)
return await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
data = await self._call(LangBotToBoxAction.GET_SESSIONS, {})
return data['sessions']
async def get_session(self, session_id: str) -> dict:
session = self._get_session()
try:
async with session.get(f'{self._base_url}/v1/sessions/{session_id}') as resp:
await self._check_response(resp)
return await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
return await self._call(LangBotToBoxAction.GET_SESSION, {'session_id': session_id})
async def get_backend_info(self) -> dict:
session = self._get_session()
try:
async with session.get(f'{self._base_url}/v1/health') as resp:
await self._check_response(resp)
return await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
return await self._call(LangBotToBoxAction.GET_BACKEND_INFO, {})
async def delete_session(self, session_id: str) -> None:
session = self._get_session()
try:
async with session.delete(
f'{self._base_url}/v1/sessions/{session_id}',
) as resp:
await self._check_response(resp)
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
await self._call(LangBotToBoxAction.DELETE_SESSION, {'session_id': session_id})
async def create_session(self, spec: BoxSpec) -> dict:
session = self._get_session()
payload = spec.model_dump(mode='json')
try:
async with session.post(
f'{self._base_url}/v1/sessions/{spec.session_id}',
json=payload,
) as resp:
await self._check_response(resp)
return await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
return await self._call(LangBotToBoxAction.CREATE_SESSION, spec.model_dump(mode='json'))
async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo:
session = self._get_session()
payload = spec.model_dump(mode='json')
try:
async with session.post(
f'{self._base_url}/v1/sessions/{session_id}/managed-process',
json=payload,
) as resp:
await self._check_response(resp)
data = await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
data = await self._call(
LangBotToBoxAction.START_MANAGED_PROCESS,
{'session_id': session_id, 'spec': spec.model_dump(mode='json')},
)
return BoxManagedProcessInfo.model_validate(data)
async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo:
session = self._get_session()
try:
async with session.get(
f'{self._base_url}/v1/sessions/{session_id}/managed-process',
) as resp:
await self._check_response(resp)
data = await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
data = await self._call(LangBotToBoxAction.GET_MANAGED_PROCESS, {'session_id': session_id})
return BoxManagedProcessInfo.model_validate(data)
def get_managed_process_websocket_url(self, session_id: str) -> str:
if self._base_url.startswith('https://'):
def get_managed_process_websocket_url(self, session_id: str, ws_relay_base_url: str) -> str:
base = ws_relay_base_url
if base.startswith('https://'):
scheme = 'wss://'
suffix = self._base_url[len('https://'):]
elif self._base_url.startswith('http://'):
suffix = base[len('https://'):]
elif base.startswith('http://'):
scheme = 'ws://'
suffix = self._base_url[len('http://'):]
suffix = base[len('http://'):]
else:
scheme = 'ws://'
suffix = self._base_url
suffix = base
return f'{scheme}{suffix}/v1/sessions/{session_id}/managed-process/ws'

View File

@@ -5,8 +5,12 @@ import os
import sys
from typing import TYPE_CHECKING
from langbot_plugin.entities.io.actions.enums import CommonAction
from langbot_plugin.runtime.io.handler import Handler
from langbot_plugin.runtime.io.connection import Connection
from .client import ActionRPCBoxClient, resolve_box_ws_relay_url
from .errors import BoxRuntimeUnavailableError
from .client import RemoteBoxRuntimeClient, resolve_box_runtime_url
from .models import get_box_config
from ..utils import platform
@@ -15,44 +19,129 @@ if TYPE_CHECKING:
class BoxRuntimeConnector:
"""Build and initialize the Box runtime-facing service for the app."""
_HEALTH_CHECK_RETRY_COUNT = 40
_HEALTH_CHECK_RETRY_INTERVAL_SEC = 0.25
"""Connect to the Box runtime via action RPC (stdio or ws)."""
def __init__(self, ap: 'core_app.Application'):
self.ap = ap
self.configured_runtime_url = self._load_configured_runtime_url()
self.runtime_url = self.configured_runtime_url or resolve_box_runtime_url(ap)
self.manages_local_runtime = self._should_manage_local_runtime()
self.client = RemoteBoxRuntimeClient(base_url=self.runtime_url, logger=ap.logger)
self.runtime_subprocess: asyncio.subprocess.Process | None = None
self.runtime_subprocess_task: asyncio.Task | None = None
self.ws_relay_base_url = resolve_box_ws_relay_url(ap)
self.client = ActionRPCBoxClient(logger=ap.logger)
self._handler: Handler | None = None
self._handler_task: asyncio.Task | None = None
self._ctrl_task: asyncio.Task | None = None
self._subprocess: asyncio.subprocess.Process | None = None
self._subprocess_wait_task: asyncio.Task | None = None
async def initialize(self) -> None:
if not self.manages_local_runtime:
await self.client.initialize()
return
if self.manages_local_runtime:
await self._start_local_stdio()
else:
await self._connect_remote_ws()
async def _start_local_stdio(self) -> None:
    """Launch the Box server as a subprocess and connect to it over stdio.

    Runs ``python -m langbot.pkg.box.server --port <ws relay port>`` through
    ``StdioClientController``, installs the resulting handler on
    ``self.client`` and waits up to 30 seconds for the first ping to succeed.

    If startup fails, the background tasks are cancelled and any subprocess
    the controller already spawned is terminated, so nothing is left running.

    Raises:
        BoxRuntimeUnavailableError: If no connection is established in time
            or the initial ping fails.
    """
    from langbot_plugin.runtime.io.controllers.stdio.client import StdioClientController

    connected = asyncio.Event()
    connect_error: list[Exception] = []

    async def new_connection_callback(connection: Connection) -> None:
        handler = Handler(connection)
        self._handler = handler
        self.client.set_handler(handler)
        self._handler_task = asyncio.create_task(handler.run())
        try:
            await handler.call_action(CommonAction.PING, {})
            self.ap.logger.info('Connected to Box runtime via stdio.')
            connected.set()
            await self._handler_task
        except Exception as exc:
            if connected.is_set():
                # The connection was already established; surface the failure
                # instead of dropping it silently.
                self.ap.logger.warning(f'Box runtime stdio connection ended with error: {exc}')
            else:
                connect_error.append(exc)
                connected.set()

    ctrl = StdioClientController(
        command=sys.executable,
        args=['-m', 'langbot.pkg.box.server', '--port', str(self._get_ws_relay_port())],
        env=os.environ.copy(),
    )
    self._subprocess = None  # StdioClientController manages the subprocess
    self._ctrl_task = asyncio.create_task(ctrl.run(new_connection_callback))

    def abort_startup() -> None:
        # Stop everything started above so a failed launch leaves no tasks or
        # child process behind.
        for task in (self._handler_task, self._ctrl_task):
            if task is not None:
                task.cancel()
        self._handler_task = None
        self._ctrl_task = None
        process = getattr(ctrl, 'process', None)
        if process is not None and process.returncode is None:
            process.terminate()

    # Wait for connection or failure
    try:
        await asyncio.wait_for(connected.wait(), timeout=30.0)
    except asyncio.TimeoutError as exc:
        abort_startup()
        raise BoxRuntimeUnavailableError('box runtime subprocess did not connect in time') from exc

    if connect_error:
        abort_startup()
        raise BoxRuntimeUnavailableError(
            f'box runtime connection failed: {connect_error[0]}'
        ) from connect_error[0]

    # Store subprocess reference for dispose
    self._subprocess = ctrl.process
async def _connect_remote_ws(self) -> None:
"""Connect to a remote box server via WebSocket."""
from langbot_plugin.runtime.io.controllers.ws.client import WebSocketClientController
ws_url = self._get_rpc_ws_url()
connected = asyncio.Event()
connect_error: list[Exception] = []
async def new_connection_callback(connection: Connection) -> None:
handler = Handler.__new__(Handler)
Handler.__init__(handler, connection)
self._handler = handler
self.client.set_handler(handler)
self._handler_task = asyncio.create_task(handler.run())
try:
await handler.call_action(CommonAction.PING, {})
self.ap.logger.info('Connected to Box runtime via WebSocket.')
connected.set()
await self._handler_task
except Exception as exc:
if not connected.is_set():
connect_error.append(exc)
connected.set()
async def on_connect_failed(ctrl, exc):
connect_error.append(exc or BoxRuntimeUnavailableError('ws connection failed'))
connected.set()
ctrl = WebSocketClientController(ws_url=ws_url, make_connection_failed_callback=on_connect_failed)
self._ctrl_task = asyncio.create_task(ctrl.run(new_connection_callback))
try:
await self.client.initialize()
return
except BoxRuntimeUnavailableError:
self.ap.logger.info(
'Local Box runtime is not running, starting an embedded Box runtime server...'
)
await asyncio.wait_for(connected.wait(), timeout=30.0)
except asyncio.TimeoutError:
raise BoxRuntimeUnavailableError('box runtime ws connection timed out')
await self._start_local_runtime_process()
await self._wait_until_runtime_ready()
if connect_error:
raise BoxRuntimeUnavailableError(f'box runtime connection failed: {connect_error[0]}')
def dispose(self) -> None:
if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is None:
self.ap.logger.info('Terminating local Box runtime process...')
self.runtime_subprocess.terminate()
if self._handler_task is not None:
self._handler_task.cancel()
self._handler_task = None
if self.runtime_subprocess_task is not None:
self.runtime_subprocess_task.cancel()
self.runtime_subprocess_task = None
if self._ctrl_task is not None:
self._ctrl_task.cancel()
self._ctrl_task = None
if self._subprocess is not None and self._subprocess.returncode is None:
self.ap.logger.info('Terminating managed box runtime process...')
self._subprocess.terminate()
if self._subprocess_wait_task is not None:
self._subprocess_wait_task.cancel()
self._subprocess_wait_task = None
def _load_configured_runtime_url(self) -> str:
    """Return the ``runtime_url`` entry of the box config as a stripped string ('' when absent)."""
    box_config = get_box_config(self.ap)
    raw_url = box_config.get('runtime_url', '')
    return str(raw_url).strip()
@@ -60,36 +149,19 @@ class BoxRuntimeConnector:
def _should_manage_local_runtime(self) -> bool:
    """Return True when no runtime URL is configured and the platform is not 'docker'."""
    if self.configured_runtime_url:
        return False
    return platform.get_platform() != 'docker'
async def _start_local_runtime_process(self) -> None:
if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is None:
return
def _get_ws_relay_port(self) -> int:
    """Extract the ws relay port from ``ws_relay_base_url``.

    Accepts both full URLs (``http://host:5410``) and scheme-less
    ``host:port`` values. Returns 5410 when the URL carries no port.

    Raises:
        ValueError: If the URL contains a port that is not a valid port number.
    """
    from urllib.parse import urlparse

    base = self.ws_relay_base_url
    if '://' not in base:
        # Without a scheme urlparse reads "host:port" as scheme/path and never
        # sees the port; a leading "//" makes it parse the value as a netloc.
        base = f'//{base}'
    return urlparse(base).port or 5410
python_path = sys.executable
env = os.environ.copy()
self.runtime_subprocess = await asyncio.create_subprocess_exec(
python_path,
'-m',
'langbot.pkg.box.server',
env=env,
)
self.runtime_subprocess_task = asyncio.create_task(self.runtime_subprocess.wait())
def _get_rpc_ws_url(self) -> str:
"""Derive the action RPC ws URL from the configured runtime URL.
async def _wait_until_runtime_ready(self) -> None:
last_exc: BoxRuntimeUnavailableError | None = None
for _ in range(self._HEALTH_CHECK_RETRY_COUNT):
if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is not None:
raise BoxRuntimeUnavailableError(
f'local box runtime exited before becoming ready (code {self.runtime_subprocess.returncode})'
)
try:
await self.client.initialize()
self.ap.logger.info(f'Local Box runtime is ready at {self.runtime_url}.')
return
except BoxRuntimeUnavailableError as exc:
last_exc = exc
await asyncio.sleep(self._HEALTH_CHECK_RETRY_INTERVAL_SEC)
if last_exc is not None:
raise last_exc
raise BoxRuntimeUnavailableError('local box runtime did not become ready')
The RPC endpoint is on port+1 relative to the ws relay port.
"""
from urllib.parse import urlparse
parsed = urlparse(self.ws_relay_base_url)
host = parsed.hostname or '127.0.0.1'
port = (parsed.port or 5410) + 1
return f'ws://{host}:{port}'

View File

@@ -1,7 +1,10 @@
"""Standalone HTTP service exposing BoxRuntime as a REST API.
"""Standalone Box Runtime service exposing BoxRuntime via action RPC.
Usage:
python -m langbot.pkg.box.server [--host 0.0.0.0] [--port 5410]
Usage (stdio, launched by LangBot as subprocess):
python -m langbot.pkg.box.server
Usage (ws + ws relay, for remote/docker mode):
python -m langbot.pkg.box.server --port 5410
"""
from __future__ import annotations
@@ -10,46 +13,29 @@ import argparse
import asyncio
import datetime as dt
import logging
import sys
from typing import Any
import pydantic
from aiohttp import web
from langbot_plugin.entities.io.actions.enums import CommonAction
from langbot_plugin.entities.io.resp import ActionResponse
from langbot_plugin.runtime.io.connection import Connection
from langbot_plugin.runtime.io.handler import Handler
from .actions import LangBotToBoxAction
from .errors import (
BoxBackendUnavailableError,
BoxError,
BoxManagedProcessConflictError,
BoxManagedProcessNotFoundError,
BoxSessionConflictError,
BoxSessionNotFoundError,
BoxValidationError,
)
from .models import BoxExecutionResult, BoxManagedProcessSpec, BoxSpec
from .runtime import BoxRuntime
logger = logging.getLogger('langbot.box.server')
_ERROR_MAP: dict[type, tuple[int, str]] = {
BoxValidationError: (400, 'validation_error'),
BoxSessionNotFoundError: (404, 'session_not_found'),
BoxSessionConflictError: (409, 'session_conflict'),
BoxManagedProcessNotFoundError: (404, 'managed_process_not_found'),
BoxManagedProcessConflictError: (409, 'managed_process_conflict'),
BoxBackendUnavailableError: (503, 'backend_unavailable'),
}
def _error_response(exc: Exception) -> web.Response:
    """Render *exc* as a JSON error response.

    The first ``_ERROR_MAP`` entry whose type matches *exc* supplies the HTTP
    status and error code; anything else becomes a 500 ``internal_error``.
    """
    status, code = next(
        (entry for exc_type, entry in _ERROR_MAP.items() if isinstance(exc, exc_type)),
        (500, 'internal_error'),
    )
    return web.json_response(
        {'error': {'code': code, 'message': str(exc)}},
        status=status,
    )
def _result_to_dict(result: BoxExecutionResult) -> dict:
return {
@@ -63,111 +49,98 @@ def _result_to_dict(result: BoxExecutionResult) -> dict:
}
async def handle_exec(request: web.Request) -> web.Response:
    """Execute a command in the session named by the URL and return the result as JSON.

    The ``session_id`` path segment is written into the JSON body before the
    body is validated as a ``BoxSpec``. Validation failures yield a 400
    ``validation_error`` response; ``BoxError`` is rendered by ``_error_response``.
    """
    box_runtime: BoxRuntime = request.app['runtime']
    try:
        payload = await request.json()
        payload['session_id'] = request.match_info['session_id']
        execution = await box_runtime.execute(BoxSpec.model_validate(payload))
        return web.json_response(_result_to_dict(execution))
    except pydantic.ValidationError as exc:
        failure = {'error': {'code': 'validation_error', 'message': str(exc)}}
        return web.json_response(failure, status=400)
    except BoxError as exc:
        return _error_response(exc)
class BoxServerHandler(Handler):
    """Server-side handler that registers box actions backed by ``BoxRuntime``.

    Failures are reported to the caller as ``ActionResponse.error`` messages of
    the form ``<ErrorClassName>: <message>``. This applies to payload
    validation errors, missing required fields and any ``BoxError`` raised by
    the runtime, so the calling side can recover the error type from the text.
    """

    name = 'BoxServerHandler'

    def __init__(self, connection: Connection, runtime: BoxRuntime):
        super().__init__(connection)
        self._runtime = runtime
        self._register_actions()

    def _register_actions(self) -> None:
        """Bind every supported action to the corresponding runtime operation."""
        import functools

        def wire_errors(func):
            # Convert BoxError raised inside an action into the prefixed
            # error message format instead of letting it escape the handler.
            @functools.wraps(func)
            async def guarded(data: dict[str, Any]) -> ActionResponse:
                try:
                    return await func(data)
                except BoxError as exc:
                    return ActionResponse.error(f'{type(exc).__name__}: {exc}')

            return guarded

        def require(data: dict[str, Any], key: str) -> Any:
            # A missing field is a request validation problem, not a server fault.
            if key not in data:
                raise BoxValidationError(f'missing required field: {key}')
            return data[key]

        @self.action(CommonAction.PING)
        async def ping(data: dict[str, Any]) -> ActionResponse:
            return ActionResponse.success({})

        @self.action(LangBotToBoxAction.HEALTH)
        @wire_errors
        async def health(data: dict[str, Any]) -> ActionResponse:
            info = await self._runtime.get_backend_info()
            return ActionResponse.success(info)

        @self.action(LangBotToBoxAction.STATUS)
        @wire_errors
        async def status(data: dict[str, Any]) -> ActionResponse:
            result = await self._runtime.get_status()
            return ActionResponse.success(result)

        @self.action(LangBotToBoxAction.EXEC)
        @wire_errors
        async def exec_cmd(data: dict[str, Any]) -> ActionResponse:
            try:
                spec = BoxSpec.model_validate(data)
            except pydantic.ValidationError as exc:
                return ActionResponse.error(f'BoxValidationError: {exc}')
            result = await self._runtime.execute(spec)
            return ActionResponse.success(_result_to_dict(result))

        @self.action(LangBotToBoxAction.CREATE_SESSION)
        @wire_errors
        async def create_session(data: dict[str, Any]) -> ActionResponse:
            try:
                spec = BoxSpec.model_validate(data)
            except pydantic.ValidationError as exc:
                return ActionResponse.error(f'BoxValidationError: {exc}')
            info = await self._runtime.create_session(spec)
            return ActionResponse.success(info)

        @self.action(LangBotToBoxAction.GET_SESSION)
        @wire_errors
        async def get_session(data: dict[str, Any]) -> ActionResponse:
            session_id = require(data, 'session_id')
            return ActionResponse.success(self._runtime.get_session(session_id))

        @self.action(LangBotToBoxAction.GET_SESSIONS)
        @wire_errors
        async def get_sessions(data: dict[str, Any]) -> ActionResponse:
            return ActionResponse.success({'sessions': self._runtime.get_sessions()})

        @self.action(LangBotToBoxAction.DELETE_SESSION)
        @wire_errors
        async def delete_session(data: dict[str, Any]) -> ActionResponse:
            session_id = require(data, 'session_id')
            await self._runtime.delete_session(session_id)
            return ActionResponse.success({'deleted': session_id})

        @self.action(LangBotToBoxAction.START_MANAGED_PROCESS)
        @wire_errors
        async def start_managed_process(data: dict[str, Any]) -> ActionResponse:
            session_id = require(data, 'session_id')
            raw_spec = require(data, 'spec')
            try:
                spec = BoxManagedProcessSpec.model_validate(raw_spec)
            except pydantic.ValidationError as exc:
                return ActionResponse.error(f'BoxValidationError: {exc}')
            info = await self._runtime.start_managed_process(session_id, spec)
            return ActionResponse.success(info)

        @self.action(LangBotToBoxAction.GET_MANAGED_PROCESS)
        @wire_errors
        async def get_managed_process(data: dict[str, Any]) -> ActionResponse:
            session_id = require(data, 'session_id')
            return ActionResponse.success(self._runtime.get_managed_process(session_id))

        @self.action(LangBotToBoxAction.GET_BACKEND_INFO)
        @wire_errors
        async def get_backend_info(data: dict[str, Any]) -> ActionResponse:
            info = await self._runtime.get_backend_info()
            return ActionResponse.success(info)

        @self.action(LangBotToBoxAction.SHUTDOWN)
        @wire_errors
        async def shutdown(data: dict[str, Any]) -> ActionResponse:
            await self._runtime.shutdown()
            return ActionResponse.success({})
async def handle_create_session(request: web.Request) -> web.Response:
    """Create the session named by the URL and answer 201 with its info.

    The ``session_id`` path segment is written into the JSON body before the
    body is validated as a ``BoxSpec``. Validation failures yield a 400
    ``validation_error`` response; ``BoxError`` is rendered by ``_error_response``.
    """
    box_runtime: BoxRuntime = request.app['runtime']
    try:
        payload = await request.json()
        payload['session_id'] = request.match_info['session_id']
        created = await box_runtime.create_session(BoxSpec.model_validate(payload))
        return web.json_response(created, status=201)
    except pydantic.ValidationError as exc:
        failure = {'error': {'code': 'validation_error', 'message': str(exc)}}
        return web.json_response(failure, status=400)
    except BoxError as exc:
        return _error_response(exc)
# ── Managed process WebSocket relay (aiohttp) ────────────────────────
async def handle_get_sessions(request: web.Request) -> web.Response:
    """Return the runtime's session list as JSON; ``BoxError`` goes through ``_error_response``."""
    box_runtime: BoxRuntime = request.app['runtime']
    try:
        sessions = box_runtime.get_sessions()
        return web.json_response(sessions)
    except BoxError as exc:
        return _error_response(exc)
async def handle_delete_session(request: web.Request) -> web.Response:
    """Delete the session named by the URL and echo its id as ``{'deleted': id}``."""
    box_runtime: BoxRuntime = request.app['runtime']
    target_id = request.match_info['session_id']
    try:
        await box_runtime.delete_session(target_id)
        confirmation = {'deleted': target_id}
        return web.json_response(confirmation)
    except BoxError as exc:
        return _error_response(exc)
async def handle_get_session(request: web.Request) -> web.Response:
    """Return the runtime's info for the session named by the URL as JSON."""
    box_runtime: BoxRuntime = request.app['runtime']
    target_id = request.match_info['session_id']
    try:
        session_info = box_runtime.get_session(target_id)
        return web.json_response(session_info)
    except BoxError as exc:
        return _error_response(exc)
async def handle_status(request: web.Request) -> web.Response:
    """Return the result of ``BoxRuntime.get_status()`` as JSON."""
    box_runtime: BoxRuntime = request.app['runtime']
    try:
        return web.json_response(await box_runtime.get_status())
    except BoxError as exc:
        return _error_response(exc)
async def handle_health(request: web.Request) -> web.Response:
    """Return the result of ``BoxRuntime.get_backend_info()`` as JSON."""
    box_runtime: BoxRuntime = request.app['runtime']
    try:
        return web.json_response(await box_runtime.get_backend_info())
    except BoxError as exc:
        return _error_response(exc)
async def handle_start_managed_process(request: web.Request) -> web.Response:
    """Start a managed process in the session named by the URL and answer 201 with its info.

    The JSON body is validated as a ``BoxManagedProcessSpec``. Validation
    failures yield a 400 ``validation_error`` response; ``BoxError`` is
    rendered by ``_error_response``.
    """
    box_runtime: BoxRuntime = request.app['runtime']
    target_id = request.match_info['session_id']
    try:
        payload = await request.json()
        process_spec = BoxManagedProcessSpec.model_validate(payload)
        started = await box_runtime.start_managed_process(target_id, process_spec)
        return web.json_response(started, status=201)
    except pydantic.ValidationError as exc:
        failure = {'error': {'code': 'validation_error', 'message': str(exc)}}
        return web.json_response(failure, status=400)
    except BoxError as exc:
        return _error_response(exc)
async def handle_get_managed_process(request: web.Request) -> web.Response:
    """Return the managed-process info of the session named by the URL as JSON."""
    box_runtime: BoxRuntime = request.app['runtime']
    target_id = request.match_info['session_id']
    try:
        process_info = box_runtime.get_managed_process(target_id)
        return web.json_response(process_info)
    except BoxError as exc:
        return _error_response(exc)
def _error_response(exc: Exception) -> web.Response:
    """Build a 400 JSON error response whose code is the exception's class name."""
    error_body = {
        'error': {
            'code': type(exc).__name__,
            'message': str(exc),
        },
    }
    return web.json_response(error_body, status=400)
async def handle_managed_process_ws(request: web.Request) -> web.StreamResponse:
@@ -229,50 +202,67 @@ async def handle_managed_process_ws(request: web.Request) -> web.StreamResponse:
return ws
def create_app(runtime: BoxRuntime | None = None) -> web.Application:
"""Create the aiohttp Application with all routes.
If *runtime* is ``None`` a new ``BoxRuntime`` is created using the module
logger.
"""
if runtime is None:
runtime = BoxRuntime(logger=logger)
def create_ws_relay_app(runtime: BoxRuntime) -> web.Application:
"""Create a minimal aiohttp app that only serves the managed-process ws relay."""
app = web.Application()
app['runtime'] = runtime
app.router.add_post('/v1/sessions/{session_id}/exec', handle_exec)
app.router.add_post('/v1/sessions/{session_id}', handle_create_session)
app.router.add_get('/v1/sessions/{session_id}', handle_get_session)
app.router.add_get('/v1/sessions', handle_get_sessions)
app.router.add_delete('/v1/sessions/{session_id}', handle_delete_session)
app.router.add_post('/v1/sessions/{session_id}/managed-process', handle_start_managed_process)
app.router.add_get('/v1/sessions/{session_id}/managed-process', handle_get_managed_process)
app.router.add_get('/v1/sessions/{session_id}/managed-process/ws', handle_managed_process_ws)
app.router.add_get('/v1/status', handle_status)
app.router.add_get('/v1/health', handle_health)
async def on_startup(_app: web.Application) -> None:
await _app['runtime'].initialize()
async def on_shutdown(_app: web.Application) -> None:
await _app['runtime'].shutdown()
app.on_startup.append(on_startup)
app.on_shutdown.append(on_shutdown)
return app
# ── Entry point ──────────────────────────────────────────────────────
async def _run_server(host: str, port: int, mode: str) -> None:
    """Run the Box runtime service until its control channel stops.

    Initializes a ``BoxRuntime``, tries to serve the managed-process ws relay
    on ``host:port`` via aiohttp, then serves action RPC over stdio when
    *mode* is ``'stdio'`` or over WebSocket on ``port + 1`` otherwise. On exit
    the runtime is shut down and the relay runner, if any, is cleaned up.
    """
    box_runtime = BoxRuntime(logger=logger)
    await box_runtime.initialize()

    # The ws relay is optional: a bind failure is logged and only disables
    # managed-process attach.
    relay_runner: web.AppRunner | None = None
    try:
        relay_runner = web.AppRunner(create_ws_relay_app(box_runtime))
        await relay_runner.setup()
        await web.TCPSite(relay_runner, host, port).start()
        logger.info(f'Box ws relay listening on {host}:{port}')
    except OSError as exc:
        logger.warning(f'Box ws relay failed to bind {host}:{port}: {exc}')
        logger.warning('Managed process WebSocket attach will be unavailable.')

    async def serve_connection(connection: Connection) -> None:
        await BoxServerHandler(connection, box_runtime).run()

    try:
        if mode == 'stdio':
            from langbot_plugin.runtime.io.controllers.stdio.server import StdioServerController

            controller = StdioServerController()
        else:
            from langbot_plugin.runtime.io.controllers.ws.server import WebSocketServerController

            # Action RPC uses port+1 to avoid conflict with ws relay
            rpc_port = port + 1
            logger.info(f'Box action RPC (ws) listening on {host}:{rpc_port}')
            controller = WebSocketServerController(rpc_port)
        await controller.run(serve_connection)
    finally:
        await box_runtime.shutdown()
        if relay_runner is not None:
            await relay_runner.cleanup()
def main() -> None:
parser = argparse.ArgumentParser(description='LangBot Box Runtime HTTP Service')
parser = argparse.ArgumentParser(description='LangBot Box Runtime Service')
parser.add_argument('--host', default='0.0.0.0', help='Bind address')
parser.add_argument('--port', type=int, default=5410, help='Bind port')
parser.add_argument('--port', type=int, default=5410, help='Bind port (ws relay)')
parser.add_argument('--mode', choices=['stdio', 'ws'], default='stdio',
help='Control channel transport (default: stdio)')
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
app = create_app()
web.run_app(app, host=args.host, port=args.port)
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
asyncio.run(_run_server(args.host, args.port, args.mode))
if __name__ == '__main__':

View File

@@ -147,7 +147,12 @@ class BoxService:
getter = getattr(self.client, 'get_managed_process_websocket_url', None)
if getter is None:
raise BoxValidationError('box runtime client does not support managed process websocket attach')
return getter(session_id)
ws_relay_base_url = (
self._runtime_connector.ws_relay_base_url
if self._runtime_connector is not None
else 'http://127.0.0.1:5410'
)
return getter(session_id, ws_relay_base_url)
def _serialize_result(self, result: BoxExecutionResult) -> dict:
stdout, stdout_truncated = self._truncate(result.stdout)

View File

@@ -17,6 +17,7 @@ from langbot_plugin.api.entities.builtin.pipeline.query import provider_session
from ..core import app
from . import handler
from ..utils import platform
from ..utils.managed_runtime import ManagedRuntimeConnector
from langbot_plugin.runtime.io.controllers.stdio import (
client as stdio_client_controller,
)
@@ -34,11 +35,9 @@ from ..core import taskmgr
from ..entity.persistence import plugin as persistence_plugin
class PluginRuntimeConnector:
class PluginRuntimeConnector(ManagedRuntimeConnector):
"""Plugin runtime connector"""
ap: app.Application
handler: handler.RuntimeConnectionHandler
handler_task: asyncio.Task
@@ -49,10 +48,6 @@ class PluginRuntimeConnector:
ctrl: stdio_client_controller.StdioClientController | ws_client_controller.WebSocketClientController
runtime_subprocess_on_windows: asyncio.subprocess.Process | None = None
runtime_subprocess_on_windows_task: asyncio.Task | None = None
runtime_disconnect_callback: typing.Callable[
[PluginRuntimeConnector], typing.Coroutine[typing.Any, typing.Any, None]
]
@@ -67,7 +62,7 @@ class PluginRuntimeConnector:
[PluginRuntimeConnector], typing.Coroutine[typing.Any, typing.Any, None]
],
):
self.ap = ap
super().__init__(ap)
self.runtime_disconnect_callback = runtime_disconnect_callback
self.is_enable_plugin = self.ap.instance_config.data.get('plugin', {}).get('enable', True)
@@ -135,19 +130,7 @@ class PluginRuntimeConnector:
# We have to launch runtime via cmd but communicate via ws.
self.ap.logger.info('(windows) use cmd to launch plugin runtime and communicate via ws')
if self.runtime_subprocess_on_windows is None: # only launch once
python_path = sys.executable
env = os.environ.copy()
self.runtime_subprocess_on_windows = await asyncio.create_subprocess_exec(
python_path,
'-m',
'langbot_plugin.cli.__init__',
'rt',
env=env,
)
# hold the process
self.runtime_subprocess_on_windows_task = asyncio.create_task(self.runtime_subprocess_on_windows.wait())
await self._start_runtime_subprocess('-m', 'langbot_plugin.cli.__init__', 'rt')
ws_url = 'ws://localhost:5400/control/ws'
@@ -523,13 +506,14 @@ class PluginRuntimeConnector:
return await self.handler.retrieve_knowledge(plugin_author, plugin_name, retriever_name, retrieval_context)
def dispose(self):
# No need to consider the shutdown on Windows
# for Windows can kill processes and subprocesses chainly
if self.is_enable_plugin and isinstance(self.ctrl, stdio_client_controller.StdioClientController):
# On non-Windows stdio mode, terminate via the controller's process handle.
# On Windows, the managed subprocess is cleaned up by the base class.
if self.is_enable_plugin and hasattr(self, 'ctrl') and isinstance(self.ctrl, stdio_client_controller.StdioClientController):
self.ap.logger.info('Terminating plugin runtime process...')
self.ctrl.process.terminate()
self._dispose_subprocess()
if self.heartbeat_task is not None:
self.heartbeat_task.cancel()
self.heartbeat_task = None

View File

@@ -328,11 +328,15 @@ class RuntimeMCPSession:
self.error_phase = None
await asyncio.sleep(delay)
_MONITOR_POLL_INTERVAL = 5
_MONITOR_MAX_CONSECUTIVE_ERRORS = 3
async def _monitor_box_process_health(self):
"""Poll managed process status; return when process exits."""
from ...box.models import BoxManagedProcessStatus
session_id = self._build_box_session_id()
consecutive_errors = 0
while not self._shutdown_event.is_set():
try:
info = await self.ap.box_service.client.get_managed_process(session_id)
@@ -341,10 +345,21 @@ class RuntimeMCPSession:
else:
status = getattr(info, 'status', '')
if status == BoxManagedProcessStatus.EXITED.value or status == BoxManagedProcessStatus.EXITED:
self.ap.logger.info(
f'MCP monitor for {self.server_name}: process exited'
)
return
except Exception:
return # Process or session gone
await asyncio.sleep(5)
consecutive_errors = 0
except Exception as exc:
consecutive_errors += 1
self.ap.logger.warning(
f'MCP monitor for {self.server_name}: get_managed_process failed '
f'({consecutive_errors}/{self._MONITOR_MAX_CONSECUTIVE_ERRORS}): '
f'{type(exc).__name__}: {exc}'
)
if consecutive_errors >= self._MONITOR_MAX_CONSECUTIVE_ERRORS:
return
await asyncio.sleep(self._MONITOR_POLL_INTERVAL)
async def start(self):
if not self.enable:
@@ -541,10 +556,18 @@ class RuntimeMCPSession:
because /workspace may be mounted read-only and pip needs to write
build artifacts in the source tree.
"""
# Use /opt instead of /tmp — /tmp is often a small tmpfs (64 MB)
# and cannot hold the copied source tree plus pip build artifacts.
_COPY_AND_INSTALL = (
'cp -r /workspace /tmp/_mcp_src'
' && pip install --no-cache-dir /tmp/_mcp_src'
' && rm -rf /tmp/_mcp_src'
'mkdir -p /opt/_mcp_src'
' && tar -C /workspace'
' --exclude=.venv --exclude=.git --exclude=__pycache__'
' --exclude=node_modules --exclude=.tox --exclude=.nox'
' --exclude="*.egg-info" --exclude=.uv-cache'
' -cf - .'
' | tar -C /opt/_mcp_src -xf -'
' && pip install --no-cache-dir /opt/_mcp_src'
' && rm -rf /opt/_mcp_src'
)
_INSTALL_REQUIREMENTS = 'pip install --no-cache-dir -r /workspace/requirements.txt'

View File

@@ -0,0 +1,89 @@
"""Base class for connectors that may manage a local runtime subprocess."""
from __future__ import annotations
import asyncio
import os
import sys
from typing import TYPE_CHECKING, Awaitable, Callable
if TYPE_CHECKING:
from ..core import app as core_app
class ManagedRuntimeConnector:
    """Base class for connectors that may manage a local runtime subprocess.

    Provides shared lifecycle helpers: subprocess launch, readiness polling
    with retries, and subprocess termination.  Concrete connectors inherit
    from this class and add their own protocol-specific logic.
    """

    ap: 'core_app.Application'
    # Handle of the locally launched runtime; None until one has been started.
    runtime_subprocess: asyncio.subprocess.Process | None
    # Task awaiting ``runtime_subprocess.wait()``; None when nothing is tracked.
    runtime_subprocess_task: asyncio.Task | None

    def __init__(self, ap: 'core_app.Application'):
        """Store the application handle and start with no managed subprocess."""
        self.ap = ap
        self.runtime_subprocess = None
        self.runtime_subprocess_task = None

    async def _start_runtime_subprocess(self, *args: str) -> None:
        """Launch a local runtime using the current Python interpreter.

        Args:
            *args: Command-line arguments placed after ``sys.executable``
                (for example ``'-m', 'some.module'``).

        If a previously launched subprocess is still running (it has no
        ``returncode`` yet), this method does nothing.
        """
        process = self.runtime_subprocess
        if process is not None and process.returncode is None:
            return

        # The child receives a copy of the current environment.
        self.runtime_subprocess = await asyncio.create_subprocess_exec(
            sys.executable,
            *args,
            env=os.environ.copy(),
        )
        # Keep a task waiting on the child for as long as it is managed.
        self.runtime_subprocess_task = asyncio.create_task(self.runtime_subprocess.wait())

    async def _wait_until_ready(
        self,
        check: Callable[[], Awaitable[None]],
        retries: int = 40,
        interval: float = 0.25,
        runtime_name: str = 'runtime',
    ) -> None:
        """Call *check* repeatedly until it succeeds or retries run out.

        Args:
            check: Awaitable factory that returns normally once the runtime
                is ready and raises otherwise.
            retries: Maximum number of attempts.
            interval: Seconds to sleep after each failed attempt.
            runtime_name: Label used in the error messages.

        Raises:
            RuntimeError: If the managed subprocess has already exited when
                an attempt is about to start, or if no attempt was made at
                all (``retries`` is zero or negative).
            Exception: The exception raised by the last failed *check* once
                all attempts are exhausted.
        """
        last_exc: Exception | None = None
        for _ in range(retries):
            # Fast-fail: polling a runtime whose process already died is pointless.
            process = self.runtime_subprocess
            if process is not None and process.returncode is not None:
                raise RuntimeError(
                    f'local {runtime_name} exited before becoming ready '
                    f'(code {process.returncode})'
                )
            try:
                await check()
                return
            except Exception as exc:
                # Remember the failure so it can be surfaced if we give up.
                last_exc = exc
                await asyncio.sleep(interval)

        if last_exc is not None:
            raise last_exc
        raise RuntimeError(f'local {runtime_name} did not become ready')

    def _dispose_subprocess(self) -> None:
        """Terminate the managed subprocess and cancel its wait task.

        Termination is best-effort: a ``ProcessLookupError`` from
        ``terminate()`` (the process is already gone) is ignored so that the
        wait task is always cancelled and cleared.
        """
        process = self.runtime_subprocess
        if process is not None and process.returncode is None:
            self.ap.logger.info('Terminating managed runtime process...')
            try:
                process.terminate()
            except ProcessLookupError:
                # The process vanished between the returncode check and the
                # signal; there is nothing left to terminate.
                pass

        if self.runtime_subprocess_task is not None:
            self.runtime_subprocess_task.cancel()
            self.runtime_subprocess_task = None