fix(plugin): surface dependency install failures

This commit is contained in:
RockChinQ
2026-05-09 14:42:05 +08:00
parent 1fcdbd472f
commit 3f50a56623
4 changed files with 115 additions and 29 deletions

View File

@@ -11,6 +11,7 @@ import os
import sys
import httpx
import sqlalchemy
import yaml
from async_lru import alru_cache
from langbot_plugin.api.entities.builtin.pipeline.query import provider_session
@@ -195,40 +196,110 @@ class PluginRuntimeConnector:
return await self.handler.ping()
def _extract_deps_metadata(
def _inspect_plugin_package(
self,
file_bytes: bytes,
task_context: taskmgr.TaskContext | None,
):
"""Extract dependency count from requirements.txt inside plugin zip."""
if task_context is None:
return
) -> tuple[str | None, str | None]:
"""Extract plugin identity and dependency metadata from a plugin package."""
plugin_author = None
plugin_name = None
try:
with zipfile.ZipFile(io.BytesIO(file_bytes)) as zf:
for name in zf.namelist():
if name.endswith('requirements.txt'):
content = zf.read(name).decode('utf-8', errors='ignore')
deps = [
line.strip()
for line in content.splitlines()
if line.strip() and not line.strip().startswith('#')
]
task_context.metadata['deps_total'] = len(deps)
task_context.metadata['deps_list'] = deps
break
try:
manifest = yaml.safe_load(zf.read('manifest.yaml').decode('utf-8', errors='ignore')) or {}
metadata = manifest.get('metadata', {})
plugin_author = metadata.get('author')
plugin_name = metadata.get('name')
except Exception:
pass
if task_context is not None:
for name in zf.namelist():
if name.endswith('requirements.txt'):
content = zf.read(name).decode('utf-8', errors='ignore')
deps = [
line.strip()
for line in content.splitlines()
if line.strip() and not line.strip().startswith('#')
]
task_context.metadata['deps_total'] = len(deps)
task_context.metadata['deps_list'] = deps
break
except Exception:
pass
return plugin_author, plugin_name
def _build_plugin_startup_failure_message(
self,
plugin_author: str,
plugin_name: str,
task_context: taskmgr.TaskContext | None,
) -> str:
dep_hint = ''
if task_context is not None:
current_dep = task_context.metadata.get('current_dep')
if current_dep:
dep_hint = f' Last dependency: {current_dep}.'
return (
f'Plugin {plugin_author}/{plugin_name} failed to start after installation. '
f'Dependency installation or plugin initialization may have failed.{dep_hint} '
f'Please check the plugin requirements and runtime logs.'
)
async def _wait_for_installed_plugin_ready(
self,
plugin_author: str | None,
plugin_name: str | None,
task_context: taskmgr.TaskContext | None,
timeout: float = 30,
):
"""Wait until the installed plugin is registered by the runtime.
The plugin runtime launches plugins asynchronously. If dependency installation
fails, the plugin process exits before registration; without this check the
install task can incorrectly finish successfully.
"""
if not plugin_author or not plugin_name:
return
deadline = time.time() + timeout
last_error: Exception | None = None
while time.time() < deadline:
try:
plugin = await self.get_plugin_info(plugin_author, plugin_name)
if plugin is not None:
status = plugin.get('status')
if status == 'initialized':
return
except Exception as e:
last_error = e
await asyncio.sleep(0.5)
message = self._build_plugin_startup_failure_message(plugin_author, plugin_name, task_context)
if last_error is not None:
message = f'{message} Last runtime error: {last_error}'
raise RuntimeError(message)
async def install_plugin(
self,
install_source: PluginInstallSource,
install_info: dict[str, Any],
task_context: taskmgr.TaskContext | None = None,
):
plugin_author = install_info.get('plugin_author')
plugin_name = install_info.get('plugin_name')
if install_source == PluginInstallSource.LOCAL:
# transfer file before install
file_bytes = install_info['plugin_file']
self._extract_deps_metadata(file_bytes, task_context)
plugin_author, plugin_name = self._inspect_plugin_package(file_bytes, task_context)
if task_context is not None and plugin_author and plugin_name:
task_context.metadata['plugin_name'] = f'{plugin_author}/{plugin_name}'
file_key = await self.handler.send_file(file_bytes, 'lbpkg')
install_info['plugin_file_key'] = file_key
del install_info['plugin_file']
@@ -265,7 +336,9 @@ class PluginRuntimeConnector:
task_context.metadata['download_speed'] = downloaded / elapsed if elapsed > 0 else 0
file_bytes = b''.join(chunks)
self._extract_deps_metadata(file_bytes, task_context)
plugin_author, plugin_name = self._inspect_plugin_package(file_bytes, task_context)
if task_context is not None and plugin_author and plugin_name:
task_context.metadata['plugin_name'] = f'{plugin_author}/{plugin_name}'
file_key = await self.handler.send_file(file_bytes, 'lbpkg')
install_info['plugin_file_key'] = file_key
self.ap.logger.info(f'Transfered file {file_key} to plugin runtime')
@@ -289,6 +362,8 @@ class PluginRuntimeConnector:
if metadata is not None and task_context is not None:
task_context.metadata.update(metadata)
await self._wait_for_installed_plugin_ready(plugin_author, plugin_name, task_context)
async def upgrade_plugin(
self,
plugin_author: str,

View File

@@ -76,10 +76,12 @@ function MarketplaceContent() {
// Register task completion callback for toast and plugin list refresh
useEffect(() => {
const onComplete = (_taskId: number, success: boolean) => {
const onComplete = (_taskId: number, success: boolean, error?: string) => {
if (success) {
toast.success(t('plugins.installSuccess'));
refreshPlugins();
} else {
toast.error(error || t('plugins.installFailed'));
}
};
registerOnTaskComplete(onComplete);

View File

@@ -45,7 +45,11 @@ export interface PluginInstallTask {
currentAction: string; // raw backend action string
}
type OnTaskCompleteCallback = (taskId: number, success: boolean) => void;
type OnTaskCompleteCallback = (
taskId: number,
success: boolean,
error?: string,
) => void;
interface PluginInstallTaskContextValue {
tasks: PluginInstallTask[];
@@ -224,13 +228,16 @@ export function PluginInstallTaskProvider({
onTaskCompleteCallbacks.current.delete(cb);
}, []);
const notifyTaskComplete = useCallback((taskId: number, success: boolean) => {
if (notifiedTaskIds.current.has(taskId)) return;
notifiedTaskIds.current.add(taskId);
onTaskCompleteCallbacks.current.forEach((cb) => {
cb(taskId, success);
});
}, []);
const notifyTaskComplete = useCallback(
(taskId: number, success: boolean, error?: string) => {
if (notifiedTaskIds.current.has(taskId)) return;
notifiedTaskIds.current.add(taskId);
onTaskCompleteCallbacks.current.forEach((cb) => {
cb(taskId, success, error);
});
},
[],
);
const pollTask = useCallback(
(taskKey: string, taskId: number) => {
@@ -289,7 +296,7 @@ export function PluginInstallTaskProvider({
}
if (exception) {
notifyTaskComplete(taskId, false);
notifyTaskComplete(taskId, false, exception);
return {
...t,
stage: InstallStage.ERROR,

View File

@@ -167,11 +167,13 @@ function PluginListView() {
// Register task completion callback for toast and plugin list refresh
useEffect(() => {
const onComplete = (_taskId: number, success: boolean) => {
const onComplete = (_taskId: number, success: boolean, error?: string) => {
if (success) {
toast.success(t('plugins.installSuccess'));
pluginInstalledRef.current?.refreshPluginList();
refreshPlugins();
} else {
toast.error(error || t('plugins.installFailed'));
}
};
registerOnTaskComplete(onComplete);