From 3f50a56623164c1d92c4d2efa54aa7471043edfc Mon Sep 17 00:00:00 2001 From: RockChinQ Date: Sat, 9 May 2026 14:42:05 +0800 Subject: [PATCH] fix(plugin): surface dependency install failures --- src/langbot/pkg/plugin/connector.py | 111 +++++++++++++++--- web/src/app/home/market/page.tsx | 4 +- .../PluginInstallTaskContext.tsx | 25 ++-- web/src/app/home/plugins/page.tsx | 4 +- 4 files changed, 115 insertions(+), 29 deletions(-) diff --git a/src/langbot/pkg/plugin/connector.py b/src/langbot/pkg/plugin/connector.py index 90be428c..d47cb448 100644 --- a/src/langbot/pkg/plugin/connector.py +++ b/src/langbot/pkg/plugin/connector.py @@ -11,6 +11,7 @@ import os import sys import httpx import sqlalchemy +import yaml from async_lru import alru_cache from langbot_plugin.api.entities.builtin.pipeline.query import provider_session @@ -195,40 +196,110 @@ class PluginRuntimeConnector: return await self.handler.ping() - def _extract_deps_metadata( + def _inspect_plugin_package( self, file_bytes: bytes, task_context: taskmgr.TaskContext | None, - ): - """Extract dependency count from requirements.txt inside plugin zip.""" - if task_context is None: - return + ) -> tuple[str | None, str | None]: + """Extract plugin identity and dependency metadata from a plugin package.""" + plugin_author = None + plugin_name = None + try: with zipfile.ZipFile(io.BytesIO(file_bytes)) as zf: - for name in zf.namelist(): - if name.endswith('requirements.txt'): - content = zf.read(name).decode('utf-8', errors='ignore') - deps = [ - line.strip() - for line in content.splitlines() - if line.strip() and not line.strip().startswith('#') - ] - task_context.metadata['deps_total'] = len(deps) - task_context.metadata['deps_list'] = deps - break + try: + manifest = yaml.safe_load(zf.read('manifest.yaml').decode('utf-8', errors='ignore')) or {} + metadata = manifest.get('metadata', {}) + plugin_author = metadata.get('author') + plugin_name = metadata.get('name') + except Exception: + pass + + if task_context is not None: + for name in zf.namelist(): + if name.endswith('requirements.txt'): + content = zf.read(name).decode('utf-8', errors='ignore') + deps = [ + line.strip() + for line in content.splitlines() + if line.strip() and not line.strip().startswith('#') + ] + task_context.metadata['deps_total'] = len(deps) + task_context.metadata['deps_list'] = deps + break except Exception: pass + return plugin_author, plugin_name + + def _build_plugin_startup_failure_message( + self, + plugin_author: str, + plugin_name: str, + task_context: taskmgr.TaskContext | None, + ) -> str: + dep_hint = '' + if task_context is not None: + current_dep = task_context.metadata.get('current_dep') + if current_dep: + dep_hint = f' Last dependency: {current_dep}.' + + return ( + f'Plugin {plugin_author}/{plugin_name} failed to start after installation. ' + f'Dependency installation or plugin initialization may have failed.{dep_hint} ' + f'Please check the plugin requirements and runtime logs.' + ) + + async def _wait_for_installed_plugin_ready( + self, + plugin_author: str | None, + plugin_name: str | None, + task_context: taskmgr.TaskContext | None, + timeout: float = 30, + ): + """Wait until the installed plugin is registered by the runtime. + + The plugin runtime launches plugins asynchronously. If dependency installation + fails, the plugin process exits before registration; without this check the + install task can incorrectly finish successfully. + """ + if not plugin_author or not plugin_name: + return + + deadline = time.time() + timeout + last_error: Exception | None = None + while time.time() < deadline: + try: + plugin = await self.get_plugin_info(plugin_author, plugin_name) + if plugin is not None: + status = plugin.get('status') + if status == 'initialized': + return + except Exception as e: + last_error = e + + await asyncio.sleep(0.5) + + message = self._build_plugin_startup_failure_message(plugin_author, plugin_name, task_context) + if last_error is not None: + message = f'{message} Last runtime error: {last_error}' + raise RuntimeError(message) + async def install_plugin( self, install_source: PluginInstallSource, install_info: dict[str, Any], task_context: taskmgr.TaskContext | None = None, ): + plugin_author = install_info.get('plugin_author') + plugin_name = install_info.get('plugin_name') + if install_source == PluginInstallSource.LOCAL: # transfer file before install file_bytes = install_info['plugin_file'] - self._extract_deps_metadata(file_bytes, task_context) + plugin_author, plugin_name = self._inspect_plugin_package(file_bytes, task_context) + if task_context is not None and plugin_author and plugin_name: + task_context.metadata['plugin_name'] = f'{plugin_author}/{plugin_name}' file_key = await self.handler.send_file(file_bytes, 'lbpkg') install_info['plugin_file_key'] = file_key del install_info['plugin_file'] @@ -265,7 +336,9 @@ class PluginRuntimeConnector: task_context.metadata['download_speed'] = downloaded / elapsed if elapsed > 0 else 0 file_bytes = b''.join(chunks) - self._extract_deps_metadata(file_bytes, task_context) + plugin_author, plugin_name = self._inspect_plugin_package(file_bytes, task_context) + if task_context is not None and plugin_author and plugin_name: + task_context.metadata['plugin_name'] = f'{plugin_author}/{plugin_name}' file_key = await self.handler.send_file(file_bytes, 'lbpkg') install_info['plugin_file_key'] = file_key self.ap.logger.info(f'Transfered file {file_key} to plugin runtime') @@ -289,6 +362,8 @@ class PluginRuntimeConnector: if metadata is not None and task_context is not None: task_context.metadata.update(metadata) + await self._wait_for_installed_plugin_ready(plugin_author, plugin_name, task_context) + async def upgrade_plugin( self, plugin_author: str, diff --git a/web/src/app/home/market/page.tsx b/web/src/app/home/market/page.tsx index e2158c53..b4130789 100644 --- a/web/src/app/home/market/page.tsx +++ b/web/src/app/home/market/page.tsx @@ -76,10 +76,12 @@ function MarketplaceContent() { // Register task completion callback for toast and plugin list refresh useEffect(() => { - const onComplete = (_taskId: number, success: boolean) => { + const onComplete = (_taskId: number, success: boolean, error?: string) => { if (success) { toast.success(t('plugins.installSuccess')); refreshPlugins(); + } else { + toast.error(error || t('plugins.installFailed')); } }; registerOnTaskComplete(onComplete); diff --git a/web/src/app/home/plugins/components/plugin-install-task/PluginInstallTaskContext.tsx b/web/src/app/home/plugins/components/plugin-install-task/PluginInstallTaskContext.tsx index a64d3f81..62ceae13 100644 --- a/web/src/app/home/plugins/components/plugin-install-task/PluginInstallTaskContext.tsx +++ b/web/src/app/home/plugins/components/plugin-install-task/PluginInstallTaskContext.tsx @@ -45,7 +45,11 @@ export interface PluginInstallTask { currentAction: string; // raw backend action string } -type OnTaskCompleteCallback = (taskId: number, success: boolean) => void; +type OnTaskCompleteCallback = ( + taskId: number, + success: boolean, + error?: string, +) => void; interface PluginInstallTaskContextValue { tasks: PluginInstallTask[]; @@ -224,13 +228,16 @@ export function PluginInstallTaskProvider({ onTaskCompleteCallbacks.current.delete(cb); }, []); - const notifyTaskComplete = useCallback((taskId: number, success: boolean) => { - if (notifiedTaskIds.current.has(taskId)) return; - notifiedTaskIds.current.add(taskId); - onTaskCompleteCallbacks.current.forEach((cb) => { - cb(taskId, success); - }); - }, []); + const notifyTaskComplete = useCallback( + (taskId: number, success: boolean, error?: string) => { + if (notifiedTaskIds.current.has(taskId)) return; + notifiedTaskIds.current.add(taskId); + onTaskCompleteCallbacks.current.forEach((cb) => { + cb(taskId, success, error); + }); + }, + [], + ); const pollTask = useCallback( (taskKey: string, taskId: number) => { @@ -289,7 +296,7 @@ export function PluginInstallTaskProvider({ } if (exception) { - notifyTaskComplete(taskId, false); + notifyTaskComplete(taskId, false, exception); return { ...t, stage: InstallStage.ERROR, diff --git a/web/src/app/home/plugins/page.tsx b/web/src/app/home/plugins/page.tsx index 61e44f43..0f175b82 100644 --- a/web/src/app/home/plugins/page.tsx +++ b/web/src/app/home/plugins/page.tsx @@ -167,11 +167,13 @@ function PluginListView() { // Register task completion callback for toast and plugin list refresh useEffect(() => { - const onComplete = (_taskId: number, success: boolean) => { + const onComplete = (_taskId: number, success: boolean, error?: string) => { if (success) { toast.success(t('plugins.installSuccess')); pluginInstalledRef.current?.refreshPluginList(); refreshPlugins(); + } else { + toast.error(error || t('plugins.installFailed')); } }; registerOnTaskComplete(onComplete);