diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index d3ba8ad9..948f6161 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -4,6 +4,24 @@ version: "3" services: + langbot_box_runtime: + image: rockchin/langbot:latest + container_name: langbot_box_runtime + command: ["uv", "run", "--no-sync", "-m", "langbot.pkg.box.server"] + volumes: + # Mount the container runtime socket from the host. + # Uncomment the one that matches your container runtime: + # - /var/run/podman/podman.sock:/var/run/podman/podman.sock # Podman + - /var/run/docker.sock:/var/run/docker.sock # Docker + - ./data/box-workspaces:/workspaces + ports: + - 5410:5410 + restart: on-failure + environment: + - TZ=Asia/Shanghai + networks: + - langbot_network + langbot_plugin_runtime: image: rockchin/langbot:latest container_name: langbot_plugin_runtime @@ -23,9 +41,11 @@ services: container_name: langbot volumes: - ./data:/app/data + - ./data/box-workspaces:/workspaces restart: on-failure environment: - TZ=Asia/Shanghai + - BOX__RUNTIME_URL=http://langbot_box_runtime:5410 ports: - 5300:5300 # For web ui and webhook callback - 2280-2285:2280-2285 # For platform reverse connection diff --git a/src/langbot/pkg/api/http/controller/groups/box.py b/src/langbot/pkg/api/http/controller/groups/box.py index 13b9a139..d39ced93 100644 --- a/src/langbot/pkg/api/http/controller/groups/box.py +++ b/src/langbot/pkg/api/http/controller/groups/box.py @@ -13,7 +13,7 @@ class BoxRouterGroup(group.RouterGroup): @self.route('/sessions', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) async def _() -> str: - sessions = self.ap.box_service.runtime.get_sessions() + sessions = await self.ap.box_service.get_sessions() return self.success(data=sessions) @self.route('/errors', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) diff --git a/src/langbot/pkg/box/client.py b/src/langbot/pkg/box/client.py new file mode 100644 index 00000000..f13d67d5 --- /dev/null +++ b/src/langbot/pkg/box/client.py @@ -0,0 +1,232 @@ +"""BoxRuntimeClient abstraction for local and remote Box Runtime access.""" + +from __future__ import annotations + +import abc +import logging +from typing import TYPE_CHECKING + +import aiohttp + +from .errors import ( + BoxBackendUnavailableError, + BoxError, + BoxRuntimeUnavailableError, + BoxSessionConflictError, + BoxSessionNotFoundError, + BoxValidationError, +) +from .models import BoxExecutionResult, BoxExecutionStatus, BoxSpec +from .runtime import BoxRuntime +from ..utils import platform + +if TYPE_CHECKING: + from ..core import app as core_app + +_ERROR_CODE_MAP: dict[str, type[BoxError]] = { + 'validation_error': BoxValidationError, + 'session_not_found': BoxSessionNotFoundError, + 'session_conflict': BoxSessionConflictError, + 'backend_unavailable': BoxBackendUnavailableError, + 'runtime_unavailable': BoxRuntimeUnavailableError, + 'internal_error': BoxError, +} + + +def resolve_box_runtime_url(ap: 'core_app.Application') -> str: + box_config = getattr(ap, 'instance_config', None) + box_config_data = getattr(box_config, 'data', {}) if box_config is not None else {} + runtime_url = str(box_config_data.get('box', {}).get('runtime_url', '')).strip() + if runtime_url: + return runtime_url + + if platform.get_platform() == 'docker': + return 'http://langbot_box_runtime:5410' + return 'http://127.0.0.1:5410' + + +def create_box_runtime_client( + ap: 'core_app.Application', + runtime_url: str | None = None, +) -> 'RemoteBoxRuntimeClient': + return RemoteBoxRuntimeClient( + base_url=runtime_url or resolve_box_runtime_url(ap), + logger=ap.logger, + ) + + +class BoxRuntimeClient(abc.ABC): + """Abstract interface that BoxService uses to talk to a Box Runtime.""" + + @abc.abstractmethod + async def initialize(self) -> None: ... + + @abc.abstractmethod + async def execute(self, spec: BoxSpec) -> BoxExecutionResult: ... + + @abc.abstractmethod + async def shutdown(self) -> None: ... + + @abc.abstractmethod + async def get_status(self) -> dict: ... + + @abc.abstractmethod + async def get_sessions(self) -> list[dict]: ... + + @abc.abstractmethod + async def get_backend_info(self) -> dict: ... + + @abc.abstractmethod + async def delete_session(self, session_id: str) -> None: ... + + @abc.abstractmethod + async def create_session(self, spec: BoxSpec) -> dict: ... + + +class LocalBoxRuntimeClient(BoxRuntimeClient): + """In-process client that wraps a real BoxRuntime directly.""" + + def __init__(self, logger: logging.Logger, runtime: BoxRuntime | None = None): + self._runtime = runtime or BoxRuntime(logger=logger) + + @property + def runtime(self) -> BoxRuntime: + return self._runtime + + async def initialize(self) -> None: + await self._runtime.initialize() + + async def execute(self, spec: BoxSpec) -> BoxExecutionResult: + return await self._runtime.execute(spec) + + async def shutdown(self) -> None: + await self._runtime.shutdown() + + async def get_status(self) -> dict: + return await self._runtime.get_status() + + async def get_sessions(self) -> list[dict]: + return self._runtime.get_sessions() + + async def get_backend_info(self) -> dict: + return await self._runtime.get_backend_info() + + async def delete_session(self, session_id: str) -> None: + await self._runtime.delete_session(session_id) + + async def create_session(self, spec: BoxSpec) -> dict: + return await self._runtime.create_session(spec) + + +class RemoteBoxRuntimeClient(BoxRuntimeClient): + """HTTP client that talks to a standalone Box Runtime service.""" + + def __init__(self, base_url: str, logger: logging.Logger): + self._base_url = base_url.rstrip('/') + self._logger = logger + self._session: aiohttp.ClientSession | None = None + + def _get_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession() + return self._session + + async def _check_response(self, resp: aiohttp.ClientResponse) -> None: + if resp.status < 400: + return + try: + body = await resp.json() + error_info = body.get('error', {}) + code = error_info.get('code', '') + message = error_info.get('message', '') + except Exception: + resp.raise_for_status() + return + exc_class = _ERROR_CODE_MAP.get(code, BoxError) + raise exc_class(message) + + async def initialize(self) -> None: + session = self._get_session() + try: + async with session.get(f'{self._base_url}/v1/health') as resp: + await self._check_response(resp) + self._logger.info(f'LangBot Box runtime connected: {self._base_url}') + except aiohttp.ClientError as exc: + raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + + async def execute(self, spec: BoxSpec) -> BoxExecutionResult: + session = self._get_session() + payload = spec.model_dump(mode='json') + try: + async with session.post( + f'{self._base_url}/v1/sessions/{spec.session_id}/exec', + json=payload, + ) as resp: + await self._check_response(resp) + data = await resp.json() + except aiohttp.ClientError as exc: + raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + return BoxExecutionResult( + session_id=data['session_id'], + backend_name=data['backend_name'], + status=BoxExecutionStatus(data['status']), + exit_code=data.get('exit_code'), + stdout=data.get('stdout', ''), + stderr=data.get('stderr', ''), + duration_ms=data['duration_ms'], + ) + + async def shutdown(self) -> None: + if self._session and not self._session.closed: + await self._session.close() + self._session = None + + async def get_status(self) -> dict: + session = self._get_session() + try: + async with session.get(f'{self._base_url}/v1/status') as resp: + await self._check_response(resp) + return await resp.json() + except aiohttp.ClientError as exc: + raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + + async def get_sessions(self) -> list[dict]: + session = self._get_session() + try: + async with session.get(f'{self._base_url}/v1/sessions') as resp: + await self._check_response(resp) + return await resp.json() + except aiohttp.ClientError as exc: + raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + + async def get_backend_info(self) -> dict: + session = self._get_session() + try: + async with session.get(f'{self._base_url}/v1/health') as resp: + await self._check_response(resp) + return await resp.json() + except aiohttp.ClientError as exc: + raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + + async def delete_session(self, session_id: str) -> None: + session = self._get_session() + try: + async with session.delete( + f'{self._base_url}/v1/sessions/{session_id}', + ) as resp: + await self._check_response(resp) + except aiohttp.ClientError as exc: + raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + + async def create_session(self, spec: BoxSpec) -> dict: + session = self._get_session() + payload = spec.model_dump(mode='json') + try: + async with session.post( + f'{self._base_url}/v1/sessions/{spec.session_id}', + json=payload, + ) as resp: + await self._check_response(resp) + return await resp.json() + except aiohttp.ClientError as exc: + raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc diff --git a/src/langbot/pkg/box/connector.py b/src/langbot/pkg/box/connector.py new file mode 100644 index 00000000..f05299cd --- /dev/null +++ b/src/langbot/pkg/box/connector.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import asyncio +import os +import sys +from typing import TYPE_CHECKING + +from .errors import BoxRuntimeUnavailableError +from .client import create_box_runtime_client, resolve_box_runtime_url +from ..utils import platform + +if TYPE_CHECKING: + from ..core import app as core_app + + +class BoxRuntimeConnector: + """Build and initialize the Box runtime-facing service for the app.""" + + _HEALTH_CHECK_RETRY_COUNT = 40 + _HEALTH_CHECK_RETRY_INTERVAL_SEC = 0.25 + + def __init__(self, ap: 'core_app.Application'): + self.ap = ap + self.configured_runtime_url = self._load_configured_runtime_url() + self.runtime_url = self.configured_runtime_url or resolve_box_runtime_url(ap) + self.manages_local_runtime = self._should_manage_local_runtime() + self.client = create_box_runtime_client(ap, runtime_url=self.runtime_url) + self.runtime_subprocess: asyncio.subprocess.Process | None = None + self.runtime_subprocess_task: asyncio.Task | None = None + + async def initialize(self) -> None: + if not self.manages_local_runtime: + await self.client.initialize() + return + + try: + await self.client.initialize() + return + except BoxRuntimeUnavailableError: + self.ap.logger.info( + 'Local Box runtime is not running, starting an embedded Box runtime server...' + ) + + await self._start_local_runtime_process() + await self._wait_until_runtime_ready() + + def dispose(self) -> None: + if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is None: + self.ap.logger.info('Terminating local Box runtime process...') + self.runtime_subprocess.terminate() + + if self.runtime_subprocess_task is not None: + self.runtime_subprocess_task.cancel() + self.runtime_subprocess_task = None + + def _load_configured_runtime_url(self) -> str: + box_config = getattr(self.ap, 'instance_config', None) + box_config_data = getattr(box_config, 'data', {}) if box_config is not None else {} + return str(box_config_data.get('box', {}).get('runtime_url', '')).strip() + + def _should_manage_local_runtime(self) -> bool: + return not self.configured_runtime_url and platform.get_platform() != 'docker' + + async def _start_local_runtime_process(self) -> None: + if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is None: + return + + python_path = sys.executable + env = os.environ.copy() + self.runtime_subprocess = await asyncio.create_subprocess_exec( + python_path, + '-m', + 'langbot.pkg.box.server', + env=env, + ) + self.runtime_subprocess_task = asyncio.create_task(self.runtime_subprocess.wait()) + + async def _wait_until_runtime_ready(self) -> None: + last_exc: BoxRuntimeUnavailableError | None = None + for _ in range(self._HEALTH_CHECK_RETRY_COUNT): + if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is not None: + raise BoxRuntimeUnavailableError( + f'local box runtime exited before becoming ready (code {self.runtime_subprocess.returncode})' + ) + + try: + await self.client.initialize() + self.ap.logger.info(f'Local Box runtime is ready at {self.runtime_url}.') + return + except BoxRuntimeUnavailableError as exc: + last_exc = exc + await asyncio.sleep(self._HEALTH_CHECK_RETRY_INTERVAL_SEC) + + if last_exc is not None: + raise last_exc + raise BoxRuntimeUnavailableError('local box runtime did not become ready') diff --git a/src/langbot/pkg/box/errors.py b/src/langbot/pkg/box/errors.py index 7790945d..8ef8d2ec 100644 --- a/src/langbot/pkg/box/errors.py +++ b/src/langbot/pkg/box/errors.py @@ -13,5 +13,13 @@ class BoxBackendUnavailableError(BoxError): """Raised when no supported container backend is available.""" +class BoxRuntimeUnavailableError(BoxError): + """Raised when the standalone Box Runtime service is unavailable.""" + + class BoxSessionConflictError(BoxError): """Raised when an existing session cannot satisfy a new request.""" + + +class BoxSessionNotFoundError(BoxError): + """Raised when a referenced session does not exist.""" diff --git a/src/langbot/pkg/box/runtime.py b/src/langbot/pkg/box/runtime.py index 10996727..39342f12 100644 --- a/src/langbot/pkg/box/runtime.py +++ b/src/langbot/pkg/box/runtime.py @@ -6,7 +6,7 @@ import datetime as dt import logging from .backend import BaseSandboxBackend, DockerBackend, PodmanBackend -from .errors import BoxBackendUnavailableError, BoxSessionConflictError +from .errors import BoxBackendUnavailableError, BoxSessionConflictError, BoxSessionNotFoundError from .models import BoxExecutionResult, BoxExecutionStatus, BoxSessionInfo, BoxSpec _UTC = dt.timezone.utc @@ -65,6 +65,16 @@ class BoxRuntime: for session_id in session_ids: await self._drop_session_locked(session_id) + async def create_session(self, spec: BoxSpec) -> dict: + session = await self._get_or_create_session(spec) + return self._session_to_dict(session.info) + + async def delete_session(self, session_id: str) -> None: + async with self._lock: + if session_id not in self._sessions: + raise BoxSessionNotFoundError(f'session {session_id} not found') + await self._drop_session_locked(session_id) + # ── Observability ───────────────────────────────────────────────── async def get_backend_info(self) -> dict: @@ -78,24 +88,7 @@ class BoxRuntime: return {'name': backend.name, 'available': available} def get_sessions(self) -> list[dict]: - return [ - { - 'session_id': s.info.session_id, - 'backend_name': s.info.backend_name, - 'backend_session_id': s.info.backend_session_id, - 'image': s.info.image, - 'network': s.info.network.value, - 'host_path': s.info.host_path, - 'host_path_mode': s.info.host_path_mode.value, - 'cpus': s.info.cpus, - 'memory_mb': s.info.memory_mb, - 'pids_limit': s.info.pids_limit, - 'read_only_rootfs': s.info.read_only_rootfs, - 'created_at': s.info.created_at.isoformat(), - 'last_used_at': s.info.last_used_at.isoformat(), - } - for s in self._sessions.values() - ] + return [self._session_to_dict(s.info) for s in self._sessions.values()] async def get_status(self) -> dict: backend_info = await self.get_backend_info() @@ -222,3 +215,21 @@ class BoxRuntime: raise BoxSessionConflictError( f'sandbox_exec session {spec.session_id} already exists with read_only_rootfs={session.read_only_rootfs}' ) + + @staticmethod + def _session_to_dict(info: BoxSessionInfo) -> dict: + return { + 'session_id': info.session_id, + 'backend_name': info.backend_name, + 'backend_session_id': info.backend_session_id, + 'image': info.image, + 'network': info.network.value, + 'host_path': info.host_path, + 'host_path_mode': info.host_path_mode.value, + 'cpus': info.cpus, + 'memory_mb': info.memory_mb, + 'pids_limit': info.pids_limit, + 'read_only_rootfs': info.read_only_rootfs, + 'created_at': info.created_at.isoformat(), + 'last_used_at': info.last_used_at.isoformat(), + } diff --git a/src/langbot/pkg/box/server.py b/src/langbot/pkg/box/server.py new file mode 100644 index 00000000..67b78cec --- /dev/null +++ b/src/langbot/pkg/box/server.py @@ -0,0 +1,176 @@ +"""Standalone HTTP service exposing BoxRuntime as a REST API. + +Usage: + python -m langbot.pkg.box.server [--host 0.0.0.0] [--port 5410] +""" + +from __future__ import annotations + +import argparse +import logging + +import pydantic +from aiohttp import web + +from .errors import ( + BoxBackendUnavailableError, + BoxError, + BoxSessionConflictError, + BoxSessionNotFoundError, + BoxValidationError, +) +from .models import BoxExecutionResult, BoxSpec +from .runtime import BoxRuntime + +logger = logging.getLogger('langbot.box.server') + +_ERROR_MAP: dict[type, tuple[int, str]] = { + BoxValidationError: (400, 'validation_error'), + BoxSessionNotFoundError: (404, 'session_not_found'), + BoxSessionConflictError: (409, 'session_conflict'), + BoxBackendUnavailableError: (503, 'backend_unavailable'), +} + + +def _error_response(exc: Exception) -> web.Response: + for exc_type, (status, code) in _ERROR_MAP.items(): + if isinstance(exc, exc_type): + return web.json_response( + {'error': {'code': code, 'message': str(exc)}}, + status=status, + ) + return web.json_response( + {'error': {'code': 'internal_error', 'message': str(exc)}}, + status=500, + ) + + +def _result_to_dict(result: BoxExecutionResult) -> dict: + return { + 'session_id': result.session_id, + 'backend_name': result.backend_name, + 'status': result.status.value, + 'exit_code': result.exit_code, + 'stdout': result.stdout, + 'stderr': result.stderr, + 'duration_ms': result.duration_ms, + } + + +async def handle_exec(request: web.Request) -> web.Response: + runtime: BoxRuntime = request.app['runtime'] + try: + body = await request.json() + session_id = request.match_info['session_id'] + body['session_id'] = session_id + spec = BoxSpec.model_validate(body) + result = await runtime.execute(spec) + return web.json_response(_result_to_dict(result)) + except pydantic.ValidationError as exc: + return web.json_response( + {'error': {'code': 'validation_error', 'message': str(exc)}}, + status=400, + ) + except BoxError as exc: + return _error_response(exc) + + +async def handle_create_session(request: web.Request) -> web.Response: + runtime: BoxRuntime = request.app['runtime'] + try: + body = await request.json() + session_id = request.match_info['session_id'] + body['session_id'] = session_id + body.setdefault('cmd', '__langbot_session_placeholder__') + spec = BoxSpec.model_validate(body) + session_info = await runtime.create_session(spec) + return web.json_response(session_info, status=201) + except pydantic.ValidationError as exc: + return web.json_response( + {'error': {'code': 'validation_error', 'message': str(exc)}}, + status=400, + ) + except BoxError as exc: + return _error_response(exc) + + +async def handle_get_sessions(request: web.Request) -> web.Response: + runtime: BoxRuntime = request.app['runtime'] + try: + return web.json_response(runtime.get_sessions()) + except BoxError as exc: + return _error_response(exc) + + +async def handle_delete_session(request: web.Request) -> web.Response: + runtime: BoxRuntime = request.app['runtime'] + session_id = request.match_info['session_id'] + try: + await runtime.delete_session(session_id) + return web.json_response({'deleted': session_id}) + except BoxError as exc: + return _error_response(exc) + + +async def handle_status(request: web.Request) -> web.Response: + runtime: BoxRuntime = request.app['runtime'] + try: + status = await runtime.get_status() + return web.json_response(status) + except BoxError as exc: + return _error_response(exc) + + +async def handle_health(request: web.Request) -> web.Response: + runtime: BoxRuntime = request.app['runtime'] + try: + info = await runtime.get_backend_info() + return web.json_response(info) + except BoxError as exc: + return _error_response(exc) + + +def create_app(runtime: BoxRuntime | None = None) -> web.Application: + """Create the aiohttp Application with all routes. + + If *runtime* is ``None`` a new ``BoxRuntime`` is created using the module + logger. + """ + if runtime is None: + runtime = BoxRuntime(logger=logger) + + app = web.Application() + app['runtime'] = runtime + + app.router.add_post('/v1/sessions/{session_id}/exec', handle_exec) + app.router.add_post('/v1/sessions/{session_id}', handle_create_session) + app.router.add_get('/v1/sessions', handle_get_sessions) + app.router.add_delete('/v1/sessions/{session_id}', handle_delete_session) + app.router.add_get('/v1/status', handle_status) + app.router.add_get('/v1/health', handle_health) + + async def on_startup(_app: web.Application) -> None: + await _app['runtime'].initialize() + + async def on_shutdown(_app: web.Application) -> None: + await _app['runtime'].shutdown() + + app.on_startup.append(on_startup) + app.on_shutdown.append(on_shutdown) + + return app + + +def main() -> None: + parser = argparse.ArgumentParser(description='LangBot Box Runtime HTTP Service') + parser.add_argument('--host', default='0.0.0.0', help='Bind address') + parser.add_argument('--port', type=int, default=5410, help='Bind port') + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO) + app = create_app() + web.run_app(app, host=args.host, port=args.port) + + +if __name__ == '__main__': + main() diff --git a/src/langbot/pkg/box/service.py b/src/langbot/pkg/box/service.py index 3a486165..b7dc412c 100644 --- a/src/langbot/pkg/box/service.py +++ b/src/langbot/pkg/box/service.py @@ -9,9 +9,10 @@ from typing import TYPE_CHECKING import pydantic +from .client import BoxRuntimeClient +from .connector import BoxRuntimeConnector from .errors import BoxError, BoxValidationError from .models import BUILTIN_PROFILES, BoxExecutionResult, BoxProfile, BoxSpec -from .runtime import BoxRuntime _INT_ADAPTER = pydantic.TypeAdapter(int) _UTC = _dt.timezone.utc @@ -26,19 +27,28 @@ class BoxService: def __init__( self, ap: 'core_app.Application', - runtime: BoxRuntime | None = None, + client: BoxRuntimeClient | None = None, output_limit_chars: int = 4000, ): self.ap = ap - self.runtime = runtime or BoxRuntime(logger=ap.logger) + self._runtime_connector: BoxRuntimeConnector | None = None + if client is None: + self._runtime_connector = BoxRuntimeConnector(ap) + client = self._runtime_connector.client + self.client = client self.output_limit_chars = output_limit_chars self.allowed_host_mount_roots = self._load_allowed_host_mount_roots() self.default_host_workspace = self._load_default_host_workspace() self.profile = self._load_profile() self._recent_errors: collections.deque[dict] = collections.deque(maxlen=_MAX_RECENT_ERRORS) + self._shutdown_task = None async def initialize(self): - await self.runtime.initialize() + self._ensure_default_host_workspace() + if self._runtime_connector is not None: + await self._runtime_connector.initialize() + return + await self.client.initialize() async def execute_sandbox_tool(self, parameters: dict, query: 'pipeline_query.Query') -> dict: spec_payload = dict(parameters) @@ -64,7 +74,7 @@ class BoxService: f'spec={json.dumps(self._summarize_spec(spec), ensure_ascii=False)}' ) try: - result = await self.runtime.execute(spec) + result = await self.client.execute(spec) except BoxError as exc: self._record_error(exc, query) raise @@ -76,7 +86,21 @@ class BoxService: return self._serialize_result(result) async def shutdown(self): - await self.runtime.shutdown() + await self.client.shutdown() + + def dispose(self): + if self._runtime_connector is not None: + self._runtime_connector.dispose() + loop = getattr(self.ap, 'event_loop', None) + if ( + loop is not None + and not loop.is_closed() + and (self._shutdown_task is None or self._shutdown_task.done()) + ): + self._shutdown_task = loop.create_task(self.shutdown()) + + async def get_sessions(self) -> list[dict]: + return await self.client.get_sessions() def _serialize_result(self, result: BoxExecutionResult) -> dict: stdout, stdout_truncated = self._truncate(result.stdout) @@ -186,6 +210,29 @@ class BoxService: return None return os.path.realpath(os.path.abspath(default_host_workspace)) + def _ensure_default_host_workspace(self): + if self.default_host_workspace is None: + return + + if os.path.isdir(self.default_host_workspace): + return + + if os.path.exists(self.default_host_workspace): + raise BoxValidationError('default_host_workspace must point to a directory on the host') + + if not self.allowed_host_mount_roots: + raise BoxValidationError( + 'default_host_workspace cannot be created because no allowed_host_mount_roots are configured' + ) + + for allowed_root in self.allowed_host_mount_roots: + if self.default_host_workspace == allowed_root or self.default_host_workspace.startswith(f'{allowed_root}{os.sep}'): + os.makedirs(self.default_host_workspace, exist_ok=True) + return + + allowed_roots = ', '.join(self.allowed_host_mount_roots) + raise BoxValidationError(f'default_host_workspace is outside allowed_host_mount_roots: {allowed_roots}') + def _validate_host_mount(self, spec: BoxSpec): if spec.host_path is None: return @@ -255,7 +302,7 @@ class BoxService: return list(self._recent_errors) async def get_status(self) -> dict: - runtime_status = await self.runtime.get_status() + runtime_status = await self.client.get_status() return { **runtime_status, 'profile': self.profile.name, diff --git a/src/langbot/pkg/core/app.py b/src/langbot/pkg/core/app.py index dbde2a46..f40ecd9e 100644 --- a/src/langbot/pkg/core/app.py +++ b/src/langbot/pkg/core/app.py @@ -235,7 +235,10 @@ class Application: self.logger.debug(f'Traceback: {traceback.format_exc()}') def dispose(self): - self.plugin_connector.dispose() + if self.plugin_connector is not None: + self.plugin_connector.dispose() + if self.box_service is not None: + self.box_service.dispose() async def print_web_access_info(self): """Print access webui tips""" diff --git a/src/langbot/pkg/core/boot.py b/src/langbot/pkg/core/boot.py index 11a2d5e2..e1aaa652 100644 --- a/src/langbot/pkg/core/boot.py +++ b/src/langbot/pkg/core/boot.py @@ -46,6 +46,7 @@ async def make_app(loop: asyncio.AbstractEventLoop) -> app.Application: async def main(loop: asyncio.AbstractEventLoop): + app_inst: app.Application | None = None try: # Hang system signal processing import signal @@ -60,4 +61,6 @@ async def main(loop: asyncio.AbstractEventLoop): app_inst = await make_app(loop) await app_inst.run() except Exception: + if app_inst is not None: + app_inst.dispose() traceback.print_exc() diff --git a/src/langbot/pkg/core/stages/build_app.py b/src/langbot/pkg/core/stages/build_app.py index 36f050d7..b4a58db3 100644 --- a/src/langbot/pkg/core/stages/build_app.py +++ b/src/langbot/pkg/core/stages/build_app.py @@ -6,9 +6,9 @@ from .. import stage, app from ...utils import version, proxy from ...pipeline import pool, controller, pipelinemgr from ...pipeline import aggregator as message_aggregator +from ...box import service as box_service from ...plugin import connector as plugin_connector from ...command import cmdmgr -from ...box import service as box_service from ...provider.session import sessionmgr as llm_session_mgr from ...provider.modelmgr import modelmgr as llm_model_mgr from ...provider.tools import toolmgr as llm_tool_mgr diff --git a/src/langbot/pkg/pipeline/process/handlers/chat.py b/src/langbot/pkg/pipeline/process/handlers/chat.py index 87f8d8ce..db05b0d3 100644 --- a/src/langbot/pkg/pipeline/process/handlers/chat.py +++ b/src/langbot/pkg/pipeline/process/handlers/chat.py @@ -17,12 +17,19 @@ from ....provider import runners import langbot_plugin.api.entities.builtin.provider.session as provider_session import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query import langbot_plugin.api.entities.builtin.provider.message as provider_message +from .. import logging_utils importutil.import_modules_in_pkg(runners) class ChatMessageHandler(handler.MessageHandler): + def _format_result_log( + self, + result: provider_message.Message | provider_message.MessageChunk, + ) -> str | None: + return logging_utils.format_result_log(result, self.cut_str) + async def handle( self, query: pipeline_query.Query, @@ -113,9 +120,11 @@ class ChatMessageHandler(handler.MessageHandler): # This prevents memory overflow from thousands of log entries per conversation # First chunk uses INFO level to confirm connection establishment if chunk_count == 1: - self.ap.logger.info( - f'Conversation({query.query_id}) Streaming started: {self.cut_str(result.readable_str())}' - ) + summary = self._format_result_log(result) + if summary is not None: + self.ap.logger.info(f'Conversation({query.query_id}) Streaming started: {summary}') + else: + self.ap.logger.info(f'Conversation({query.query_id}) Streaming started') elif chunk_count % 10 == 0: self.ap.logger.debug( f'Conversation({query.query_id}) Streaming chunk {chunk_count}: {self.cut_str(result.readable_str())}' @@ -135,9 +144,9 @@ class ChatMessageHandler(handler.MessageHandler): async for result in runner.run(query): query.resp_messages.append(result) - self.ap.logger.info( - f'Conversation({query.query_id}) Response: {self.cut_str(result.readable_str())}' - ) + summary = self._format_result_log(result) + if summary is not None: + self.ap.logger.info(f'Conversation({query.query_id}) Response: {summary}') if result.content is not None: text_length += len(result.content) diff --git a/src/langbot/pkg/pipeline/process/logging_utils.py b/src/langbot/pkg/pipeline/process/logging_utils.py new file mode 100644 index 00000000..78a289e8 --- /dev/null +++ b/src/langbot/pkg/pipeline/process/logging_utils.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import json +import typing + +import langbot_plugin.api.entities.builtin.provider.message as provider_message + + +def format_result_log( + result: provider_message.Message | provider_message.MessageChunk, + cut_str: typing.Callable[[str], str], +) -> str | None: + if result.tool_calls: + tool_names = [tc.function.name for tc in result.tool_calls if tc.function and tc.function.name] + if tool_names: + return f'{result.role}: requested tools: {", ".join(tool_names)}' + return f'{result.role}: requested tool calls' + + content = result.content + if isinstance(content, str): + if not content.strip(): + return None + + if result.role == 'tool': + if content.startswith('err:'): + return f'tool error: {cut_str(content)}' + + try: + payload = json.loads(content) + except json.JSONDecodeError: + return cut_str(result.readable_str()) + + if isinstance(payload, dict): + status = payload.get('status', 'unknown') + exit_code = payload.get('exit_code') + backend = payload.get('backend', '') + stdout = str(payload.get('stdout', '')).strip() + summary = f'tool result: status={status}' + if exit_code is not None: + summary += f' exit_code={exit_code}' + if backend: + summary += f' backend={backend}' + if stdout: + summary += f' stdout={cut_str(stdout)}' + return summary + + return cut_str(result.readable_str()) + + if isinstance(content, list) and len(content) == 0: + return None + + return cut_str(result.readable_str()) diff --git a/src/langbot/pkg/provider/runners/localagent.py b/src/langbot/pkg/provider/runners/localagent.py index 03b28a18..fe9e1d3a 100644 --- a/src/langbot/pkg/provider/runners/localagent.py +++ b/src/langbot/pkg/provider/runners/localagent.py @@ -410,7 +410,15 @@ class LocalAgentRunner(runner.RequestRunner): req_messages.append(msg) except Exception as e: - err_msg = provider_message.Message(role='tool', content=f'err: {e}', tool_call_id=tool_call.id) + if is_stream: + err_msg = provider_message.MessageChunk( + role='tool', + content=f'err: {e}', + tool_call_id=tool_call.id, + is_final=True, + ) + else: + err_msg = provider_message.Message(role='tool', content=f'err: {e}', tool_call_id=tool_call.id) yield err_msg diff --git a/src/langbot/templates/config.yaml b/src/langbot/templates/config.yaml index efee6d3c..1213eec6 100644 --- a/src/langbot/templates/config.yaml +++ b/src/langbot/templates/config.yaml @@ -89,8 +89,9 @@ monitoring: check_interval_hours: 1 box: profile: 'default' - default_host_workspace: './data/box-workspaces/default' - allowed_host_mount_roots: + runtime_url: '' # Leave empty to use defaults: http://127.0.0.1:5410 locally, http://langbot_box_runtime:5410 in Docker + default_host_workspace: './data/box-workspaces/default' # For Docker deployment, use '/workspaces/default' + allowed_host_mount_roots: # For Docker deployment, use '/workspaces' instead - './data/box-workspaces' - '/tmp' space: diff --git a/tests/unit_tests/box/test_box_connector.py b/tests/unit_tests/box/test_box_connector.py new file mode 100644 index 00000000..8b741bed --- /dev/null +++ b/tests/unit_tests/box/test_box_connector.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock, Mock + +import pytest + +from langbot.pkg.box.client import RemoteBoxRuntimeClient +from langbot.pkg.box.connector import BoxRuntimeConnector +from langbot.pkg.box.errors import BoxRuntimeUnavailableError + + +def make_app(logger: Mock, runtime_url: str = ''): + return SimpleNamespace( + logger=logger, + instance_config=SimpleNamespace( + data={ + 'box': { + 'runtime_url': runtime_url, + 'profile': 'default', + 'allowed_host_mount_roots': [], + 'default_host_workspace': '', + } + } + ), + ) + + +def patch_platform(monkeypatch: pytest.MonkeyPatch, value: str): + monkeypatch.setattr('langbot.pkg.box.client.platform.get_platform', lambda: value) + monkeypatch.setattr('langbot.pkg.box.connector.platform.get_platform', lambda: value) + + +def test_box_runtime_connector_uses_explicit_runtime_url(): + logger = Mock() + connector = BoxRuntimeConnector(make_app(logger, runtime_url='http://box-runtime:5410')) + + assert connector.runtime_url == 'http://box-runtime:5410' + assert connector.manages_local_runtime is False + assert isinstance(connector.client, RemoteBoxRuntimeClient) + assert connector.client._base_url == 'http://box-runtime:5410' + + +def test_box_runtime_connector_uses_local_default_runtime_url(monkeypatch: pytest.MonkeyPatch): + patch_platform(monkeypatch, 'linux') + + connector = BoxRuntimeConnector(make_app(Mock())) + + assert connector.runtime_url == 'http://127.0.0.1:5410' + assert connector.manages_local_runtime is True + assert connector.client._base_url == 'http://127.0.0.1:5410' + + +def test_box_runtime_connector_uses_docker_default_runtime_url(monkeypatch: pytest.MonkeyPatch): + patch_platform(monkeypatch, 'docker') + + connector = BoxRuntimeConnector(make_app(Mock())) + + assert connector.runtime_url == 'http://langbot_box_runtime:5410' + assert connector.manages_local_runtime is False + assert connector.client._base_url == 'http://langbot_box_runtime:5410' + + +@pytest.mark.asyncio +async def test_box_runtime_connector_initialize_delegates_to_client_when_runtime_is_healthy( + monkeypatch: pytest.MonkeyPatch, +): + patch_platform(monkeypatch, 'linux') + connector = BoxRuntimeConnector(make_app(Mock())) + connector.client.initialize = AsyncMock() + connector._start_local_runtime_process = AsyncMock() + connector._wait_until_runtime_ready = AsyncMock() + + await connector.initialize() + + connector.client.initialize.assert_awaited_once() + connector._start_local_runtime_process.assert_not_awaited() + connector._wait_until_runtime_ready.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_box_runtime_connector_initialize_autostarts_local_runtime_when_unavailable( + monkeypatch: pytest.MonkeyPatch, +): + patch_platform(monkeypatch, 'linux') + connector = BoxRuntimeConnector(make_app(Mock())) + connector.client.initialize = AsyncMock(side_effect=BoxRuntimeUnavailableError('down')) + connector._start_local_runtime_process = AsyncMock() + connector._wait_until_runtime_ready = AsyncMock() + + await connector.initialize() + + connector.client.initialize.assert_awaited_once() + connector._start_local_runtime_process.assert_awaited_once() + connector._wait_until_runtime_ready.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_box_runtime_connector_initialize_remote_runtime_does_not_autostart(): + connector = BoxRuntimeConnector(make_app(Mock(), runtime_url='http://box-runtime:5410')) + connector.client.initialize = AsyncMock() + connector._start_local_runtime_process = AsyncMock() + connector._wait_until_runtime_ready = AsyncMock() + + await connector.initialize() + + connector.client.initialize.assert_awaited_once() + connector._start_local_runtime_process.assert_not_awaited() + connector._wait_until_runtime_ready.assert_not_awaited() + + +def test_box_runtime_connector_dispose_terminates_local_runtime_process(): + logger = Mock() + connector = BoxRuntimeConnector(make_app(logger)) + runtime_process = Mock() + runtime_process.returncode = None + runtime_task = Mock() + connector.runtime_subprocess = runtime_process + connector.runtime_subprocess_task = runtime_task + + connector.dispose() + + runtime_process.terminate.assert_called_once() + runtime_task.cancel.assert_called_once() + assert connector.runtime_subprocess_task is None diff --git a/tests/unit_tests/box/test_box_service.py b/tests/unit_tests/box/test_box_service.py index d8ccf815..bc43f345 100644 --- a/tests/unit_tests/box/test_box_service.py +++ b/tests/unit_tests/box/test_box_service.py @@ -1,16 +1,19 @@ from __future__ import annotations +import asyncio import datetime as dt import os +import socket from types import SimpleNamespace -from unittest.mock import Mock +from unittest.mock import AsyncMock, Mock import pytest import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query from langbot.pkg.box.backend import BaseSandboxBackend -from langbot.pkg.box.errors import BoxBackendUnavailableError, BoxSessionConflictError, BoxValidationError +from langbot.pkg.box.client import LocalBoxRuntimeClient, RemoteBoxRuntimeClient +from langbot.pkg.box.errors import BoxBackendUnavailableError, BoxSessionConflictError, BoxSessionNotFoundError, BoxValidationError from langbot.pkg.box.models import ( BUILTIN_PROFILES, BoxExecutionResult, @@ -27,6 +30,21 @@ from langbot.pkg.box.service import BoxService _UTC = dt.timezone.utc +def _can_open_test_socket() -> bool: + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + except OSError: + return False + sock.close() + return True + + +requires_socket = pytest.mark.skipif( + not _can_open_test_socket(), + reason='local test environment does not permit opening TCP sockets', +) + + class FakeBackend(BaseSandboxBackend): def __init__(self, logger: Mock, available: bool = True): super().__init__(logger) @@ -95,6 +113,68 @@ def make_app(logger: Mock, allowed_host_mount_roots: list[str] | None = None, pr ) +@pytest.mark.asyncio +async def test_box_service_without_explicit_client_initializes_internal_connector(monkeypatch: pytest.MonkeyPatch): + connector = Mock() + connector.client = Mock() + connector.initialize = AsyncMock() + + monkeypatch.setattr('langbot.pkg.box.service.BoxRuntimeConnector', Mock(return_value=connector)) + + service = BoxService(make_app(Mock())) + await service.initialize() + + assert service.client is connector.client + connector.initialize.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_box_service_get_sessions_delegates_to_client(): + client = Mock() + client.get_sessions = AsyncMock(return_value=[{'session_id': 'test-session'}]) + + service = BoxService(make_app(Mock()), client=client) + + sessions = await service.get_sessions() + + assert sessions == [{'session_id': 'test-session'}] + client.get_sessions.assert_awaited_once() + + +def test_box_service_dispose_delegates_to_internal_connector(monkeypatch: pytest.MonkeyPatch): + connector = Mock() + connector.client = Mock() + + monkeypatch.setattr('langbot.pkg.box.service.BoxRuntimeConnector', Mock(return_value=connector)) + + service = BoxService(make_app(Mock())) + service.dispose() + + connector.dispose.assert_called_once() + + +@pytest.mark.asyncio +async def test_box_service_dispose_schedules_shutdown_on_event_loop(monkeypatch: pytest.MonkeyPatch): + connector = Mock() + connector.client = Mock() + connector.dispose = Mock() + + monkeypatch.setattr('langbot.pkg.box.service.BoxRuntimeConnector', Mock(return_value=connector)) + + app = make_app(Mock()) + loop = asyncio.get_running_loop() + app.event_loop = loop + + service = BoxService(app) + service.shutdown = AsyncMock() + + service.dispose() + await asyncio.sleep(0) + + connector.dispose.assert_called_once() + service.shutdown.assert_awaited_once() + + @pytest.mark.asyncio async def test_box_runtime_reuses_request_session(): logger = Mock() @@ -117,7 +197,7 @@ async def test_box_service_defaults_session_id_from_query(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() result = await service.execute_sandbox_tool({'cmd': 'pwd', 'network': BoxNetworkMode.OFF.value}, make_query(7)) @@ -132,7 +212,7 @@ async def test_box_service_fails_closed_when_backend_unavailable(): logger = Mock() backend = FakeBackend(logger, available=False) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() with pytest.raises(BoxBackendUnavailableError): @@ -146,7 +226,7 @@ async def test_box_service_allows_host_mount_under_configured_root(tmp_path): runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) host_dir = tmp_path / 'mounted-workspace' host_dir.mkdir() - service = BoxService(make_app(logger, [str(tmp_path)]), runtime=runtime) + service = BoxService(make_app(logger, [str(tmp_path)]), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() result = await service.execute_sandbox_tool( @@ -171,7 +251,7 @@ async def test_box_service_uses_default_host_workspace_when_host_path_omitted(tm host_dir.mkdir() app = make_app(logger, [str(tmp_path)]) app.instance_config.data['box']['default_host_workspace'] = str(host_dir) - service = BoxService(app, runtime=runtime) + service = BoxService(app, client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() result = await service.execute_sandbox_tool({'cmd': 'pwd'}, make_query(15)) @@ -182,6 +262,23 @@ async def test_box_service_uses_default_host_workspace_when_host_path_omitted(tm assert backend.start_specs[0].host_path == os.path.realpath(host_dir) +@pytest.mark.asyncio +async def test_box_service_creates_default_host_workspace_on_initialize(tmp_path): + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + allowed_root = tmp_path / 'allowed-root' + allowed_root.mkdir() + default_host_workspace = allowed_root / 'default-workspace' + app = make_app(logger, [str(allowed_root)]) + app.instance_config.data['box']['default_host_workspace'] = str(default_host_workspace) + service = BoxService(app, client=LocalBoxRuntimeClient(logger, runtime)) + + await service.initialize() + + assert default_host_workspace.is_dir() + + @pytest.mark.asyncio async def test_box_service_rejects_host_mount_outside_allowed_roots(tmp_path): logger = Mock() @@ -191,7 +288,7 @@ async def test_box_service_rejects_host_mount_outside_allowed_roots(tmp_path): disallowed_root = tmp_path / 'disallowed' allowed_root.mkdir() disallowed_root.mkdir() - service = BoxService(make_app(logger, [str(allowed_root)]), runtime=runtime) + service = BoxService(make_app(logger, [str(allowed_root)]), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() with pytest.raises(BoxValidationError): @@ -282,7 +379,7 @@ async def test_truncate_short_output_unchanged(): logger = Mock() backend = FakeBackendWithOutput(logger, stdout='hello world') runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime, output_limit_chars=100) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime), output_limit_chars=100) await service.initialize() result = await service.execute_sandbox_tool({'cmd': 'echo hello'}, make_query(20)) @@ -303,7 +400,7 @@ async def test_truncate_preserves_head_and_tail(): backend = FakeBackendWithOutput(logger, stdout=big_output) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) limit = 100 - service = BoxService(make_app(logger), runtime=runtime, output_limit_chars=limit) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime), output_limit_chars=limit) await service.initialize() result = await service.execute_sandbox_tool({'cmd': 'cat big'}, make_query(21)) @@ -325,7 +422,7 @@ async def test_truncate_at_exact_limit_not_truncated(): exact_output = 'a' * 200 backend = FakeBackendWithOutput(logger, stdout=exact_output) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime, output_limit_chars=200) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime), output_limit_chars=200) await service.initialize() result = await service.execute_sandbox_tool({'cmd': 'echo a'}, make_query(22)) @@ -339,7 +436,7 @@ async def test_truncate_stderr_independently(): logger = Mock() backend = FakeBackendWithOutput(logger, stdout='short', stderr='E' * 300) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime, output_limit_chars=100) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime), output_limit_chars=100) await service.initialize() result = await service.execute_sandbox_tool({'cmd': 'fail'}, make_query(23)) @@ -359,7 +456,7 @@ async def test_profile_default_provides_defaults(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() result = await service.execute_sandbox_tool({'cmd': 'echo hi'}, make_query(30)) @@ -377,7 +474,7 @@ async def test_profile_unlocked_field_can_be_overridden(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() result = await service.execute_sandbox_tool( @@ -397,7 +494,7 @@ async def test_profile_locked_field_cannot_be_overridden(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger, profile='offline_readonly'), runtime=runtime) + service = BoxService(make_app(logger, profile='offline_readonly'), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() result = await service.execute_sandbox_tool( @@ -417,7 +514,7 @@ async def test_profile_timeout_clamped_to_max(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() result = await service.execute_sandbox_tool( @@ -437,7 +534,7 @@ async def test_profile_timeout_clamped_for_coercible_inputs(timeout_value): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() await service.execute_sandbox_tool( @@ -452,8 +549,9 @@ async def test_profile_timeout_clamped_for_coercible_inputs(timeout_value): def test_unknown_profile_raises_error(): """Config referencing a non-existent profile name raises immediately.""" logger = Mock() + runtime = BoxRuntime(logger=logger, backends=[FakeBackend(logger)], session_ttl_sec=300) with pytest.raises(BoxValidationError, match='unknown box profile'): - BoxService(make_app(logger, profile='nonexistent')) + BoxService(make_app(logger, profile='nonexistent'), client=LocalBoxRuntimeClient(logger, runtime)) def test_builtin_profiles_are_consistent(): @@ -488,7 +586,7 @@ async def test_profile_default_applies_resource_limits(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() await service.execute_sandbox_tool({'cmd': 'echo hi'}, make_query(40)) @@ -507,7 +605,7 @@ async def test_profile_offline_readonly_locks_read_only_rootfs(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger, profile='offline_readonly'), runtime=runtime) + service = BoxService(make_app(logger, profile='offline_readonly'), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() await service.execute_sandbox_tool( @@ -525,7 +623,7 @@ async def test_profile_network_extended_has_relaxed_limits(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger, profile='network_extended'), runtime=runtime) + service = BoxService(make_app(logger, profile='network_extended'), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() await service.execute_sandbox_tool({'cmd': 'echo hi'}, make_query(42)) @@ -600,7 +698,7 @@ async def test_service_records_errors_on_failure(): logger = Mock() backend = FakeBackend(logger, available=False) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() with pytest.raises(Exception): @@ -618,7 +716,7 @@ async def test_service_error_ring_buffer_capped(): logger = Mock() backend = FakeBackend(logger, available=False) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() for i in range(60): @@ -637,7 +735,7 @@ async def test_service_get_status_aggregates_runtime_and_profile(): logger = Mock() backend = FakeBackend(logger) runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) - service = BoxService(make_app(logger), runtime=runtime) + service = BoxService(make_app(logger), client=LocalBoxRuntimeClient(logger, runtime)) await service.initialize() status = await service.get_status() @@ -646,3 +744,419 @@ async def test_service_get_status_aggregates_runtime_and_profile(): assert status['backend']['available'] is True assert status['active_sessions'] == 0 assert status['recent_error_count'] == 0 + + +# ── RemoteBoxRuntimeClient tests ───────────────────────────────────── + + +@requires_socket +@pytest.mark.asyncio +async def test_remote_client_execute(): + """RemoteBoxRuntimeClient correctly posts to server and parses result.""" + from aiohttp.test_utils import TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + await server.start_server() + try: + client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) + await client.initialize() + + spec = BoxSpec.model_validate({'cmd': 'echo remote', 'session_id': 'r-1'}) + result = await client.execute(spec) + + assert result.session_id == 'r-1' + assert result.status == BoxExecutionStatus.COMPLETED + assert result.exit_code == 0 + assert result.stdout == 'executed: echo remote' + await client.shutdown() + finally: + await server.close() + + +@requires_socket +@pytest.mark.asyncio +async def test_remote_client_get_sessions(): + from aiohttp.test_utils import TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + await server.start_server() + try: + client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) + + spec = BoxSpec.model_validate({'cmd': 'echo hi', 'session_id': 'r-2'}) + await client.execute(spec) + + sessions = await client.get_sessions() + assert len(sessions) == 1 + assert sessions[0]['session_id'] == 'r-2' + await client.shutdown() + finally: + await server.close() + + +@requires_socket +@pytest.mark.asyncio +async def test_remote_client_get_status(): + from aiohttp.test_utils import TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + await server.start_server() + try: + client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) + status = await client.get_status() + + assert 'backend' in status + assert 'active_sessions' in status + await client.shutdown() + finally: + await server.close() + + +@requires_socket +@pytest.mark.asyncio +async def test_remote_client_get_backend_info(): + from aiohttp.test_utils import TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + await server.start_server() + try: + client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) + info = await client.get_backend_info() + + assert info['name'] == 'fake' + assert info['available'] is True + await client.shutdown() + finally: + await server.close() + + +# ── Server endpoint tests ──────────────────────────────────────────── + + +@requires_socket +@pytest.mark.asyncio +async def test_server_delete_session(): + from aiohttp.test_utils import TestClient, TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + test_client = TestClient(server) + await test_client.start_server() + try: + # Create a session via exec + resp = await test_client.post('/v1/sessions/del-1/exec', json={'cmd': 'echo hi'}) + assert resp.status == 200 + + # Delete it + resp = await test_client.delete('/v1/sessions/del-1') + assert resp.status == 200 + data = await resp.json() + assert data['deleted'] == 'del-1' + + # Verify session is gone + resp = await test_client.get('/v1/sessions') + sessions = await resp.json() + assert len(sessions) == 0 + finally: + await test_client.close() + + +# ── Runtime delete_session / create_session tests ──────────────────── + + +@pytest.mark.asyncio +async def test_runtime_delete_session(): + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + await runtime.initialize() + + await runtime.execute(BoxSpec.model_validate({'cmd': 'echo', 'session_id': 'del-test'})) + assert len(runtime.get_sessions()) == 1 + + await runtime.delete_session('del-test') + assert len(runtime.get_sessions()) == 0 + assert backend.stop_calls == ['del-test'] + + +@pytest.mark.asyncio +async def test_runtime_delete_session_not_found(): + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + await runtime.initialize() + + with pytest.raises(BoxSessionNotFoundError): + await runtime.delete_session('nonexistent') + + +@pytest.mark.asyncio +async def test_runtime_create_session(): + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + await runtime.initialize() + + spec = BoxSpec.model_validate({'cmd': 'placeholder', 'session_id': 'create-1'}) + info = await runtime.create_session(spec) + assert info['session_id'] == 'create-1' + assert info['backend_name'] == 'fake' + + sessions = runtime.get_sessions() + assert len(sessions) == 1 + assert sessions[0]['session_id'] == 'create-1' + + +# ── Server structured error tests ──────────────────────────────────── + + +@requires_socket +@pytest.mark.asyncio +async def test_server_delete_nonexistent_session(): + from aiohttp.test_utils import TestClient, TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + test_client = TestClient(server) + await test_client.start_server() + try: + resp = await test_client.delete('/v1/sessions/nonexistent') + assert resp.status == 404 + data = await resp.json() + assert data['error']['code'] == 'session_not_found' + finally: + await test_client.close() + + +@requires_socket +@pytest.mark.asyncio +async def test_server_exec_returns_structured_error_on_conflict(): + from aiohttp.test_utils import TestClient, TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + test_client = TestClient(server) + await test_client.start_server() + try: + # Create session with network=off + resp = await test_client.post('/v1/sessions/conflict-1/exec', json={'cmd': 'echo hi', 'network': 'off'}) + assert resp.status == 200 + + # Try to use same session with network=on -> conflict + resp = await test_client.post('/v1/sessions/conflict-1/exec', json={'cmd': 'echo hi', 'network': 'on'}) + assert resp.status == 409 + data = await resp.json() + assert data['error']['code'] == 'session_conflict' + finally: + await test_client.close() + + +@requires_socket +@pytest.mark.asyncio +async def test_server_create_session(): + from aiohttp.test_utils import TestClient, TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + test_client = TestClient(server) + await test_client.start_server() + try: + resp = await test_client.post('/v1/sessions/new-1', json={'image': 'python:3.11-slim'}) + assert resp.status == 201 + data = await resp.json() + assert data['session_id'] == 'new-1' + assert data['backend_name'] == 'fake' + assert 'created_at' in data + + # Session should appear in list + resp = await test_client.get('/v1/sessions') + sessions = await resp.json() + assert len(sessions) == 1 + assert sessions[0]['session_id'] == 'new-1' + finally: + await test_client.close() + + +@requires_socket +@pytest.mark.asyncio +async def test_server_create_session_conflict(): + from aiohttp.test_utils import TestClient, TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + test_client = TestClient(server) + await test_client.start_server() + try: + resp = await test_client.post('/v1/sessions/dup-1', json={'network': 'off'}) + assert resp.status == 201 + + # Conflicting create with different network + resp = await test_client.post('/v1/sessions/dup-1', json={'network': 'on'}) + assert resp.status == 409 + data = await resp.json() + assert data['error']['code'] == 'session_conflict' + finally: + await test_client.close() + + +# ── Remote client error translation tests ───────────────────────────── + + +@requires_socket +@pytest.mark.asyncio +async def test_remote_client_delete_session(): + from aiohttp.test_utils import TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + await server.start_server() + try: + client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) + + # Create session via exec + spec = BoxSpec.model_validate({'cmd': 'echo hi', 'session_id': 'r-del-1'}) + await client.execute(spec) + + # Delete it + await client.delete_session('r-del-1') + + # Verify empty + sessions = await client.get_sessions() + assert len(sessions) == 0 + await client.shutdown() + finally: + await server.close() + + +@requires_socket +@pytest.mark.asyncio +async def test_remote_client_delete_session_raises_not_found(): + from aiohttp.test_utils import TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + await server.start_server() + try: + client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) + + with pytest.raises(BoxSessionNotFoundError): + await client.delete_session('nonexistent') + await client.shutdown() + finally: + await server.close() + + +@requires_socket +@pytest.mark.asyncio +async def test_remote_client_create_session(): + from aiohttp.test_utils import TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + await server.start_server() + try: + client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) + + spec = BoxSpec.model_validate({'cmd': 'placeholder', 'session_id': 'r-create-1'}) + info = await client.create_session(spec) + assert info['session_id'] == 'r-create-1' + assert info['backend_name'] == 'fake' + + sessions = await client.get_sessions() + assert len(sessions) == 1 + await client.shutdown() + finally: + await server.close() + + +@requires_socket +@pytest.mark.asyncio +async def test_remote_client_exec_raises_conflict_error(): + from aiohttp.test_utils import TestServer + + from langbot.pkg.box.server import create_app as create_server_app + + logger = Mock() + backend = FakeBackend(logger) + runtime = BoxRuntime(logger=logger, backends=[backend], session_ttl_sec=300) + app = create_server_app(runtime) + server = TestServer(app) + await server.start_server() + try: + client = RemoteBoxRuntimeClient(base_url=str(server.make_url('')), logger=logger) + + # Create session with network=off + spec1 = BoxSpec.model_validate({'cmd': 'echo first', 'session_id': 'r-conflict-1', 'network': 'off'}) + await client.execute(spec1) + + # Conflicting exec with network=on + spec2 = BoxSpec.model_validate({'cmd': 'echo second', 'session_id': 'r-conflict-1', 'network': 'on'}) + with pytest.raises(BoxSessionConflictError): + await client.execute(spec2) + await client.shutdown() + finally: + await server.close() diff --git a/tests/unit_tests/pipeline/test_chat_handler_logging.py b/tests/unit_tests/pipeline/test_chat_handler_logging.py new file mode 100644 index 00000000..9886160e --- /dev/null +++ b/tests/unit_tests/pipeline/test_chat_handler_logging.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import langbot_plugin.api.entities.builtin.provider.message as provider_message + +from langbot.pkg.pipeline.process.logging_utils import format_result_log + + +def cut_str(s: str) -> str: + s0 = s.split('\n')[0] + if len(s0) > 20 or '\n' in s: + s0 = s0[:20] + '...' + return s0 + + +def test_chat_handler_formats_tool_call_request_log(): + result = provider_message.Message( + role='assistant', + content='', + tool_calls=[ + provider_message.ToolCall( + id='call-1', + type='function', + function=provider_message.FunctionCall(name='sandbox_exec', arguments='{}'), + ) + ], + ) + + summary = format_result_log(result, cut_str) + + assert summary == 'assistant: requested tools: sandbox_exec' + + +def test_chat_handler_formats_tool_result_log(): + result = provider_message.Message( + role='tool', + content='{"status":"completed","exit_code":0,"backend":"podman","stdout":"42\\n"}', + tool_call_id='call-1', + ) + + summary = format_result_log(result, cut_str) + + assert summary == 'tool result: status=completed exit_code=0 backend=podman stdout=42' + + +def test_chat_handler_formats_tool_error_log(): + result = provider_message.MessageChunk( + role='tool', + content='err: host_path must point to an existing directory on the host', + tool_call_id='call-1', + is_final=True, + ) + + summary = format_result_log(result, cut_str) + + assert summary is not None + assert summary.startswith('tool error: err: host_path must') + assert summary.endswith('...') + + +def test_chat_handler_skips_empty_assistant_log(): + result = provider_message.Message(role='assistant', content='') + + summary = format_result_log(result, cut_str) + + assert summary is None diff --git a/tests/unit_tests/provider/test_localagent_sandbox_exec.py b/tests/unit_tests/provider/test_localagent_sandbox_exec.py index eb013748..bd3ce358 100644 --- a/tests/unit_tests/provider/test_localagent_sandbox_exec.py +++ b/tests/unit_tests/provider/test_localagent_sandbox_exec.py @@ -58,6 +58,46 @@ class RecordingProvider: ) +class RecordingStreamProvider: + def __init__(self): + self.stream_requests: list[dict] = [] + + def invoke_llm_stream(self, query, model, messages, funcs, extra_args=None, remove_think=None): + self.stream_requests.append( + { + 'messages': list(messages), + 'funcs': list(funcs), + 'remove_think': remove_think, + } + ) + + async def _stream(): + if len(self.stream_requests) == 1: + yield provider_message.MessageChunk( + role='assistant', + tool_calls=[ + provider_message.ToolCall( + id='call-1', + type='function', + function=provider_message.FunctionCall( + name='sandbox_exec', + arguments=json.dumps({'cmd': "python -c 'print(1)'"}), + ), + ) + ], + is_final=True, + ) + return + + yield provider_message.MessageChunk( + role='assistant', + content='Tool execution failed.', + is_final=True, + ) + + return _stream() + + def make_query() -> pipeline_query.Query: adapter = AsyncMock() adapter.is_stream_output_supported = AsyncMock(return_value=False) @@ -156,3 +196,38 @@ async def test_localagent_uses_sandbox_exec_for_exact_calculation(): for message in first_request['messages'] ) assert [tool.name for tool in first_request['funcs']] == ['sandbox_exec'] + + +@pytest.mark.asyncio +async def test_localagent_streaming_tool_error_yields_message_chunks(): + provider = RecordingStreamProvider() + model = SimpleNamespace( + provider=provider, + model_entity=SimpleNamespace( + uuid='test-model-uuid', + name='test-model', + abilities=['func_call'], + extra_args={}, + ), + ) + + adapter = AsyncMock() + adapter.is_stream_output_supported = AsyncMock(return_value=True) + + query = make_query() + query.adapter = adapter + + app = SimpleNamespace( + logger=Mock(), + model_mgr=SimpleNamespace(get_model_by_uuid=AsyncMock(return_value=model)), + tool_mgr=SimpleNamespace(execute_func_call=AsyncMock(side_effect=RuntimeError('boom'))), + rag_mgr=SimpleNamespace(), + instance_config=SimpleNamespace(data={'box': {'default_host_workspace': '/home/yhh/workspace/box-demo'}}), + ) + + runner = LocalAgentRunner(app, pipeline_config={}) + + results = [message async for message in runner.run(query)] + + assert all(isinstance(message, provider_message.MessageChunk) for message in results) + assert any(message.role == 'tool' and message.content == 'err: boom' for message in results)