From 2d06f1cadb30b82808bd12a3ce0f7012c7fe5649 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Mon, 9 Jun 2025 12:10:48 +0800 Subject: [PATCH] feat: connector for plugin runtime --- pkg/core/app.py | 5 ++ pkg/core/stages/build_app.py | 5 ++ pkg/persistence/mgr.py | 64 +++++++++---------- .../migrations/dbm004_plugin_config.py | 20 ++++++ pkg/plugin/connector.py | 57 +++++++++++++++++ pkg/plugin/handler.py | 7 ++ pkg/utils/constants.py | 2 +- pyproject.toml | 2 +- templates/config.yaml | 2 + 9 files changed, 130 insertions(+), 34 deletions(-) create mode 100644 pkg/persistence/migrations/dbm004_plugin_config.py create mode 100644 pkg/plugin/connector.py create mode 100644 pkg/plugin/handler.py diff --git a/pkg/core/app.py b/pkg/core/app.py index 911acd3d..b16bab95 100644 --- a/pkg/core/app.py +++ b/pkg/core/app.py @@ -13,6 +13,7 @@ from ..provider.tools import toolmgr as llm_tool_mgr from ..config import manager as config_mgr from ..command import cmdmgr from ..plugin import manager as plugin_mgr +from ..plugin import connector as plugin_connector from ..pipeline import pool from ..pipeline import controller, pipelinemgr from ..utils import version as version_mgr, proxy as proxy_mgr, announce as announce_mgr @@ -77,6 +78,8 @@ class Application: plugin_mgr: plugin_mgr.PluginManager = None + plugin_connector: plugin_connector.PluginRuntimeConnector = None + query_pool: pool.QueryPool = None ctrl: controller.Controller = None @@ -117,6 +120,8 @@ class Application: async def run(self): try: + await self.plugin_connector.initialize_plugins() + await self.plugin_mgr.initialize_plugins() # 后续可能会允许动态重启其他任务 diff --git a/pkg/core/stages/build_app.py b/pkg/core/stages/build_app.py index 6ee35610..d8689789 100644 --- a/pkg/core/stages/build_app.py +++ b/pkg/core/stages/build_app.py @@ -5,6 +5,7 @@ from .. import stage, app from ...utils import version, proxy, announce from ...pipeline import pool, controller, pipelinemgr from ...plugin import manager as plugin_mgr +from ...plugin import connector as plugin_connector from ...command import cmdmgr from ...provider.session import sessionmgr as llm_session_mgr from ...provider.modelmgr import modelmgr as llm_model_mgr @@ -64,6 +65,10 @@ class BuildAppStage(stage.BootingStage): ap.plugin_mgr = plugin_mgr_inst await plugin_mgr_inst.load_plugins() + plugin_connector_inst = plugin_connector.PluginRuntimeConnector(ap) + await plugin_connector_inst.initialize() + ap.plugin_connector = plugin_connector_inst + cmd_mgr_inst = cmdmgr.CommandManager(ap) await cmd_mgr_inst.initialize() ap.cmd_mgr = cmd_mgr_inst diff --git a/pkg/persistence/mgr.py b/pkg/persistence/mgr.py index 606aa9fd..23f08aa7 100644 --- a/pkg/persistence/mgr.py +++ b/pkg/persistence/mgr.py @@ -44,6 +44,38 @@ class PersistenceManager: await self.create_tables() + # run migrations + database_version = await self.execute_async( + sqlalchemy.select(metadata.Metadata).where(metadata.Metadata.key == 'database_version') + ) + + database_version = int(database_version.fetchone()[1]) + required_database_version = constants.required_database_version + + if database_version < required_database_version: + migrations = migration.preregistered_db_migrations + migrations.sort(key=lambda x: x.number) + + last_migration_number = database_version + + for migration_cls in migrations: + migration_instance = migration_cls(self.ap) + + if ( + migration_instance.number > database_version + and migration_instance.number <= required_database_version + ): + await migration_instance.upgrade() + await self.execute_async( + sqlalchemy.update(metadata.Metadata) + .where(metadata.Metadata.key == 'database_version') + .values({'value': str(migration_instance.number)}) + ) + last_migration_number = migration_instance.number + self.ap.logger.info(f'Migration {migration_instance.number} completed.') + + self.ap.logger.info(f'Successfully upgraded database to version {last_migration_number}.') + async def create_tables(self): # create tables async with self.get_db_engine().connect() as conn: @@ -87,38 +119,6 @@ class PersistenceManager: # ================================= - # run migrations - database_version = await self.execute_async( - sqlalchemy.select(metadata.Metadata).where(metadata.Metadata.key == 'database_version') - ) - - database_version = int(database_version.fetchone()[1]) - required_database_version = constants.required_database_version - - if database_version < required_database_version: - migrations = migration.preregistered_db_migrations - migrations.sort(key=lambda x: x.number) - - last_migration_number = database_version - - for migration_cls in migrations: - migration_instance = migration_cls(self.ap) - - if ( - migration_instance.number > database_version - and migration_instance.number <= required_database_version - ): - await migration_instance.upgrade() - await self.execute_async( - sqlalchemy.update(metadata.Metadata) - .where(metadata.Metadata.key == 'database_version') - .values({'value': str(migration_instance.number)}) - ) - last_migration_number = migration_instance.number - self.ap.logger.info(f'Migration {migration_instance.number} completed.') - - self.ap.logger.info(f'Successfully upgraded database to version {last_migration_number}.') - async def execute_async(self, *args, **kwargs) -> sqlalchemy.engine.cursor.CursorResult: async with self.get_db_engine().connect() as conn: result = await conn.execute(*args, **kwargs) diff --git a/pkg/persistence/migrations/dbm004_plugin_config.py b/pkg/persistence/migrations/dbm004_plugin_config.py new file mode 100644 index 00000000..fc7a175a --- /dev/null +++ b/pkg/persistence/migrations/dbm004_plugin_config.py @@ -0,0 +1,20 @@ +from .. import migration + + +@migration.migration_class(4) +class DBMigratePluginConfig(migration.DBMigration): + """插件配置""" + + async def upgrade(self): + """升级""" + + if 'plugin' not in self.ap.instance_config.data: + self.ap.instance_config.data['plugin'] = { + 'runtime_ws_url': 'ws://localhost:5400/control/ws', + } + + await self.ap.instance_config.dump_config() + + async def downgrade(self): + """降级""" + pass diff --git a/pkg/plugin/connector.py b/pkg/plugin/connector.py new file mode 100644 index 00000000..5eadc900 --- /dev/null +++ b/pkg/plugin/connector.py @@ -0,0 +1,57 @@ +# For connect to plugin runtime. +from __future__ import annotations + +import asyncio +import os +import sys + +from ..core import app +from . import handler +from ..utils import platform +from langbot_plugin.runtime.io.controllers.stdio import client as stdio_client_controller +from langbot_plugin.runtime.io.connections import stdio as stdio_connection +from langbot_plugin.runtime.io.controllers.ws import client as ws_client_controller + + +class PluginRuntimeConnector: + """Plugin runtime connector""" + + ap: app.Application + + handler: handler.RuntimeConnectionHandler + + handler_task: asyncio.Task + + stdio_client_controller: stdio_client_controller.StdioClientController + + def __init__(self, ap: app.Application): + self.ap = ap + + async def initialize(self): + async def new_connection_callback(connection: stdio_connection.StdioConnection): + self.ap.logger.info('Connected to plugin runtime.') + self.handler = handler.RuntimeConnectionHandler(connection) + self.handler_task = asyncio.create_task(self.handler.run()) + + if platform.get_platform() == 'docker': # use websocket + ws_url = self.ap.instance_config.data['plugin']['runtime_ws_url'] + ctrl = ws_client_controller.WebSocketClientController( + ws_url=ws_url, + ) + await ctrl.run(new_connection_callback) + else: # stdio + # cmd: lbp rt -s + python_path = sys.executable + env = os.environ.copy() + ctrl = stdio_client_controller.StdioClientController( + command=python_path, + args=['-m', 'langbot_plugin.cli.__init__', 'rt', '-s'], + env=env, + ) + await ctrl.run(new_connection_callback) + + async def run(self): + pass + + async def initialize_plugins(self): + pass diff --git a/pkg/plugin/handler.py b/pkg/plugin/handler.py new file mode 100644 index 00000000..077c1649 --- /dev/null +++ b/pkg/plugin/handler.py @@ -0,0 +1,7 @@ +from __future__ import annotations + +from langbot_plugin.runtime.io import handler + + +class RuntimeConnectionHandler(handler.Handler): + """Runtime connection handler""" diff --git a/pkg/utils/constants.py b/pkg/utils/constants.py index 450a2e16..f822b477 100644 --- a/pkg/utils/constants.py +++ b/pkg/utils/constants.py @@ -1,6 +1,6 @@ semantic_version = 'v4.0.7' -required_database_version = 3 +required_database_version = 4 """标记本版本所需要的数据库结构版本,用于判断数据库迁移""" debug_mode = False diff --git a/pyproject.toml b/pyproject.toml index e33bf4af..40f172c0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,7 @@ dependencies = [ "pre-commit>=4.2.0", "uv>=0.7.11", "mypy>=1.16.0", - "langbot-plugin>=0.1.0a4", + "langbot-plugin==0.1.0a6", ] keywords = [ "bot", diff --git a/templates/config.yaml b/templates/config.yaml index 109cd8d7..c896417c 100644 --- a/templates/config.yaml +++ b/templates/config.yaml @@ -18,3 +18,5 @@ system: jwt: expire: 604800 secret: '' +plugin: + runtime_ws_url: 'ws://plugin-runtime:5400/control/ws' \ No newline at end of file