refactor(box): unify box service lifecycle and local runtime

management
This commit is contained in:
youhuanghe
2026-03-20 11:15:18 +00:00
committed by WangCham
parent 15c03fe96b
commit eaae31edd0
19 changed files with 1506 additions and 61 deletions

View File

@@ -13,7 +13,7 @@ class BoxRouterGroup(group.RouterGroup):
@self.route('/sessions', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def _() -> str:
sessions = self.ap.box_service.runtime.get_sessions()
sessions = await self.ap.box_service.get_sessions()
return self.success(data=sessions)
@self.route('/errors', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)

View File

@@ -0,0 +1,232 @@
"""BoxRuntimeClient abstraction for local and remote Box Runtime access."""
from __future__ import annotations
import abc
import logging
from typing import TYPE_CHECKING
import aiohttp
from .errors import (
BoxBackendUnavailableError,
BoxError,
BoxRuntimeUnavailableError,
BoxSessionConflictError,
BoxSessionNotFoundError,
BoxValidationError,
)
from .models import BoxExecutionResult, BoxExecutionStatus, BoxSpec
from .runtime import BoxRuntime
from ..utils import platform
if TYPE_CHECKING:
from ..core import app as core_app
_ERROR_CODE_MAP: dict[str, type[BoxError]] = {
'validation_error': BoxValidationError,
'session_not_found': BoxSessionNotFoundError,
'session_conflict': BoxSessionConflictError,
'backend_unavailable': BoxBackendUnavailableError,
'runtime_unavailable': BoxRuntimeUnavailableError,
'internal_error': BoxError,
}
def resolve_box_runtime_url(ap: 'core_app.Application') -> str:
box_config = getattr(ap, 'instance_config', None)
box_config_data = getattr(box_config, 'data', {}) if box_config is not None else {}
runtime_url = str(box_config_data.get('box', {}).get('runtime_url', '')).strip()
if runtime_url:
return runtime_url
if platform.get_platform() == 'docker':
return 'http://langbot_box_runtime:5410'
return 'http://127.0.0.1:5410'
def create_box_runtime_client(
ap: 'core_app.Application',
runtime_url: str | None = None,
) -> 'RemoteBoxRuntimeClient':
return RemoteBoxRuntimeClient(
base_url=runtime_url or resolve_box_runtime_url(ap),
logger=ap.logger,
)
class BoxRuntimeClient(abc.ABC):
"""Abstract interface that BoxService uses to talk to a Box Runtime."""
@abc.abstractmethod
async def initialize(self) -> None: ...
@abc.abstractmethod
async def execute(self, spec: BoxSpec) -> BoxExecutionResult: ...
@abc.abstractmethod
async def shutdown(self) -> None: ...
@abc.abstractmethod
async def get_status(self) -> dict: ...
@abc.abstractmethod
async def get_sessions(self) -> list[dict]: ...
@abc.abstractmethod
async def get_backend_info(self) -> dict: ...
@abc.abstractmethod
async def delete_session(self, session_id: str) -> None: ...
@abc.abstractmethod
async def create_session(self, spec: BoxSpec) -> dict: ...
class LocalBoxRuntimeClient(BoxRuntimeClient):
"""In-process client that wraps a real BoxRuntime directly."""
def __init__(self, logger: logging.Logger, runtime: BoxRuntime | None = None):
self._runtime = runtime or BoxRuntime(logger=logger)
@property
def runtime(self) -> BoxRuntime:
return self._runtime
async def initialize(self) -> None:
await self._runtime.initialize()
async def execute(self, spec: BoxSpec) -> BoxExecutionResult:
return await self._runtime.execute(spec)
async def shutdown(self) -> None:
await self._runtime.shutdown()
async def get_status(self) -> dict:
return await self._runtime.get_status()
async def get_sessions(self) -> list[dict]:
return self._runtime.get_sessions()
async def get_backend_info(self) -> dict:
return await self._runtime.get_backend_info()
async def delete_session(self, session_id: str) -> None:
await self._runtime.delete_session(session_id)
async def create_session(self, spec: BoxSpec) -> dict:
return await self._runtime.create_session(spec)
class RemoteBoxRuntimeClient(BoxRuntimeClient):
"""HTTP client that talks to a standalone Box Runtime service."""
def __init__(self, base_url: str, logger: logging.Logger):
self._base_url = base_url.rstrip('/')
self._logger = logger
self._session: aiohttp.ClientSession | None = None
def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
return self._session
async def _check_response(self, resp: aiohttp.ClientResponse) -> None:
if resp.status < 400:
return
try:
body = await resp.json()
error_info = body.get('error', {})
code = error_info.get('code', '')
message = error_info.get('message', '')
except Exception:
resp.raise_for_status()
return
exc_class = _ERROR_CODE_MAP.get(code, BoxError)
raise exc_class(message)
async def initialize(self) -> None:
session = self._get_session()
try:
async with session.get(f'{self._base_url}/v1/health') as resp:
await self._check_response(resp)
self._logger.info(f'LangBot Box runtime connected: {self._base_url}')
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
async def execute(self, spec: BoxSpec) -> BoxExecutionResult:
session = self._get_session()
payload = spec.model_dump(mode='json')
try:
async with session.post(
f'{self._base_url}/v1/sessions/{spec.session_id}/exec',
json=payload,
) as resp:
await self._check_response(resp)
data = await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
return BoxExecutionResult(
session_id=data['session_id'],
backend_name=data['backend_name'],
status=BoxExecutionStatus(data['status']),
exit_code=data.get('exit_code'),
stdout=data.get('stdout', ''),
stderr=data.get('stderr', ''),
duration_ms=data['duration_ms'],
)
async def shutdown(self) -> None:
if self._session and not self._session.closed:
await self._session.close()
self._session = None
async def get_status(self) -> dict:
session = self._get_session()
try:
async with session.get(f'{self._base_url}/v1/status') as resp:
await self._check_response(resp)
return await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
async def get_sessions(self) -> list[dict]:
session = self._get_session()
try:
async with session.get(f'{self._base_url}/v1/sessions') as resp:
await self._check_response(resp)
return await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
async def get_backend_info(self) -> dict:
session = self._get_session()
try:
async with session.get(f'{self._base_url}/v1/health') as resp:
await self._check_response(resp)
return await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
async def delete_session(self, session_id: str) -> None:
session = self._get_session()
try:
async with session.delete(
f'{self._base_url}/v1/sessions/{session_id}',
) as resp:
await self._check_response(resp)
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc
async def create_session(self, spec: BoxSpec) -> dict:
session = self._get_session()
payload = spec.model_dump(mode='json')
try:
async with session.post(
f'{self._base_url}/v1/sessions/{spec.session_id}',
json=payload,
) as resp:
await self._check_response(resp)
return await resp.json()
except aiohttp.ClientError as exc:
raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc

View File

@@ -0,0 +1,96 @@
from __future__ import annotations
import asyncio
import os
import sys
from typing import TYPE_CHECKING
from .errors import BoxRuntimeUnavailableError
from .client import create_box_runtime_client, resolve_box_runtime_url
from ..utils import platform
if TYPE_CHECKING:
from ..core import app as core_app
class BoxRuntimeConnector:
"""Build and initialize the Box runtime-facing service for the app."""
_HEALTH_CHECK_RETRY_COUNT = 40
_HEALTH_CHECK_RETRY_INTERVAL_SEC = 0.25
def __init__(self, ap: 'core_app.Application'):
self.ap = ap
self.configured_runtime_url = self._load_configured_runtime_url()
self.runtime_url = self.configured_runtime_url or resolve_box_runtime_url(ap)
self.manages_local_runtime = self._should_manage_local_runtime()
self.client = create_box_runtime_client(ap, runtime_url=self.runtime_url)
self.runtime_subprocess: asyncio.subprocess.Process | None = None
self.runtime_subprocess_task: asyncio.Task | None = None
async def initialize(self) -> None:
if not self.manages_local_runtime:
await self.client.initialize()
return
try:
await self.client.initialize()
return
except BoxRuntimeUnavailableError:
self.ap.logger.info(
'Local Box runtime is not running, starting an embedded Box runtime server...'
)
await self._start_local_runtime_process()
await self._wait_until_runtime_ready()
def dispose(self) -> None:
if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is None:
self.ap.logger.info('Terminating local Box runtime process...')
self.runtime_subprocess.terminate()
if self.runtime_subprocess_task is not None:
self.runtime_subprocess_task.cancel()
self.runtime_subprocess_task = None
def _load_configured_runtime_url(self) -> str:
box_config = getattr(self.ap, 'instance_config', None)
box_config_data = getattr(box_config, 'data', {}) if box_config is not None else {}
return str(box_config_data.get('box', {}).get('runtime_url', '')).strip()
def _should_manage_local_runtime(self) -> bool:
return not self.configured_runtime_url and platform.get_platform() != 'docker'
async def _start_local_runtime_process(self) -> None:
if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is None:
return
python_path = sys.executable
env = os.environ.copy()
self.runtime_subprocess = await asyncio.create_subprocess_exec(
python_path,
'-m',
'langbot.pkg.box.server',
env=env,
)
self.runtime_subprocess_task = asyncio.create_task(self.runtime_subprocess.wait())
async def _wait_until_runtime_ready(self) -> None:
last_exc: BoxRuntimeUnavailableError | None = None
for _ in range(self._HEALTH_CHECK_RETRY_COUNT):
if self.runtime_subprocess is not None and self.runtime_subprocess.returncode is not None:
raise BoxRuntimeUnavailableError(
f'local box runtime exited before becoming ready (code {self.runtime_subprocess.returncode})'
)
try:
await self.client.initialize()
self.ap.logger.info(f'Local Box runtime is ready at {self.runtime_url}.')
return
except BoxRuntimeUnavailableError as exc:
last_exc = exc
await asyncio.sleep(self._HEALTH_CHECK_RETRY_INTERVAL_SEC)
if last_exc is not None:
raise last_exc
raise BoxRuntimeUnavailableError('local box runtime did not become ready')

View File

@@ -13,5 +13,13 @@ class BoxBackendUnavailableError(BoxError):
"""Raised when no supported container backend is available."""
class BoxRuntimeUnavailableError(BoxError):
"""Raised when the standalone Box Runtime service is unavailable."""
class BoxSessionConflictError(BoxError):
"""Raised when an existing session cannot satisfy a new request."""
class BoxSessionNotFoundError(BoxError):
"""Raised when a referenced session does not exist."""

View File

@@ -6,7 +6,7 @@ import datetime as dt
import logging
from .backend import BaseSandboxBackend, DockerBackend, PodmanBackend
from .errors import BoxBackendUnavailableError, BoxSessionConflictError
from .errors import BoxBackendUnavailableError, BoxSessionConflictError, BoxSessionNotFoundError
from .models import BoxExecutionResult, BoxExecutionStatus, BoxSessionInfo, BoxSpec
_UTC = dt.timezone.utc
@@ -65,6 +65,16 @@ class BoxRuntime:
for session_id in session_ids:
await self._drop_session_locked(session_id)
async def create_session(self, spec: BoxSpec) -> dict:
session = await self._get_or_create_session(spec)
return self._session_to_dict(session.info)
async def delete_session(self, session_id: str) -> None:
async with self._lock:
if session_id not in self._sessions:
raise BoxSessionNotFoundError(f'session {session_id} not found')
await self._drop_session_locked(session_id)
# ── Observability ─────────────────────────────────────────────────
async def get_backend_info(self) -> dict:
@@ -78,24 +88,7 @@ class BoxRuntime:
return {'name': backend.name, 'available': available}
def get_sessions(self) -> list[dict]:
return [
{
'session_id': s.info.session_id,
'backend_name': s.info.backend_name,
'backend_session_id': s.info.backend_session_id,
'image': s.info.image,
'network': s.info.network.value,
'host_path': s.info.host_path,
'host_path_mode': s.info.host_path_mode.value,
'cpus': s.info.cpus,
'memory_mb': s.info.memory_mb,
'pids_limit': s.info.pids_limit,
'read_only_rootfs': s.info.read_only_rootfs,
'created_at': s.info.created_at.isoformat(),
'last_used_at': s.info.last_used_at.isoformat(),
}
for s in self._sessions.values()
]
return [self._session_to_dict(s.info) for s in self._sessions.values()]
async def get_status(self) -> dict:
backend_info = await self.get_backend_info()
@@ -222,3 +215,21 @@ class BoxRuntime:
raise BoxSessionConflictError(
f'sandbox_exec session {spec.session_id} already exists with read_only_rootfs={session.read_only_rootfs}'
)
@staticmethod
def _session_to_dict(info: BoxSessionInfo) -> dict:
return {
'session_id': info.session_id,
'backend_name': info.backend_name,
'backend_session_id': info.backend_session_id,
'image': info.image,
'network': info.network.value,
'host_path': info.host_path,
'host_path_mode': info.host_path_mode.value,
'cpus': info.cpus,
'memory_mb': info.memory_mb,
'pids_limit': info.pids_limit,
'read_only_rootfs': info.read_only_rootfs,
'created_at': info.created_at.isoformat(),
'last_used_at': info.last_used_at.isoformat(),
}

View File

@@ -0,0 +1,176 @@
"""Standalone HTTP service exposing BoxRuntime as a REST API.
Usage:
python -m langbot.pkg.box.server [--host 0.0.0.0] [--port 5410]
"""
from __future__ import annotations
import argparse
import logging
import pydantic
from aiohttp import web
from .errors import (
BoxBackendUnavailableError,
BoxError,
BoxSessionConflictError,
BoxSessionNotFoundError,
BoxValidationError,
)
from .models import BoxExecutionResult, BoxSpec
from .runtime import BoxRuntime
logger = logging.getLogger('langbot.box.server')
_ERROR_MAP: dict[type, tuple[int, str]] = {
BoxValidationError: (400, 'validation_error'),
BoxSessionNotFoundError: (404, 'session_not_found'),
BoxSessionConflictError: (409, 'session_conflict'),
BoxBackendUnavailableError: (503, 'backend_unavailable'),
}
def _error_response(exc: Exception) -> web.Response:
for exc_type, (status, code) in _ERROR_MAP.items():
if isinstance(exc, exc_type):
return web.json_response(
{'error': {'code': code, 'message': str(exc)}},
status=status,
)
return web.json_response(
{'error': {'code': 'internal_error', 'message': str(exc)}},
status=500,
)
def _result_to_dict(result: BoxExecutionResult) -> dict:
return {
'session_id': result.session_id,
'backend_name': result.backend_name,
'status': result.status.value,
'exit_code': result.exit_code,
'stdout': result.stdout,
'stderr': result.stderr,
'duration_ms': result.duration_ms,
}
async def handle_exec(request: web.Request) -> web.Response:
runtime: BoxRuntime = request.app['runtime']
try:
body = await request.json()
session_id = request.match_info['session_id']
body['session_id'] = session_id
spec = BoxSpec.model_validate(body)
result = await runtime.execute(spec)
return web.json_response(_result_to_dict(result))
except pydantic.ValidationError as exc:
return web.json_response(
{'error': {'code': 'validation_error', 'message': str(exc)}},
status=400,
)
except BoxError as exc:
return _error_response(exc)
async def handle_create_session(request: web.Request) -> web.Response:
runtime: BoxRuntime = request.app['runtime']
try:
body = await request.json()
session_id = request.match_info['session_id']
body['session_id'] = session_id
body.setdefault('cmd', '__langbot_session_placeholder__')
spec = BoxSpec.model_validate(body)
session_info = await runtime.create_session(spec)
return web.json_response(session_info, status=201)
except pydantic.ValidationError as exc:
return web.json_response(
{'error': {'code': 'validation_error', 'message': str(exc)}},
status=400,
)
except BoxError as exc:
return _error_response(exc)
async def handle_get_sessions(request: web.Request) -> web.Response:
runtime: BoxRuntime = request.app['runtime']
try:
return web.json_response(runtime.get_sessions())
except BoxError as exc:
return _error_response(exc)
async def handle_delete_session(request: web.Request) -> web.Response:
runtime: BoxRuntime = request.app['runtime']
session_id = request.match_info['session_id']
try:
await runtime.delete_session(session_id)
return web.json_response({'deleted': session_id})
except BoxError as exc:
return _error_response(exc)
async def handle_status(request: web.Request) -> web.Response:
runtime: BoxRuntime = request.app['runtime']
try:
status = await runtime.get_status()
return web.json_response(status)
except BoxError as exc:
return _error_response(exc)
async def handle_health(request: web.Request) -> web.Response:
runtime: BoxRuntime = request.app['runtime']
try:
info = await runtime.get_backend_info()
return web.json_response(info)
except BoxError as exc:
return _error_response(exc)
def create_app(runtime: BoxRuntime | None = None) -> web.Application:
"""Create the aiohttp Application with all routes.
If *runtime* is ``None`` a new ``BoxRuntime`` is created using the module
logger.
"""
if runtime is None:
runtime = BoxRuntime(logger=logger)
app = web.Application()
app['runtime'] = runtime
app.router.add_post('/v1/sessions/{session_id}/exec', handle_exec)
app.router.add_post('/v1/sessions/{session_id}', handle_create_session)
app.router.add_get('/v1/sessions', handle_get_sessions)
app.router.add_delete('/v1/sessions/{session_id}', handle_delete_session)
app.router.add_get('/v1/status', handle_status)
app.router.add_get('/v1/health', handle_health)
async def on_startup(_app: web.Application) -> None:
await _app['runtime'].initialize()
async def on_shutdown(_app: web.Application) -> None:
await _app['runtime'].shutdown()
app.on_startup.append(on_startup)
app.on_shutdown.append(on_shutdown)
return app
def main() -> None:
parser = argparse.ArgumentParser(description='LangBot Box Runtime HTTP Service')
parser.add_argument('--host', default='0.0.0.0', help='Bind address')
parser.add_argument('--port', type=int, default=5410, help='Bind port')
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
app = create_app()
web.run_app(app, host=args.host, port=args.port)
if __name__ == '__main__':
main()

View File

@@ -9,9 +9,10 @@ from typing import TYPE_CHECKING
import pydantic
from .client import BoxRuntimeClient
from .connector import BoxRuntimeConnector
from .errors import BoxError, BoxValidationError
from .models import BUILTIN_PROFILES, BoxExecutionResult, BoxProfile, BoxSpec
from .runtime import BoxRuntime
_INT_ADAPTER = pydantic.TypeAdapter(int)
_UTC = _dt.timezone.utc
@@ -26,19 +27,28 @@ class BoxService:
def __init__(
self,
ap: 'core_app.Application',
runtime: BoxRuntime | None = None,
client: BoxRuntimeClient | None = None,
output_limit_chars: int = 4000,
):
self.ap = ap
self.runtime = runtime or BoxRuntime(logger=ap.logger)
self._runtime_connector: BoxRuntimeConnector | None = None
if client is None:
self._runtime_connector = BoxRuntimeConnector(ap)
client = self._runtime_connector.client
self.client = client
self.output_limit_chars = output_limit_chars
self.allowed_host_mount_roots = self._load_allowed_host_mount_roots()
self.default_host_workspace = self._load_default_host_workspace()
self.profile = self._load_profile()
self._recent_errors: collections.deque[dict] = collections.deque(maxlen=_MAX_RECENT_ERRORS)
self._shutdown_task = None
async def initialize(self):
await self.runtime.initialize()
self._ensure_default_host_workspace()
if self._runtime_connector is not None:
await self._runtime_connector.initialize()
return
await self.client.initialize()
async def execute_sandbox_tool(self, parameters: dict, query: 'pipeline_query.Query') -> dict:
spec_payload = dict(parameters)
@@ -64,7 +74,7 @@ class BoxService:
f'spec={json.dumps(self._summarize_spec(spec), ensure_ascii=False)}'
)
try:
result = await self.runtime.execute(spec)
result = await self.client.execute(spec)
except BoxError as exc:
self._record_error(exc, query)
raise
@@ -76,7 +86,21 @@ class BoxService:
return self._serialize_result(result)
async def shutdown(self):
await self.runtime.shutdown()
await self.client.shutdown()
def dispose(self):
if self._runtime_connector is not None:
self._runtime_connector.dispose()
loop = getattr(self.ap, 'event_loop', None)
if (
loop is not None
and not loop.is_closed()
and (self._shutdown_task is None or self._shutdown_task.done())
):
self._shutdown_task = loop.create_task(self.shutdown())
async def get_sessions(self) -> list[dict]:
return await self.client.get_sessions()
def _serialize_result(self, result: BoxExecutionResult) -> dict:
stdout, stdout_truncated = self._truncate(result.stdout)
@@ -186,6 +210,29 @@ class BoxService:
return None
return os.path.realpath(os.path.abspath(default_host_workspace))
def _ensure_default_host_workspace(self):
if self.default_host_workspace is None:
return
if os.path.isdir(self.default_host_workspace):
return
if os.path.exists(self.default_host_workspace):
raise BoxValidationError('default_host_workspace must point to a directory on the host')
if not self.allowed_host_mount_roots:
raise BoxValidationError(
'default_host_workspace cannot be created because no allowed_host_mount_roots are configured'
)
for allowed_root in self.allowed_host_mount_roots:
if self.default_host_workspace == allowed_root or self.default_host_workspace.startswith(f'{allowed_root}{os.sep}'):
os.makedirs(self.default_host_workspace, exist_ok=True)
return
allowed_roots = ', '.join(self.allowed_host_mount_roots)
raise BoxValidationError(f'default_host_workspace is outside allowed_host_mount_roots: {allowed_roots}')
def _validate_host_mount(self, spec: BoxSpec):
if spec.host_path is None:
return
@@ -255,7 +302,7 @@ class BoxService:
return list(self._recent_errors)
async def get_status(self) -> dict:
runtime_status = await self.runtime.get_status()
runtime_status = await self.client.get_status()
return {
**runtime_status,
'profile': self.profile.name,

View File

@@ -235,7 +235,10 @@ class Application:
self.logger.debug(f'Traceback: {traceback.format_exc()}')
def dispose(self):
self.plugin_connector.dispose()
if self.plugin_connector is not None:
self.plugin_connector.dispose()
if self.box_service is not None:
self.box_service.dispose()
async def print_web_access_info(self):
"""Print access webui tips"""

View File

@@ -46,6 +46,7 @@ async def make_app(loop: asyncio.AbstractEventLoop) -> app.Application:
async def main(loop: asyncio.AbstractEventLoop):
app_inst: app.Application | None = None
try:
# Hang system signal processing
import signal
@@ -60,4 +61,6 @@ async def main(loop: asyncio.AbstractEventLoop):
app_inst = await make_app(loop)
await app_inst.run()
except Exception:
if app_inst is not None:
app_inst.dispose()
traceback.print_exc()

View File

@@ -6,9 +6,9 @@ from .. import stage, app
from ...utils import version, proxy
from ...pipeline import pool, controller, pipelinemgr
from ...pipeline import aggregator as message_aggregator
from ...box import service as box_service
from ...plugin import connector as plugin_connector
from ...command import cmdmgr
from ...box import service as box_service
from ...provider.session import sessionmgr as llm_session_mgr
from ...provider.modelmgr import modelmgr as llm_model_mgr
from ...provider.tools import toolmgr as llm_tool_mgr

View File

@@ -17,12 +17,19 @@ from ....provider import runners
import langbot_plugin.api.entities.builtin.provider.session as provider_session
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from .. import logging_utils
importutil.import_modules_in_pkg(runners)
class ChatMessageHandler(handler.MessageHandler):
def _format_result_log(
self,
result: provider_message.Message | provider_message.MessageChunk,
) -> str | None:
return logging_utils.format_result_log(result, self.cut_str)
async def handle(
self,
query: pipeline_query.Query,
@@ -113,9 +120,11 @@ class ChatMessageHandler(handler.MessageHandler):
# This prevents memory overflow from thousands of log entries per conversation
# First chunk uses INFO level to confirm connection establishment
if chunk_count == 1:
self.ap.logger.info(
f'Conversation({query.query_id}) Streaming started: {self.cut_str(result.readable_str())}'
)
summary = self._format_result_log(result)
if summary is not None:
self.ap.logger.info(f'Conversation({query.query_id}) Streaming started: {summary}')
else:
self.ap.logger.info(f'Conversation({query.query_id}) Streaming started')
elif chunk_count % 10 == 0:
self.ap.logger.debug(
f'Conversation({query.query_id}) Streaming chunk {chunk_count}: {self.cut_str(result.readable_str())}'
@@ -135,9 +144,9 @@ class ChatMessageHandler(handler.MessageHandler):
async for result in runner.run(query):
query.resp_messages.append(result)
self.ap.logger.info(
f'Conversation({query.query_id}) Response: {self.cut_str(result.readable_str())}'
)
summary = self._format_result_log(result)
if summary is not None:
self.ap.logger.info(f'Conversation({query.query_id}) Response: {summary}')
if result.content is not None:
text_length += len(result.content)

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
import json
import typing
import langbot_plugin.api.entities.builtin.provider.message as provider_message
def format_result_log(
result: provider_message.Message | provider_message.MessageChunk,
cut_str: typing.Callable[[str], str],
) -> str | None:
if result.tool_calls:
tool_names = [tc.function.name for tc in result.tool_calls if tc.function and tc.function.name]
if tool_names:
return f'{result.role}: requested tools: {", ".join(tool_names)}'
return f'{result.role}: requested tool calls'
content = result.content
if isinstance(content, str):
if not content.strip():
return None
if result.role == 'tool':
if content.startswith('err:'):
return f'tool error: {cut_str(content)}'
try:
payload = json.loads(content)
except json.JSONDecodeError:
return cut_str(result.readable_str())
if isinstance(payload, dict):
status = payload.get('status', 'unknown')
exit_code = payload.get('exit_code')
backend = payload.get('backend', '')
stdout = str(payload.get('stdout', '')).strip()
summary = f'tool result: status={status}'
if exit_code is not None:
summary += f' exit_code={exit_code}'
if backend:
summary += f' backend={backend}'
if stdout:
summary += f' stdout={cut_str(stdout)}'
return summary
return cut_str(result.readable_str())
if isinstance(content, list) and len(content) == 0:
return None
return cut_str(result.readable_str())

View File

@@ -410,7 +410,15 @@ class LocalAgentRunner(runner.RequestRunner):
req_messages.append(msg)
except Exception as e:
err_msg = provider_message.Message(role='tool', content=f'err: {e}', tool_call_id=tool_call.id)
if is_stream:
err_msg = provider_message.MessageChunk(
role='tool',
content=f'err: {e}',
tool_call_id=tool_call.id,
is_final=True,
)
else:
err_msg = provider_message.Message(role='tool', content=f'err: {e}', tool_call_id=tool_call.id)
yield err_msg

View File

@@ -89,8 +89,9 @@ monitoring:
check_interval_hours: 1
box:
profile: 'default'
default_host_workspace: './data/box-workspaces/default'
allowed_host_mount_roots:
runtime_url: '' # Leave empty to use defaults: http://127.0.0.1:5410 locally, http://langbot_box_runtime:5410 in Docker
default_host_workspace: './data/box-workspaces/default' # For Docker deployment, use '/workspaces/default'
allowed_host_mount_roots: # For Docker deployment, use '/workspaces' instead
- './data/box-workspaces'
- '/tmp'
space: