mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-02 03:55:55 +00:00
feat(box): add --standalone-box flag and 3-way transport decision for Box runtime
Align Box runtime connection logic with Plugin runtime's pattern: - Docker: WebSocket to langbot_box container (ws://langbot_box:5411) - --standalone-box: WebSocket to external Box process (ws://localhost:5411) - Windows: subprocess + WebSocket (workaround for async stdio limitation) - Unix/macOS: subprocess + stdio pipe (unchanged) BoxRuntimeConnector now inherits ManagedRuntimeConnector for subprocess lifecycle reuse. Add langbot_box service to docker-compose.yaml.
This commit is contained in:
@@ -18,16 +18,28 @@ services:
|
||||
networks:
|
||||
- langbot_network
|
||||
|
||||
langbot_box:
|
||||
image: rockchin/langbot:latest
|
||||
container_name: langbot_box
|
||||
volumes:
|
||||
- ./data/box:/workspaces
|
||||
# Mount container runtime socket for Box sandbox backend.
|
||||
# Uncomment the one that matches your container runtime:
|
||||
# - /var/run/podman/podman.sock:/var/run/podman/podman.sock # Podman
|
||||
- /var/run/docker.sock:/var/run/docker.sock # Docker
|
||||
restart: on-failure
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
command: ["uv", "run", "--no-sync", "-m", "langbot_plugin.box", "--mode", "ws"]
|
||||
networks:
|
||||
- langbot_network
|
||||
|
||||
langbot:
|
||||
image: rockchin/langbot:latest
|
||||
container_name: langbot
|
||||
volumes:
|
||||
- ./data:/app/data
|
||||
- ./data/box:/workspaces
|
||||
# Mount container runtime socket for Box sandbox (Docker backend).
|
||||
# Uncomment the one that matches your container runtime:
|
||||
# - /var/run/podman/podman.sock:/var/run/podman/podman.sock # Podman
|
||||
- /var/run/docker.sock:/var/run/docker.sock # Docker
|
||||
restart: on-failure
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
|
||||
@@ -29,6 +29,12 @@ async def main_entry(loop: asyncio.AbstractEventLoop):
|
||||
help='Use standalone plugin runtime / 使用独立插件运行时',
|
||||
default=False,
|
||||
)
|
||||
parser.add_argument(
|
||||
'--standalone-box',
|
||||
action='store_true',
|
||||
help='Use standalone box runtime / 使用独立 Box 运行时',
|
||||
default=False,
|
||||
)
|
||||
parser.add_argument('--debug', action='store_true', help='Debug mode / 调试模式', default=False)
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -37,6 +43,11 @@ async def main_entry(loop: asyncio.AbstractEventLoop):
|
||||
|
||||
platform.standalone_runtime = True
|
||||
|
||||
if args.standalone_box:
|
||||
from langbot.pkg.utils import platform
|
||||
|
||||
platform.standalone_box = True
|
||||
|
||||
if args.debug:
|
||||
from langbot.pkg.utils import constants
|
||||
|
||||
|
||||
@@ -13,10 +13,19 @@ from langbot_plugin.runtime.io.connection import Connection
|
||||
from langbot_plugin.box.client import ActionRPCBoxClient
|
||||
from langbot_plugin.box.errors import BoxRuntimeUnavailableError
|
||||
|
||||
from ..utils import platform
|
||||
from ..utils.managed_runtime import ManagedRuntimeConnector
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..core import app as core_app
|
||||
|
||||
|
||||
# Default Docker Compose service name for the standalone Box container.
|
||||
_DOCKER_BOX_HOST = 'langbot_box'
|
||||
_DEFAULT_RELAY_PORT = 5410
|
||||
_DEFAULT_RPC_PORT = 5411 # relay_port + 1
|
||||
|
||||
|
||||
def _get_box_config(ap) -> dict:
|
||||
"""Return the 'box' section from instance config, with safe fallbacks."""
|
||||
instance_config = getattr(ap, 'instance_config', None)
|
||||
@@ -25,39 +34,175 @@ def _get_box_config(ap) -> dict:
|
||||
|
||||
|
||||
def resolve_box_ws_relay_url(ap: core_app.Application) -> str:
|
||||
"""Derive the ws relay base URL used for managed-process attach."""
|
||||
runtime_url = str(_get_box_config(ap).get('runtime_url', '')).strip()
|
||||
"""Derive the WS relay base URL used for managed-process attach.
|
||||
|
||||
The WS relay serves the ``/v1/sessions/{id}/managed-process/ws`` endpoint
|
||||
on the *relay* port (default 5410).
|
||||
"""
|
||||
box_cfg = _get_box_config(ap)
|
||||
|
||||
# Explicit relay URL takes precedence.
|
||||
runtime_url = str(box_cfg.get('runtime_url', '')).strip()
|
||||
if runtime_url:
|
||||
return runtime_url
|
||||
|
||||
return 'http://127.0.0.1:5410'
|
||||
# In Docker, relay lives on the box runtime container.
|
||||
if platform.get_platform() == 'docker':
|
||||
return f'http://{_DOCKER_BOX_HOST}:{_DEFAULT_RELAY_PORT}'
|
||||
|
||||
return f'http://127.0.0.1:{_DEFAULT_RELAY_PORT}'
|
||||
|
||||
|
||||
class BoxRuntimeConnector:
|
||||
"""Connect to the Box runtime via action RPC (stdio or ws)."""
|
||||
class BoxRuntimeConnector(ManagedRuntimeConnector):
|
||||
"""Connect to the Box runtime via action RPC.
|
||||
|
||||
Transport decision (mirrors Plugin runtime logic):
|
||||
1. Docker / --standalone-box / explicit runtime_url -> WebSocket to external Box process
|
||||
2. Windows (non-Docker) -> subprocess + WebSocket (Windows lacks async stdio pipe)
|
||||
3. Unix / macOS -> subprocess + stdio pipe
|
||||
"""
|
||||
|
||||
def __init__(self, ap: core_app.Application):
|
||||
self.ap = ap
|
||||
super().__init__(ap)
|
||||
self.configured_runtime_url = self._load_configured_runtime_url()
|
||||
self.manages_local_runtime = self._should_manage_local_runtime()
|
||||
self.ws_relay_base_url = resolve_box_ws_relay_url(ap)
|
||||
self.client = ActionRPCBoxClient(logger=ap.logger)
|
||||
|
||||
self._handler: Handler | None = None
|
||||
self._handler_task: asyncio.Task | None = None
|
||||
self._ctrl_task: asyncio.Task | None = None
|
||||
self._subprocess: asyncio.subprocess.Process | None = None
|
||||
|
||||
# Parse the relay URL once for reuse
|
||||
# Parse the relay URL once for reuse (relay port, not RPC port).
|
||||
parsed = urlparse(self.ws_relay_base_url)
|
||||
self._relay_host = parsed.hostname or '127.0.0.1'
|
||||
self._relay_port = parsed.port or 5410
|
||||
self._relay_port = parsed.port or _DEFAULT_RELAY_PORT
|
||||
|
||||
def _uses_websocket(self) -> bool:
|
||||
"""Whether the connector should use WebSocket to reach the Box runtime.
|
||||
|
||||
True when:
|
||||
- Running inside Docker (Box runtime is a separate container)
|
||||
- The ``--standalone-box`` CLI flag was passed
|
||||
- An explicit ``runtime_url`` was configured
|
||||
"""
|
||||
return bool(
|
||||
self.configured_runtime_url
|
||||
or platform.get_platform() == 'docker'
|
||||
or platform.use_websocket_to_connect_box_runtime()
|
||||
)
|
||||
|
||||
async def initialize(self) -> None:
|
||||
if self.manages_local_runtime:
|
||||
await self._start_local_stdio()
|
||||
if self._uses_websocket():
|
||||
if platform.get_platform() == 'win32' and not self.configured_runtime_url:
|
||||
# Windows without an explicit URL: launch the box server as a
|
||||
# detached subprocess (no stdio pipe) then talk to it via WS,
|
||||
# because Windows ProactorEventLoop does not support async
|
||||
# subprocess stdin/stdout pipes.
|
||||
await self._start_subprocess_then_ws()
|
||||
else:
|
||||
await self._connect_remote_ws()
|
||||
else:
|
||||
await self._connect_remote_ws()
|
||||
await self._start_local_stdio()
|
||||
|
||||
# -- transport paths -----------------------------------------------------
|
||||
|
||||
async def _start_local_stdio(self) -> None:
|
||||
"""Launch box server as subprocess and connect via stdio (Unix/macOS)."""
|
||||
from langbot_plugin.runtime.io.controllers.stdio.client import StdioClientController
|
||||
|
||||
self.ap.logger.info('Use stdio to connect to box runtime')
|
||||
python_path = sys.executable
|
||||
env = os.environ.copy()
|
||||
|
||||
connected = asyncio.Event()
|
||||
connect_error: list[Exception] = []
|
||||
|
||||
ctrl = StdioClientController(
|
||||
command=python_path,
|
||||
args=['-m', 'langbot_plugin.box.server', '--port', str(self._relay_port)],
|
||||
env=env,
|
||||
)
|
||||
self._ctrl_task = asyncio.create_task(
|
||||
ctrl.run(self._make_connection_callback('stdio', connected, connect_error))
|
||||
)
|
||||
|
||||
# Wait for connection or failure
|
||||
try:
|
||||
await asyncio.wait_for(connected.wait(), timeout=30.0)
|
||||
except asyncio.TimeoutError:
|
||||
raise BoxRuntimeUnavailableError('box runtime subprocess did not connect in time')
|
||||
|
||||
if connect_error:
|
||||
raise BoxRuntimeUnavailableError(f'box runtime connection failed: {connect_error[0]}')
|
||||
|
||||
# Store subprocess reference for dispose (StdioClientController manages it)
|
||||
self._subprocess = ctrl.process
|
||||
|
||||
async def _start_subprocess_then_ws(self) -> None:
|
||||
"""Launch box server as detached subprocess, then connect via WS (Windows)."""
|
||||
self.ap.logger.info('(windows) Use cmd to launch box runtime and communicate via ws')
|
||||
|
||||
# Launch the box server subprocess (no stdio pipe).
|
||||
# The server will listen on _relay_port for the WS relay and
|
||||
# _relay_port+1 for action-RPC WebSocket.
|
||||
await self._start_runtime_subprocess(
|
||||
'-m',
|
||||
'langbot_plugin.box.server',
|
||||
'--port',
|
||||
str(self._relay_port),
|
||||
)
|
||||
|
||||
# Wait for the WS endpoint to become reachable, then connect.
|
||||
ws_url = f'ws://localhost:{self._relay_port + 1}'
|
||||
await self._connect_ws(ws_url, '(windows) WebSocket')
|
||||
|
||||
async def _connect_remote_ws(self) -> None:
|
||||
"""Connect to a remote (or Docker) box server via WebSocket."""
|
||||
ws_url = self._resolve_rpc_ws_url()
|
||||
self.ap.logger.info(f'Use WebSocket to connect to box runtime ({ws_url})')
|
||||
await self._connect_ws(ws_url, 'WebSocket')
|
||||
|
||||
# -- helpers -------------------------------------------------------------
|
||||
|
||||
def _resolve_rpc_ws_url(self) -> str:
|
||||
"""Determine the action-RPC WebSocket URL.
|
||||
|
||||
Priority:
|
||||
1. Explicit ``box.runtime_url`` from config (user-supplied, used as-is)
|
||||
2. Docker environment -> ``ws://langbot_box_runtime:5411``
|
||||
3. --standalone-box / Windows fallback -> ``ws://localhost:5411``
|
||||
"""
|
||||
if self.configured_runtime_url:
|
||||
return self.configured_runtime_url
|
||||
|
||||
if platform.get_platform() == 'docker':
|
||||
return f'ws://{_DOCKER_BOX_HOST}:{_DEFAULT_RPC_PORT}'
|
||||
|
||||
return f'ws://localhost:{self._relay_port + 1}'
|
||||
|
||||
async def _connect_ws(self, ws_url: str, transport_name: str) -> None:
|
||||
"""Shared WebSocket connection procedure."""
|
||||
from langbot_plugin.runtime.io.controllers.ws.client import WebSocketClientController
|
||||
|
||||
connected = asyncio.Event()
|
||||
connect_error: list[Exception] = []
|
||||
|
||||
async def on_connect_failed(ctrl, exc):
|
||||
connect_error.append(exc or BoxRuntimeUnavailableError('ws connection failed'))
|
||||
connected.set()
|
||||
|
||||
ctrl = WebSocketClientController(ws_url=ws_url, make_connection_failed_callback=on_connect_failed)
|
||||
self._ctrl_task = asyncio.create_task(
|
||||
ctrl.run(self._make_connection_callback(transport_name, connected, connect_error))
|
||||
)
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(connected.wait(), timeout=30.0)
|
||||
except asyncio.TimeoutError:
|
||||
raise BoxRuntimeUnavailableError(f'box runtime ws connection timed out ({ws_url})')
|
||||
|
||||
if connect_error:
|
||||
raise BoxRuntimeUnavailableError(f'box runtime connection failed: {connect_error[0]}')
|
||||
|
||||
def _make_connection_callback(
|
||||
self,
|
||||
@@ -82,63 +227,7 @@ class BoxRuntimeConnector:
|
||||
|
||||
return new_connection_callback
|
||||
|
||||
async def _start_local_stdio(self) -> None:
|
||||
"""Launch box server as subprocess and connect via stdio."""
|
||||
from langbot_plugin.runtime.io.controllers.stdio.client import StdioClientController
|
||||
|
||||
python_path = sys.executable
|
||||
env = os.environ.copy()
|
||||
|
||||
connected = asyncio.Event()
|
||||
connect_error: list[Exception] = []
|
||||
|
||||
ctrl = StdioClientController(
|
||||
command=python_path,
|
||||
args=['-m', 'langbot_plugin.box.server', '--port', str(self._relay_port)],
|
||||
env=env,
|
||||
)
|
||||
self._subprocess = None # StdioClientController manages the subprocess
|
||||
self._ctrl_task = asyncio.create_task(
|
||||
ctrl.run(self._make_connection_callback('stdio', connected, connect_error))
|
||||
)
|
||||
|
||||
# Wait for connection or failure
|
||||
try:
|
||||
await asyncio.wait_for(connected.wait(), timeout=30.0)
|
||||
except asyncio.TimeoutError:
|
||||
raise BoxRuntimeUnavailableError('box runtime subprocess did not connect in time')
|
||||
|
||||
if connect_error:
|
||||
raise BoxRuntimeUnavailableError(f'box runtime connection failed: {connect_error[0]}')
|
||||
|
||||
# Store subprocess reference for dispose
|
||||
self._subprocess = ctrl.process
|
||||
|
||||
async def _connect_remote_ws(self) -> None:
|
||||
"""Connect to a remote box server via WebSocket."""
|
||||
from langbot_plugin.runtime.io.controllers.ws.client import WebSocketClientController
|
||||
|
||||
ws_url = f'ws://{self._relay_host}:{self._relay_port + 1}'
|
||||
|
||||
connected = asyncio.Event()
|
||||
connect_error: list[Exception] = []
|
||||
|
||||
async def on_connect_failed(ctrl, exc):
|
||||
connect_error.append(exc or BoxRuntimeUnavailableError('ws connection failed'))
|
||||
connected.set()
|
||||
|
||||
ctrl = WebSocketClientController(ws_url=ws_url, make_connection_failed_callback=on_connect_failed)
|
||||
self._ctrl_task = asyncio.create_task(
|
||||
ctrl.run(self._make_connection_callback('WebSocket', connected, connect_error))
|
||||
)
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(connected.wait(), timeout=30.0)
|
||||
except asyncio.TimeoutError:
|
||||
raise BoxRuntimeUnavailableError('box runtime ws connection timed out')
|
||||
|
||||
if connect_error:
|
||||
raise BoxRuntimeUnavailableError(f'box runtime connection failed: {connect_error[0]}')
|
||||
# -- lifecycle -----------------------------------------------------------
|
||||
|
||||
def dispose(self) -> None:
|
||||
if self._handler_task is not None:
|
||||
@@ -149,12 +238,15 @@ class BoxRuntimeConnector:
|
||||
self._ctrl_task.cancel()
|
||||
self._ctrl_task = None
|
||||
|
||||
if self._subprocess is not None and self._subprocess.returncode is None:
|
||||
# stdio-managed subprocess (stored as self._subprocess by _start_local_stdio)
|
||||
if hasattr(self, '_subprocess') and self._subprocess is not None and self._subprocess.returncode is None:
|
||||
self.ap.logger.info('Terminating managed box runtime process...')
|
||||
self._subprocess.terminate()
|
||||
|
||||
# Subprocess launched by ManagedRuntimeConnector._start_runtime_subprocess (Windows path)
|
||||
self._dispose_subprocess()
|
||||
|
||||
# -- config helpers ------------------------------------------------------
|
||||
|
||||
def _load_configured_runtime_url(self) -> str:
|
||||
return str(_get_box_config(self.ap).get('runtime_url', '')).strip()
|
||||
|
||||
def _should_manage_local_runtime(self) -> bool:
|
||||
return not self.configured_runtime_url
|
||||
|
||||
@@ -16,7 +16,14 @@ def get_platform() -> str:
|
||||
|
||||
standalone_runtime = False
|
||||
|
||||
standalone_box = False
|
||||
|
||||
|
||||
def use_websocket_to_connect_plugin_runtime() -> bool:
|
||||
"""是否使用 websocket 连接插件运行时"""
|
||||
return standalone_runtime
|
||||
|
||||
|
||||
def use_websocket_to_connect_box_runtime() -> bool:
|
||||
"""Whether to use WebSocket to connect to an external box runtime."""
|
||||
return standalone_box
|
||||
|
||||
@@ -89,7 +89,7 @@ monitoring:
|
||||
check_interval_hours: 1
|
||||
box:
|
||||
profile: 'default'
|
||||
runtime_url: '' # Leave empty to auto-launch local Box Runtime subprocess. Set to a URL only for remote/external deployment.
|
||||
runtime_url: '' # Action-RPC WebSocket URL of an external Box Runtime. Leave empty for auto-detection (stdio locally, Docker service in containers).
|
||||
shared_host_root: './data/box' # For Docker deployment, use '/workspaces'
|
||||
default_host_workspace: '' # Defaults to '<shared_host_root>/default'
|
||||
allowed_host_mount_roots: # Defaults to ['<shared_host_root>'] when left empty
|
||||
|
||||
@@ -25,41 +25,64 @@ def make_app(logger: Mock, runtime_url: str = ''):
|
||||
)
|
||||
|
||||
|
||||
def test_box_runtime_connector_manages_local_when_no_url():
|
||||
def test_box_runtime_connector_stdio_when_no_url(monkeypatch: pytest.MonkeyPatch):
|
||||
"""Without runtime_url, on a non-Docker Unix platform, use stdio."""
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.get_platform', lambda: 'linux')
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.standalone_box', False)
|
||||
connector = BoxRuntimeConnector(make_app(Mock()))
|
||||
|
||||
assert connector.manages_local_runtime is True
|
||||
assert connector._uses_websocket() is False
|
||||
assert isinstance(connector.client, ActionRPCBoxClient)
|
||||
|
||||
|
||||
def test_box_runtime_connector_remote_when_url_configured():
|
||||
def test_box_runtime_connector_ws_when_url_configured(monkeypatch: pytest.MonkeyPatch):
|
||||
"""With an explicit runtime_url, always use WebSocket."""
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.get_platform', lambda: 'linux')
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.standalone_box', False)
|
||||
logger = Mock()
|
||||
connector = BoxRuntimeConnector(make_app(logger, runtime_url='http://box-runtime:5410'))
|
||||
|
||||
assert connector.manages_local_runtime is False
|
||||
assert connector._uses_websocket() is True
|
||||
assert isinstance(connector.client, ActionRPCBoxClient)
|
||||
|
||||
|
||||
def test_box_runtime_connector_manages_local_in_docker(monkeypatch: pytest.MonkeyPatch):
|
||||
def test_box_runtime_connector_ws_in_docker(monkeypatch: pytest.MonkeyPatch):
|
||||
"""Inside Docker (no explicit URL), use WebSocket to reach a sibling container."""
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.get_platform', lambda: 'docker')
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.standalone_box', False)
|
||||
connector = BoxRuntimeConnector(make_app(Mock()))
|
||||
|
||||
assert connector.manages_local_runtime is True
|
||||
assert connector.ws_relay_base_url == 'http://127.0.0.1:5410'
|
||||
assert connector._uses_websocket() is True
|
||||
assert connector.ws_relay_base_url == 'http://langbot_box:5410'
|
||||
|
||||
|
||||
def test_box_runtime_connector_ws_relay_url_default():
|
||||
def test_box_runtime_connector_ws_with_standalone_flag(monkeypatch: pytest.MonkeyPatch):
|
||||
"""With --standalone-box flag, use WebSocket even on a local Unix platform."""
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.get_platform', lambda: 'linux')
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.standalone_box', True)
|
||||
connector = BoxRuntimeConnector(make_app(Mock()))
|
||||
|
||||
assert connector._uses_websocket() is True
|
||||
|
||||
|
||||
def test_box_runtime_connector_ws_relay_url_default(monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.get_platform', lambda: 'linux')
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.standalone_box', False)
|
||||
connector = BoxRuntimeConnector(make_app(Mock()))
|
||||
|
||||
assert connector.ws_relay_base_url == 'http://127.0.0.1:5410'
|
||||
|
||||
|
||||
def test_box_runtime_connector_ws_relay_url_explicit():
|
||||
def test_box_runtime_connector_ws_relay_url_explicit(monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.get_platform', lambda: 'linux')
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.standalone_box', False)
|
||||
connector = BoxRuntimeConnector(make_app(Mock(), runtime_url='http://box-runtime:5410'))
|
||||
assert connector.ws_relay_base_url == 'http://box-runtime:5410'
|
||||
|
||||
|
||||
def test_box_runtime_connector_dispose_terminates_subprocess():
|
||||
def test_box_runtime_connector_dispose_terminates_subprocess(monkeypatch: pytest.MonkeyPatch):
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.get_platform', lambda: 'linux')
|
||||
monkeypatch.setattr('langbot.pkg.utils.platform.standalone_box', False)
|
||||
logger = Mock()
|
||||
connector = BoxRuntimeConnector(make_app(logger))
|
||||
subprocess = Mock()
|
||||
|
||||
Reference in New Issue
Block a user