From a60aa6f644c981875d41694e9b7584f6251461de Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Wed, 2 Jul 2025 22:20:20 +0800 Subject: [PATCH] feat: runtime reconnecting --- pkg/core/stages/build_app.py | 7 ++++++- pkg/plugin/connector.py | 37 ++++++++++++++++++++++++++++++++---- pkg/plugin/handler.py | 10 ++++++++-- 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/pkg/core/stages/build_app.py b/pkg/core/stages/build_app.py index 54204c85..f4361008 100644 --- a/pkg/core/stages/build_app.py +++ b/pkg/core/stages/build_app.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from .. import stage, app from ...utils import version, proxy, announce @@ -59,7 +60,11 @@ class BuildAppStage(stage.BootingStage): ap.persistence_mgr = persistence_mgr_inst await persistence_mgr_inst.initialize() - plugin_connector_inst = plugin_connector.PluginRuntimeConnector(ap) + async def runtime_disconnect_callback(connector: plugin_connector.PluginRuntimeConnector) -> None: + await asyncio.sleep(3) + await plugin_connector_inst.initialize() + + plugin_connector_inst = plugin_connector.PluginRuntimeConnector(ap, runtime_disconnect_callback) await plugin_connector_inst.initialize() ap.plugin_connector = plugin_connector_inst diff --git a/pkg/plugin/connector.py b/pkg/plugin/connector.py index 302ee600..7b1bcdc2 100644 --- a/pkg/plugin/connector.py +++ b/pkg/plugin/connector.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio +import typing import os import sys @@ -9,9 +10,9 @@ from ..core import app from . import handler from ..utils import platform from langbot_plugin.runtime.io.controllers.stdio import client as stdio_client_controller -from langbot_plugin.runtime.io.connections import stdio as stdio_connection from langbot_plugin.runtime.io.controllers.ws import client as ws_client_controller from langbot_plugin.api.entities import events, context +import langbot_plugin.runtime.io.connection as base_connection class PluginRuntimeConnector: @@ -25,12 +26,34 @@ class PluginRuntimeConnector: stdio_client_controller: stdio_client_controller.StdioClientController - def __init__(self, ap: app.Application): + runtime_disconnect_callback: typing.Callable[ + [PluginRuntimeConnector], typing.Coroutine[typing.Any, typing.Any, None] + ] + + def __init__( + self, + ap: app.Application, + runtime_disconnect_callback: typing.Callable[ + [PluginRuntimeConnector], typing.Coroutine[typing.Any, typing.Any, None] + ], + ): self.ap = ap + self.runtime_disconnect_callback = runtime_disconnect_callback async def initialize(self): - async def new_connection_callback(connection: stdio_connection.StdioConnection): - self.handler = handler.RuntimeConnectionHandler(connection, self.ap) + async def new_connection_callback(connection: base_connection.Connection): + async def disconnect_callback(rchandler: handler.RuntimeConnectionHandler) -> bool: + if platform.get_platform() == 'docker': + self.ap.logger.error('Disconnected from plugin runtime, trying to reconnect...') + await self.runtime_disconnect_callback(self) + return False + else: + self.ap.logger.error( + 'Disconnected from plugin runtime, cannot automatically reconnect while LangBot connects to plugin runtime via stdio, please restart LangBot.' + ) + return False + + self.handler = handler.RuntimeConnectionHandler(connection, disconnect_callback, self.ap) self.handler_task = asyncio.create_task(self.handler.run()) _ = await self.handler.ping() self.ap.logger.info('Connected to plugin runtime.') @@ -41,8 +64,14 @@ class PluginRuntimeConnector: if platform.get_platform() == 'docker': # use websocket self.ap.logger.info('use websocket to connect to plugin runtime') ws_url = self.ap.instance_config.data['plugin']['runtime_ws_url'] + + async def make_connection_failed_callback(ctrl: ws_client_controller.WebSocketClientController) -> None: + self.ap.logger.error('Failed to connect to plugin runtime, trying to reconnect...') + await self.runtime_disconnect_callback(self) + ctrl = ws_client_controller.WebSocketClientController( ws_url=ws_url, + make_connection_failed_callback=make_connection_failed_callback, ) task = ctrl.run(new_connection_callback) else: # stdio diff --git a/pkg/plugin/handler.py b/pkg/plugin/handler.py index 42734261..afa0bbbc 100644 --- a/pkg/plugin/handler.py +++ b/pkg/plugin/handler.py @@ -1,5 +1,6 @@ from __future__ import annotations +import typing from typing import Any import sqlalchemy @@ -22,8 +23,13 @@ class RuntimeConnectionHandler(handler.Handler): ap: app.Application - def __init__(self, connection: Connection, ap: app.Application): - super().__init__(connection) + def __init__( + self, + connection: Connection, + disconnect_callback: typing.Callable[[], typing.Coroutine[typing.Any, typing.Any, bool]], + ap: app.Application, + ): + super().__init__(connection, disconnect_callback) self.ap = ap @self.action(RuntimeToLangBotAction.GET_PLUGIN_SETTINGS)