From 29eadcb5ab72c20bc45aaf5fcb2ce32bf189e08e Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Sat, 18 Apr 2026 23:27:41 +0800 Subject: [PATCH] feat(box): add heartbeat and reconnection for Box runtime connector Add 20-second heartbeat ping loop to detect silent Box runtime disconnections. On disconnect, set available=false and attempt reconnection after 3 seconds via the disconnect callback chain. - BoxRuntimeConnector: heartbeat loop, disconnect callback parameter, disconnect detection in connection callback and WS failure handler - BoxService: wire disconnect callback to toggle available state and re-initialize the connector on reconnection --- src/langbot/pkg/box/connector.py | 70 +++++++++++++++++++++++++------- src/langbot/pkg/box/service.py | 15 ++++++- 2 files changed, 70 insertions(+), 15 deletions(-) diff --git a/src/langbot/pkg/box/connector.py b/src/langbot/pkg/box/connector.py index 6784309b..cec1762a 100644 --- a/src/langbot/pkg/box/connector.py +++ b/src/langbot/pkg/box/connector.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio import os import sys +import typing from typing import TYPE_CHECKING from urllib.parse import urlparse @@ -24,6 +25,8 @@ if TYPE_CHECKING: _DOCKER_BOX_HOST = 'langbot_box' _DEFAULT_PORT = 5410 +_HEARTBEAT_INTERVAL_SEC = 20 + def _get_box_config(ap) -> dict: """Return the 'box' section from instance config, with safe fallbacks.""" @@ -61,8 +64,16 @@ class BoxRuntimeConnector(ManagedRuntimeConnector): 3. Unix / macOS -> subprocess + stdio pipe """ - def __init__(self, ap: core_app.Application): + def __init__( + self, + ap: core_app.Application, + runtime_disconnect_callback: typing.Callable[ + ['BoxRuntimeConnector'], typing.Coroutine[typing.Any, typing.Any, None] + ] + | None = None, + ): super().__init__(ap) + self.runtime_disconnect_callback = runtime_disconnect_callback self.configured_runtime_url = self._load_configured_runtime_url() self.ws_relay_base_url = resolve_box_ws_relay_url(ap) self.client = ActionRPCBoxClient(logger=ap.logger) @@ -70,6 +81,7 @@ class BoxRuntimeConnector(ManagedRuntimeConnector): self._handler: Handler | None = None self._handler_task: asyncio.Task | None = None self._ctrl_task: asyncio.Task | None = None + self._heartbeat_task: asyncio.Task | None = None # Parse the relay URL once for reuse. parsed = urlparse(self.ws_relay_base_url) @@ -93,16 +105,33 @@ class BoxRuntimeConnector(ManagedRuntimeConnector): async def initialize(self) -> None: if self._uses_websocket(): if platform.get_platform() == 'win32' and not self.configured_runtime_url: - # Windows without an explicit URL: launch the box server as a - # detached subprocess (no stdio pipe) then talk to it via WS, - # because Windows ProactorEventLoop does not support async - # subprocess stdin/stdout pipes. await self._start_subprocess_then_ws() else: await self._connect_remote_ws() else: await self._start_local_stdio() + # Start heartbeat after successful connection + if self._heartbeat_task is None: + self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + + # -- heartbeat ----------------------------------------------------------- + + async def _heartbeat_loop(self) -> None: + """Periodically ping the Box runtime to detect silent disconnections.""" + while True: + await asyncio.sleep(_HEARTBEAT_INTERVAL_SEC) + try: + await self.ping() + self.ap.logger.debug('Heartbeat to Box runtime success.') + except Exception as e: + self.ap.logger.debug(f'Failed to heartbeat to Box runtime: {e}') + + async def ping(self) -> None: + if self._handler is None: + raise BoxRuntimeUnavailableError('Box runtime is not connected') + await self._handler.call_action(CommonAction.PING, {}) + # -- transport paths ----------------------------------------------------- async def _start_local_stdio(self) -> None: @@ -125,7 +154,6 @@ class BoxRuntimeConnector(ManagedRuntimeConnector): ctrl.run(self._make_connection_callback('stdio', connected, connect_error)) ) - # Wait for connection or failure try: await asyncio.wait_for(connected.wait(), timeout=30.0) except asyncio.TimeoutError: @@ -134,14 +162,12 @@ class BoxRuntimeConnector(ManagedRuntimeConnector): if connect_error: raise BoxRuntimeUnavailableError(f'box runtime connection failed: {connect_error[0]}') - # Store subprocess reference for dispose (StdioClientController manages it) self._subprocess = ctrl.process async def _start_subprocess_then_ws(self) -> None: """Launch box server as detached subprocess, then connect via WS (Windows).""" self.ap.logger.info('(windows) Use cmd to launch box runtime and communicate via ws') - # Launch the box server subprocess in ws mode (no stdio pipe). await self._start_runtime_subprocess( '-m', 'langbot_plugin.box.server', @@ -151,7 +177,6 @@ class BoxRuntimeConnector(ManagedRuntimeConnector): str(self._relay_port), ) - # Wait for the WS endpoint to become reachable, then connect. ws_url = f'ws://localhost:{self._relay_port}/rpc/ws' await self._connect_ws(ws_url, '(windows) WebSocket') @@ -167,11 +192,6 @@ class BoxRuntimeConnector(ManagedRuntimeConnector): """Determine the action-RPC WebSocket URL. All endpoints share a single port; action RPC is at ``/rpc/ws``. - - Priority: - 1. Explicit ``box.runtime_url`` from config (user-supplied, used as-is) - 2. Docker environment -> ``ws://langbot_box:5410/rpc/ws`` - 3. --standalone-box / Windows fallback -> ``ws://localhost:5410/rpc/ws`` """ if self.configured_runtime_url: return self.configured_runtime_url @@ -189,8 +209,14 @@ class BoxRuntimeConnector(ManagedRuntimeConnector): connect_error: list[Exception] = [] async def on_connect_failed(ctrl, exc): + if exc is not None: + self.ap.logger.error(f'Failed to connect to Box runtime ({ws_url}): {exc}') + else: + self.ap.logger.error(f'Failed to connect to Box runtime ({ws_url}), trying to reconnect...') connect_error.append(exc or BoxRuntimeUnavailableError('ws connection failed')) connected.set() + if self.runtime_disconnect_callback is not None: + await self.runtime_disconnect_callback(self) ctrl = WebSocketClientController(ws_url=ws_url, make_connection_failed_callback=on_connect_failed) self._ctrl_task = asyncio.create_task( @@ -225,12 +251,28 @@ class BoxRuntimeConnector(ManagedRuntimeConnector): if not connected.is_set(): connect_error.append(exc) connected.set() + else: + # Connection was established but then dropped — this is a + # disconnect after successful handshake. + if self._uses_websocket(): + self.ap.logger.error('Disconnected from Box runtime, trying to reconnect...') + if self.runtime_disconnect_callback is not None: + await self.runtime_disconnect_callback(self) + else: + self.ap.logger.error( + 'Disconnected from Box runtime via stdio. ' + 'Cannot automatically reconnect — please restart LangBot.' + ) return new_connection_callback # -- lifecycle ----------------------------------------------------------- def dispose(self) -> None: + if self._heartbeat_task is not None: + self._heartbeat_task.cancel() + self._heartbeat_task = None + if self._handler_task is not None: self._handler_task.cancel() self._handler_task = None diff --git a/src/langbot/pkg/box/service.py b/src/langbot/pkg/box/service.py index cf41d789..6c3bb74e 100644 --- a/src/langbot/pkg/box/service.py +++ b/src/langbot/pkg/box/service.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import collections import datetime as _dt import enum @@ -52,7 +53,7 @@ class BoxService: self.ap = ap self._runtime_connector: BoxRuntimeConnector | None = None if client is None: - self._runtime_connector = BoxRuntimeConnector(ap) + self._runtime_connector = BoxRuntimeConnector(ap, runtime_disconnect_callback=self._on_runtime_disconnect) client = self._runtime_connector.client self.client = client self.output_limit_chars = output_limit_chars @@ -82,6 +83,18 @@ class BoxService: self.ap.logger.warning(f'LangBot Box runtime unavailable, sandbox features disabled: {exc}') self._available = False + async def _on_runtime_disconnect(self, connector: BoxRuntimeConnector) -> None: + """Called by the connector when the Box runtime connection drops.""" + self._available = False + self.ap.logger.warning('Box runtime disconnected, sandbox features temporarily disabled. Reconnecting in 3s...') + await asyncio.sleep(3) + try: + await connector.initialize() + self._available = True + self.ap.logger.info('Box runtime reconnected, sandbox features restored.') + except Exception as exc: + self.ap.logger.warning(f'Box runtime reconnection failed: {exc}. Will retry on next heartbeat disconnect.') + @property def available(self) -> bool: return self._available