# For connect to plugin runtime. from __future__ import annotations import asyncio import io import time import zipfile from typing import Any import typing import os import sys import httpx import sqlalchemy import yaml from async_lru import alru_cache from langbot_plugin.api.entities.builtin.pipeline.query import provider_session from ..core import app from . import handler from ..utils import platform from langbot_plugin.runtime.io.controllers.stdio import ( client as stdio_client_controller, ) from langbot_plugin.runtime.io.controllers.ws import client as ws_client_controller from langbot_plugin.api.entities import events from langbot_plugin.api.entities import context import langbot_plugin.runtime.io.connection as base_connection from langbot_plugin.api.definition.components.manifest import ComponentManifest from langbot_plugin.api.entities.builtin.command import ( context as command_context, errors as command_errors, ) from langbot_plugin.runtime.plugin.mgr import PluginInstallSource from ..core import taskmgr from ..entity.persistence import plugin as persistence_plugin class PluginRuntimeNotConnectedError(RuntimeError): """Raised when plugin runtime operations are requested before connection.""" class PluginRuntimeConnector: """Plugin runtime connector""" ap: app.Application handler: handler.RuntimeConnectionHandler handler_task: asyncio.Task heartbeat_task: asyncio.Task | None = None stdio_client_controller: stdio_client_controller.StdioClientController ctrl: stdio_client_controller.StdioClientController | ws_client_controller.WebSocketClientController runtime_subprocess_on_windows: asyncio.subprocess.Process | None = None runtime_subprocess_on_windows_task: asyncio.Task | None = None runtime_disconnect_callback: typing.Callable[ [PluginRuntimeConnector], typing.Coroutine[typing.Any, typing.Any, None] ] is_enable_plugin: bool = True """Mark if the plugin system is enabled""" def __init__( self, ap: app.Application, runtime_disconnect_callback: typing.Callable[ [PluginRuntimeConnector], typing.Coroutine[typing.Any, typing.Any, None] ], ): self.ap = ap self.runtime_disconnect_callback = runtime_disconnect_callback self.is_enable_plugin = self.ap.instance_config.data.get('plugin', {}).get('enable', True) async def heartbeat_loop(self): while True: await asyncio.sleep(20) try: await self.ping_plugin_runtime() self.ap.logger.debug('Heartbeat to plugin runtime success.') except Exception as e: self.ap.logger.debug(f'Failed to heartbeat to plugin runtime: {e}') async def initialize(self): if not self.is_enable_plugin: self.ap.logger.info('Plugin system is disabled.') return async def new_connection_callback(connection: base_connection.Connection): async def disconnect_callback( rchandler: handler.RuntimeConnectionHandler, ) -> bool: if platform.get_platform() == 'docker' or platform.use_websocket_to_connect_plugin_runtime(): self.ap.logger.error('Disconnected from plugin runtime, trying to reconnect...') await self.runtime_disconnect_callback(self) return False else: self.ap.logger.error( 'Disconnected from plugin runtime, cannot automatically reconnect while LangBot connects to plugin runtime via stdio, please restart LangBot.' ) return False self.handler = handler.RuntimeConnectionHandler(connection, disconnect_callback, self.ap) self.handler_task = asyncio.create_task(self.handler.run()) _ = await self.handler.ping() self.ap.logger.info('Connected to plugin runtime.') await self.handler_task task: asyncio.Task | None = None if platform.get_platform() == 'docker' or platform.use_websocket_to_connect_plugin_runtime(): # use websocket self.ap.logger.info('use websocket to connect to plugin runtime') ws_url = self.ap.instance_config.data.get('plugin', {}).get( 'runtime_ws_url', 'ws://langbot_plugin_runtime:5400/control/ws' ) async def make_connection_failed_callback( ctrl: ws_client_controller.WebSocketClientController, exc: Exception = None, ) -> None: if exc is not None: self.ap.logger.error(f'Failed to connect to plugin runtime({ws_url}): {exc}') else: self.ap.logger.error(f'Failed to connect to plugin runtime({ws_url}), trying to reconnect...') await self.runtime_disconnect_callback(self) self.ctrl = ws_client_controller.WebSocketClientController( ws_url=ws_url, make_connection_failed_callback=make_connection_failed_callback, ) task = self.ctrl.run(new_connection_callback) elif platform.get_platform() == 'win32': # Due to Windows's lack of supports for both stdio and subprocess: # See also: https://docs.python.org/zh-cn/3.13/library/asyncio-platforms.html # We have to launch runtime via cmd but communicate via ws. self.ap.logger.info('(windows) use cmd to launch plugin runtime and communicate via ws') if self.runtime_subprocess_on_windows is None: # only launch once python_path = sys.executable env = os.environ.copy() self.runtime_subprocess_on_windows = await asyncio.create_subprocess_exec( python_path, '-m', 'langbot_plugin.cli.__init__', 'rt', env=env, ) # hold the process self.runtime_subprocess_on_windows_task = asyncio.create_task(self.runtime_subprocess_on_windows.wait()) ws_url = 'ws://localhost:5400/control/ws' async def make_connection_failed_callback( ctrl: ws_client_controller.WebSocketClientController, exc: Exception = None, ) -> None: if exc is not None: self.ap.logger.error(f'(windows) Failed to connect to plugin runtime({ws_url}): {exc}') else: self.ap.logger.error( f'(windows) Failed to connect to plugin runtime({ws_url}), trying to reconnect...' ) await self.runtime_disconnect_callback(self) self.ctrl = ws_client_controller.WebSocketClientController( ws_url=ws_url, make_connection_failed_callback=make_connection_failed_callback, ) task = self.ctrl.run(new_connection_callback) else: # stdio self.ap.logger.info('use stdio to connect to plugin runtime') # cmd: lbp rt -s python_path = sys.executable env = os.environ.copy() self.ctrl = stdio_client_controller.StdioClientController( command=python_path, args=['-m', 'langbot_plugin.cli.__init__', 'rt', '-s'], env=env, ) task = self.ctrl.run(new_connection_callback) if self.heartbeat_task is None: self.heartbeat_task = asyncio.create_task(self.heartbeat_loop()) asyncio.create_task(task) async def initialize_plugins(self): pass async def ping_plugin_runtime(self): if not hasattr(self, 'handler'): raise PluginRuntimeNotConnectedError('Plugin runtime is not connected') return await self.handler.ping() def _inspect_plugin_package( self, file_bytes: bytes, task_context: taskmgr.TaskContext | None, ) -> tuple[str | None, str | None]: """Extract plugin identity and dependency metadata from a plugin package.""" plugin_author = None plugin_name = None try: with zipfile.ZipFile(io.BytesIO(file_bytes)) as zf: try: manifest = yaml.safe_load(zf.read('manifest.yaml').decode('utf-8', errors='ignore')) or {} metadata = manifest.get('metadata', {}) plugin_author = metadata.get('author') plugin_name = metadata.get('name') except Exception: pass if task_context is not None: for name in zf.namelist(): if name.endswith('requirements.txt'): content = zf.read(name).decode('utf-8', errors='ignore') deps = [ line.strip() for line in content.splitlines() if line.strip() and not line.strip().startswith('#') ] task_context.metadata['deps_total'] = len(deps) task_context.metadata['deps_list'] = deps break except Exception: pass return plugin_author, plugin_name def _build_plugin_startup_failure_message( self, plugin_author: str, plugin_name: str, task_context: taskmgr.TaskContext | None, ) -> str: dep_hint = '' if task_context is not None: current_dep = task_context.metadata.get('current_dep') if current_dep: dep_hint = f' Last dependency: {current_dep}.' return ( f'Plugin {plugin_author}/{plugin_name} failed to start after installation. ' f'Dependency installation or plugin initialization may have failed.{dep_hint} ' f'Please check the plugin requirements and runtime logs.' ) async def _wait_for_installed_plugin_ready( self, plugin_author: str | None, plugin_name: str | None, task_context: taskmgr.TaskContext | None, timeout: float = 30, ): """Wait until the installed plugin is registered by the runtime. The plugin runtime launches plugins asynchronously. If dependency installation fails, the plugin process exits before registration; without this check the install task can incorrectly finish successfully. """ if not plugin_author or not plugin_name: return deadline = time.time() + timeout last_error: Exception | None = None while time.time() < deadline: try: plugin = await self.get_plugin_info(plugin_author, plugin_name) if plugin is not None: status = plugin.get('status') if status == 'initialized': return except Exception as e: last_error = e await asyncio.sleep(0.5) message = self._build_plugin_startup_failure_message(plugin_author, plugin_name, task_context) if last_error is not None: message = f'{message} Last runtime error: {last_error}' raise RuntimeError(message) async def install_plugin( self, install_source: PluginInstallSource, install_info: dict[str, Any], task_context: taskmgr.TaskContext | None = None, ): plugin_author = install_info.get('plugin_author') plugin_name = install_info.get('plugin_name') if install_source == PluginInstallSource.LOCAL: # transfer file before install file_bytes = install_info['plugin_file'] plugin_author, plugin_name = self._inspect_plugin_package(file_bytes, task_context) if task_context is not None and plugin_author and plugin_name: task_context.metadata['plugin_name'] = f'{plugin_author}/{plugin_name}' file_key = await self.handler.send_file(file_bytes, 'lbpkg') install_info['plugin_file_key'] = file_key del install_info['plugin_file'] self.ap.logger.info(f'Transfered file {file_key} to plugin runtime') elif install_source == PluginInstallSource.GITHUB: # download and transfer file with streaming progress try: async with httpx.AsyncClient( trust_env=True, follow_redirects=True, timeout=60, ) as client: async with client.stream('GET', install_info['asset_url']) as response: response.raise_for_status() total = int(response.headers.get('content-length', 0)) downloaded = 0 chunks: list[bytes] = [] start_time = time.time() if task_context is not None: task_context.set_current_action('downloading plugin package') task_context.metadata['download_total'] = total task_context.metadata['download_current'] = 0 task_context.metadata['download_speed'] = 0 async for chunk in response.aiter_bytes(chunk_size=8192): chunks.append(chunk) downloaded += len(chunk) if task_context is not None: elapsed = time.time() - start_time task_context.metadata['download_current'] = downloaded task_context.metadata['download_total'] = total task_context.metadata['download_speed'] = downloaded / elapsed if elapsed > 0 else 0 file_bytes = b''.join(chunks) plugin_author, plugin_name = self._inspect_plugin_package(file_bytes, task_context) if task_context is not None and plugin_author and plugin_name: task_context.metadata['plugin_name'] = f'{plugin_author}/{plugin_name}' file_key = await self.handler.send_file(file_bytes, 'lbpkg') install_info['plugin_file_key'] = file_key self.ap.logger.info(f'Transfered file {file_key} to plugin runtime') except Exception as e: self.ap.logger.error(f'Failed to download file from GitHub: {e}') raise Exception(f'Failed to download file from GitHub: {e}') async for ret in self.handler.install_plugin(install_source.value, install_info): current_action = ret.get('current_action', None) if current_action is not None: if task_context is not None: task_context.set_current_action(current_action) trace = ret.get('trace', None) if trace is not None: if task_context is not None: task_context.trace(trace) # Forward structured metadata from runtime metadata = ret.get('metadata', None) if metadata is not None and task_context is not None: task_context.metadata.update(metadata) await self._wait_for_installed_plugin_ready(plugin_author, plugin_name, task_context) async def upgrade_plugin( self, plugin_author: str, plugin_name: str, task_context: taskmgr.TaskContext | None = None, ) -> dict[str, Any]: async for ret in self.handler.upgrade_plugin(plugin_author, plugin_name): current_action = ret.get('current_action', None) if current_action is not None: if task_context is not None: task_context.set_current_action(current_action) trace = ret.get('trace', None) if trace is not None: if task_context is not None: task_context.trace(trace) async def delete_plugin( self, plugin_author: str, plugin_name: str, delete_data: bool = False, task_context: taskmgr.TaskContext | None = None, ) -> dict[str, Any]: async for ret in self.handler.delete_plugin(plugin_author, plugin_name): current_action = ret.get('current_action', None) if current_action is not None: if task_context is not None: task_context.set_current_action(current_action) trace = ret.get('trace', None) if trace is not None: if task_context is not None: task_context.trace(trace) # Clean up plugin settings and binary storage if requested if delete_data: if task_context is not None: task_context.trace('Cleaning up plugin configuration and storage...') await self.handler.cleanup_plugin_data(plugin_author, plugin_name) async def list_plugins(self, component_kinds: list[str] | None = None) -> list[dict[str, Any]]: """List plugins, optionally filtered by component kinds. Args: component_kinds: Optional list of component kinds to filter by. If provided, only plugins that contain at least one component of the specified kinds will be returned. E.g., ['Command', 'EventListener', 'Tool'] for pipeline-related plugins. """ if not self.is_enable_plugin: return [] plugins = await self.handler.list_plugins() # Filter plugins by component kinds if specified if component_kinds is not None: filtered_plugins = [] for plugin in plugins: components = plugin.get('components', []) has_matching_component = False for component in components: component_kind = component.get('manifest', {}).get('manifest', {}).get('kind', '') if component_kind in component_kinds: has_matching_component = True break if has_matching_component: filtered_plugins.append(plugin) plugins = filtered_plugins # Sort plugins: debug plugins first, then by installation time (newest first) # Get installation timestamps from database in a single query plugin_timestamps = {} if plugins: # Build list of (author, name) tuples for all plugins plugin_ids = [] for plugin in plugins: author = plugin.get('manifest', {}).get('manifest', {}).get('metadata', {}).get('author', '') name = plugin.get('manifest', {}).get('manifest', {}).get('metadata', {}).get('name', '') if author and name: plugin_ids.append((author, name)) # Fetch all timestamps in a single query using OR conditions if plugin_ids: conditions = [ sqlalchemy.and_( persistence_plugin.PluginSetting.plugin_author == author, persistence_plugin.PluginSetting.plugin_name == name, ) for author, name in plugin_ids ] result = await self.ap.persistence_mgr.execute_async( sqlalchemy.select( persistence_plugin.PluginSetting.plugin_author, persistence_plugin.PluginSetting.plugin_name, persistence_plugin.PluginSetting.created_at, ).where(sqlalchemy.or_(*conditions)) ) for row in result: plugin_id = f'{row.plugin_author}/{row.plugin_name}' plugin_timestamps[plugin_id] = row.created_at # Sort: debug plugins first (descending), then by created_at (descending) def sort_key(plugin): author = plugin.get('manifest', {}).get('manifest', {}).get('metadata', {}).get('author', '') name = plugin.get('manifest', {}).get('manifest', {}).get('metadata', {}).get('name', '') plugin_id = f'{author}/{name}' is_debug = plugin.get('debug', False) created_at = plugin_timestamps.get(plugin_id) # Return tuple: (not is_debug, -timestamp) # not is_debug: False (0) for debug plugins, True (1) for non-debug # -timestamp: to sort newest first (will be None for plugins without timestamp) timestamp_value = -created_at.timestamp() if created_at else 0 return (not is_debug, timestamp_value) plugins.sort(key=sort_key) return plugins async def get_plugin_info(self, author: str, plugin_name: str) -> dict[str, Any]: return await self.handler.get_plugin_info(author, plugin_name) async def set_plugin_config(self, plugin_author: str, plugin_name: str, config: dict[str, Any]) -> dict[str, Any]: return await self.handler.set_plugin_config(plugin_author, plugin_name, config) @alru_cache(ttl=5 * 60) # 5 minutes async def get_plugin_icon(self, plugin_author: str, plugin_name: str) -> dict[str, Any]: return await self.handler.get_plugin_icon(plugin_author, plugin_name) @alru_cache(ttl=5 * 60) # 5 minutes async def get_plugin_readme(self, plugin_author: str, plugin_name: str, language: str = 'en') -> str: return await self.handler.get_plugin_readme(plugin_author, plugin_name, language) @alru_cache(ttl=5 * 60) async def get_plugin_assets(self, plugin_author: str, plugin_name: str, filepath: str) -> dict[str, Any]: return await self.handler.get_plugin_assets(plugin_author, plugin_name, filepath) async def handle_page_api( self, plugin_author: str, plugin_name: str, page_id: str, endpoint: str, method: str, body: Any = None, ) -> dict[str, Any]: return await self.handler.handle_page_api(plugin_author, plugin_name, page_id, endpoint, method, body) async def get_debug_info(self) -> dict[str, Any]: """Get debug information including debug key and WS URL""" if not self.is_enable_plugin: return {} return await self.handler.get_debug_info() async def emit_event( self, event: events.BaseEventModel, bound_plugins: list[str] | None = None, ) -> context.EventContext: event_ctx = context.EventContext.from_event(event) if not self.is_enable_plugin: return event_ctx # Pass include_plugins to runtime for filtering event_ctx_result = await self.handler.emit_event( event_ctx.model_dump(serialize_as_any=False), include_plugins=bound_plugins ) event_ctx = context.EventContext.model_validate(event_ctx_result['event_context']) return event_ctx async def list_tools(self, bound_plugins: list[str] | None = None) -> list[ComponentManifest]: if not self.is_enable_plugin: return [] # Pass include_plugins to runtime for filtering list_tools_data = await self.handler.list_tools(include_plugins=bound_plugins) tools = [ComponentManifest.model_validate(tool) for tool in list_tools_data] return tools async def call_tool( self, tool_name: str, parameters: dict[str, Any], session: provider_session.Session, query_id: int, bound_plugins: list[str] | None = None, ) -> dict[str, Any]: if not self.is_enable_plugin: return {'error': 'Tool not found: plugin system is disabled'} # Pass include_plugins to runtime for validation return await self.handler.call_tool( tool_name, parameters, session.model_dump(serialize_as_any=True), query_id, include_plugins=bound_plugins ) async def list_commands(self, bound_plugins: list[str] | None = None) -> list[ComponentManifest]: if not self.is_enable_plugin: return [] # Pass include_plugins to runtime for filtering list_commands_data = await self.handler.list_commands(include_plugins=bound_plugins) commands = [ComponentManifest.model_validate(command) for command in list_commands_data] return commands async def execute_command( self, command_ctx: command_context.ExecuteContext, bound_plugins: list[str] | None = None ) -> typing.AsyncGenerator[command_context.CommandReturn, None]: if not self.is_enable_plugin: yield command_context.CommandReturn(error=command_errors.CommandNotFoundError(command_ctx.command)) return # Pass include_plugins to runtime for validation gen = self.handler.execute_command(command_ctx.model_dump(serialize_as_any=True), include_plugins=bound_plugins) async for ret in gen: cmd_ret = command_context.CommandReturn.model_validate(ret) yield cmd_ret async def retrieve_knowledge( self, plugin_author: str, plugin_name: str, retriever_name: str, retrieval_context: dict[str, Any], ) -> dict[str, Any]: """Retrieve knowledge using a KnowledgeEngine instance.""" if not self.is_enable_plugin: return {'results': []} return await self.handler.retrieve_knowledge(plugin_author, plugin_name, retriever_name, retrieval_context) def dispose(self): # No need to consider the shutdown on Windows # for Windows can kill processes and subprocesses chainly if self.is_enable_plugin and isinstance(self.ctrl, stdio_client_controller.StdioClientController): self.ap.logger.info('Terminating plugin runtime process...') self.ctrl.process.terminate() if self.heartbeat_task is not None: self.heartbeat_task.cancel() self.heartbeat_task = None @staticmethod def _parse_plugin_id(plugin_id: str) -> tuple[str, str]: """Parse a plugin ID string into (author, name). Args: plugin_id: Plugin ID in 'author/name' format. Returns: Tuple of (plugin_author, plugin_name). Raises: ValueError: If plugin_id is not in the expected 'author/name' format. """ segments = plugin_id.split('/') if len(segments) != 2 or not all(segments): raise ValueError( f"Invalid plugin_id format: '{plugin_id}'. Expected 'author/name' format (e.g. 'langbot/rag-engine')." ) return segments[0], segments[1] async def call_rag_ingest(self, plugin_id: str, context_data: dict[str, Any]) -> dict[str, Any]: """Call plugin to ingest document. Args: plugin_id: Target plugin ID (author/name). context_data: IngestionContext data. """ plugin_author, plugin_name = self._parse_plugin_id(plugin_id) return await self.handler.rag_ingest_document(plugin_author, plugin_name, context_data) async def call_rag_delete_document(self, plugin_id: str, document_id: str, kb_id: str) -> bool: plugin_author, plugin_name = self._parse_plugin_id(plugin_id) return await self.handler.rag_delete_document(plugin_author, plugin_name, document_id, kb_id) async def get_rag_creation_schema(self, plugin_id: str) -> dict[str, Any]: plugin_author, plugin_name = self._parse_plugin_id(plugin_id) return await self.handler.get_rag_creation_schema(plugin_author, plugin_name) async def get_rag_retrieval_schema(self, plugin_id: str) -> dict[str, Any]: plugin_author, plugin_name = self._parse_plugin_id(plugin_id) return await self.handler.get_rag_retrieval_schema(plugin_author, plugin_name) async def rag_on_kb_create(self, plugin_id: str, kb_id: str, config: dict[str, Any]) -> dict[str, Any]: """Notify plugin about KB creation.""" plugin_author, plugin_name = self._parse_plugin_id(plugin_id) return await self.handler.rag_on_kb_create(plugin_author, plugin_name, kb_id, config) async def rag_on_kb_delete(self, plugin_id: str, kb_id: str) -> dict[str, Any]: """Notify plugin about KB deletion.""" plugin_author, plugin_name = self._parse_plugin_id(plugin_id) return await self.handler.rag_on_kb_delete(plugin_author, plugin_name, kb_id) async def call_rag_retrieve(self, plugin_id: str, retrieval_context: dict[str, Any]) -> dict[str, Any]: """Call plugin to retrieve knowledge. Args: plugin_id: Target plugin ID (author/name). retrieval_context: RetrievalContext data. """ plugin_author, plugin_name = self._parse_plugin_id(plugin_id) return await self.handler.retrieve_knowledge(plugin_author, plugin_name, '', retrieval_context) async def list_knowledge_engines(self) -> list[dict[str, Any]]: """List all available Knowledge Engines from plugins. Returns a list of Knowledge Engines with their capabilities and configuration schemas. """ if not self.is_enable_plugin: return [] return await self.handler.list_knowledge_engines() async def list_parsers(self) -> list[dict[str, Any]]: """List all available parsers from plugins.""" if not self.is_enable_plugin: return [] return await self.handler.list_parsers() async def call_parser(self, plugin_id: str, context_data: dict[str, Any], file_bytes: bytes) -> dict[str, Any]: """Call plugin to parse a document.""" plugin_author, plugin_name = self._parse_plugin_id(plugin_id) return await self.handler.parse_document(plugin_author, plugin_name, context_data, file_bytes)