feat(box): add heartbeat and reconnection for Box runtime connector

Add 20-second heartbeat ping loop to detect silent Box runtime
disconnections. On disconnect, set available=false and attempt
reconnection after 3 seconds via the disconnect callback chain.

- BoxRuntimeConnector: heartbeat loop, disconnect callback parameter,
  disconnect detection in connection callback and WS failure handler
- BoxService: wire disconnect callback to toggle available state and
  re-initialize the connector on reconnection
This commit is contained in:
Junyan Qin
2026-04-18 23:27:41 +08:00
committed by WangCham
parent 5a4ec62b14
commit 29eadcb5ab
2 changed files with 70 additions and 15 deletions
+56 -14
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio import asyncio
import os import os
import sys import sys
import typing
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from urllib.parse import urlparse from urllib.parse import urlparse
@@ -24,6 +25,8 @@ if TYPE_CHECKING:
_DOCKER_BOX_HOST = 'langbot_box' _DOCKER_BOX_HOST = 'langbot_box'
_DEFAULT_PORT = 5410 _DEFAULT_PORT = 5410
_HEARTBEAT_INTERVAL_SEC = 20
def _get_box_config(ap) -> dict: def _get_box_config(ap) -> dict:
"""Return the 'box' section from instance config, with safe fallbacks.""" """Return the 'box' section from instance config, with safe fallbacks."""
@@ -61,8 +64,16 @@ class BoxRuntimeConnector(ManagedRuntimeConnector):
3. Unix / macOS -> subprocess + stdio pipe 3. Unix / macOS -> subprocess + stdio pipe
""" """
def __init__(self, ap: core_app.Application): def __init__(
self,
ap: core_app.Application,
runtime_disconnect_callback: typing.Callable[
['BoxRuntimeConnector'], typing.Coroutine[typing.Any, typing.Any, None]
]
| None = None,
):
super().__init__(ap) super().__init__(ap)
self.runtime_disconnect_callback = runtime_disconnect_callback
self.configured_runtime_url = self._load_configured_runtime_url() self.configured_runtime_url = self._load_configured_runtime_url()
self.ws_relay_base_url = resolve_box_ws_relay_url(ap) self.ws_relay_base_url = resolve_box_ws_relay_url(ap)
self.client = ActionRPCBoxClient(logger=ap.logger) self.client = ActionRPCBoxClient(logger=ap.logger)
@@ -70,6 +81,7 @@ class BoxRuntimeConnector(ManagedRuntimeConnector):
self._handler: Handler | None = None self._handler: Handler | None = None
self._handler_task: asyncio.Task | None = None self._handler_task: asyncio.Task | None = None
self._ctrl_task: asyncio.Task | None = None self._ctrl_task: asyncio.Task | None = None
self._heartbeat_task: asyncio.Task | None = None
# Parse the relay URL once for reuse. # Parse the relay URL once for reuse.
parsed = urlparse(self.ws_relay_base_url) parsed = urlparse(self.ws_relay_base_url)
@@ -93,16 +105,33 @@ class BoxRuntimeConnector(ManagedRuntimeConnector):
async def initialize(self) -> None: async def initialize(self) -> None:
if self._uses_websocket(): if self._uses_websocket():
if platform.get_platform() == 'win32' and not self.configured_runtime_url: if platform.get_platform() == 'win32' and not self.configured_runtime_url:
# Windows without an explicit URL: launch the box server as a
# detached subprocess (no stdio pipe) then talk to it via WS,
# because Windows ProactorEventLoop does not support async
# subprocess stdin/stdout pipes.
await self._start_subprocess_then_ws() await self._start_subprocess_then_ws()
else: else:
await self._connect_remote_ws() await self._connect_remote_ws()
else: else:
await self._start_local_stdio() await self._start_local_stdio()
# Start heartbeat after successful connection
if self._heartbeat_task is None:
self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
# -- heartbeat -----------------------------------------------------------
async def _heartbeat_loop(self) -> None:
"""Periodically ping the Box runtime to detect silent disconnections."""
while True:
await asyncio.sleep(_HEARTBEAT_INTERVAL_SEC)
try:
await self.ping()
self.ap.logger.debug('Heartbeat to Box runtime success.')
except Exception as e:
self.ap.logger.debug(f'Failed to heartbeat to Box runtime: {e}')
async def ping(self) -> None:
if self._handler is None:
raise BoxRuntimeUnavailableError('Box runtime is not connected')
await self._handler.call_action(CommonAction.PING, {})
# -- transport paths ----------------------------------------------------- # -- transport paths -----------------------------------------------------
async def _start_local_stdio(self) -> None: async def _start_local_stdio(self) -> None:
@@ -125,7 +154,6 @@ class BoxRuntimeConnector(ManagedRuntimeConnector):
ctrl.run(self._make_connection_callback('stdio', connected, connect_error)) ctrl.run(self._make_connection_callback('stdio', connected, connect_error))
) )
# Wait for connection or failure
try: try:
await asyncio.wait_for(connected.wait(), timeout=30.0) await asyncio.wait_for(connected.wait(), timeout=30.0)
except asyncio.TimeoutError: except asyncio.TimeoutError:
@@ -134,14 +162,12 @@ class BoxRuntimeConnector(ManagedRuntimeConnector):
if connect_error: if connect_error:
raise BoxRuntimeUnavailableError(f'box runtime connection failed: {connect_error[0]}') raise BoxRuntimeUnavailableError(f'box runtime connection failed: {connect_error[0]}')
# Store subprocess reference for dispose (StdioClientController manages it)
self._subprocess = ctrl.process self._subprocess = ctrl.process
async def _start_subprocess_then_ws(self) -> None: async def _start_subprocess_then_ws(self) -> None:
"""Launch box server as detached subprocess, then connect via WS (Windows).""" """Launch box server as detached subprocess, then connect via WS (Windows)."""
self.ap.logger.info('(windows) Use cmd to launch box runtime and communicate via ws') self.ap.logger.info('(windows) Use cmd to launch box runtime and communicate via ws')
# Launch the box server subprocess in ws mode (no stdio pipe).
await self._start_runtime_subprocess( await self._start_runtime_subprocess(
'-m', '-m',
'langbot_plugin.box.server', 'langbot_plugin.box.server',
@@ -151,7 +177,6 @@ class BoxRuntimeConnector(ManagedRuntimeConnector):
str(self._relay_port), str(self._relay_port),
) )
# Wait for the WS endpoint to become reachable, then connect.
ws_url = f'ws://localhost:{self._relay_port}/rpc/ws' ws_url = f'ws://localhost:{self._relay_port}/rpc/ws'
await self._connect_ws(ws_url, '(windows) WebSocket') await self._connect_ws(ws_url, '(windows) WebSocket')
@@ -167,11 +192,6 @@ class BoxRuntimeConnector(ManagedRuntimeConnector):
"""Determine the action-RPC WebSocket URL. """Determine the action-RPC WebSocket URL.
All endpoints share a single port; action RPC is at ``/rpc/ws``. All endpoints share a single port; action RPC is at ``/rpc/ws``.
Priority:
1. Explicit ``box.runtime_url`` from config (user-supplied, used as-is)
2. Docker environment -> ``ws://langbot_box:5410/rpc/ws``
3. --standalone-box / Windows fallback -> ``ws://localhost:5410/rpc/ws``
""" """
if self.configured_runtime_url: if self.configured_runtime_url:
return self.configured_runtime_url return self.configured_runtime_url
@@ -189,8 +209,14 @@ class BoxRuntimeConnector(ManagedRuntimeConnector):
connect_error: list[Exception] = [] connect_error: list[Exception] = []
async def on_connect_failed(ctrl, exc): async def on_connect_failed(ctrl, exc):
if exc is not None:
self.ap.logger.error(f'Failed to connect to Box runtime ({ws_url}): {exc}')
else:
self.ap.logger.error(f'Failed to connect to Box runtime ({ws_url}), trying to reconnect...')
connect_error.append(exc or BoxRuntimeUnavailableError('ws connection failed')) connect_error.append(exc or BoxRuntimeUnavailableError('ws connection failed'))
connected.set() connected.set()
if self.runtime_disconnect_callback is not None:
await self.runtime_disconnect_callback(self)
ctrl = WebSocketClientController(ws_url=ws_url, make_connection_failed_callback=on_connect_failed) ctrl = WebSocketClientController(ws_url=ws_url, make_connection_failed_callback=on_connect_failed)
self._ctrl_task = asyncio.create_task( self._ctrl_task = asyncio.create_task(
@@ -225,12 +251,28 @@ class BoxRuntimeConnector(ManagedRuntimeConnector):
if not connected.is_set(): if not connected.is_set():
connect_error.append(exc) connect_error.append(exc)
connected.set() connected.set()
else:
# Connection was established but then dropped — this is a
# disconnect after successful handshake.
if self._uses_websocket():
self.ap.logger.error('Disconnected from Box runtime, trying to reconnect...')
if self.runtime_disconnect_callback is not None:
await self.runtime_disconnect_callback(self)
else:
self.ap.logger.error(
'Disconnected from Box runtime via stdio. '
'Cannot automatically reconnect — please restart LangBot.'
)
return new_connection_callback return new_connection_callback
# -- lifecycle ----------------------------------------------------------- # -- lifecycle -----------------------------------------------------------
def dispose(self) -> None: def dispose(self) -> None:
if self._heartbeat_task is not None:
self._heartbeat_task.cancel()
self._heartbeat_task = None
if self._handler_task is not None: if self._handler_task is not None:
self._handler_task.cancel() self._handler_task.cancel()
self._handler_task = None self._handler_task = None
+14 -1
View File
@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import collections import collections
import datetime as _dt import datetime as _dt
import enum import enum
@@ -52,7 +53,7 @@ class BoxService:
self.ap = ap self.ap = ap
self._runtime_connector: BoxRuntimeConnector | None = None self._runtime_connector: BoxRuntimeConnector | None = None
if client is None: if client is None:
self._runtime_connector = BoxRuntimeConnector(ap) self._runtime_connector = BoxRuntimeConnector(ap, runtime_disconnect_callback=self._on_runtime_disconnect)
client = self._runtime_connector.client client = self._runtime_connector.client
self.client = client self.client = client
self.output_limit_chars = output_limit_chars self.output_limit_chars = output_limit_chars
@@ -82,6 +83,18 @@ class BoxService:
self.ap.logger.warning(f'LangBot Box runtime unavailable, sandbox features disabled: {exc}') self.ap.logger.warning(f'LangBot Box runtime unavailable, sandbox features disabled: {exc}')
self._available = False self._available = False
async def _on_runtime_disconnect(self, connector: BoxRuntimeConnector) -> None:
"""Called by the connector when the Box runtime connection drops."""
self._available = False
self.ap.logger.warning('Box runtime disconnected, sandbox features temporarily disabled. Reconnecting in 3s...')
await asyncio.sleep(3)
try:
await connector.initialize()
self._available = True
self.ap.logger.info('Box runtime reconnected, sandbox features restored.')
except Exception as exc:
self.ap.logger.warning(f'Box runtime reconnection failed: {exc}. Will retry on next heartbeat disconnect.')
@property @property
def available(self) -> bool: def available(self) -> bool:
return self._available return self._available