mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-02 03:55:55 +00:00
Optimize the plugin system (#2090)
* Optimize the plugin system * feat: enhance plugin installation process and improve task management * fix: linter err --------- Co-authored-by: Junyan Qin <rockchinq@gmail.com>
This commit is contained in:
@@ -265,6 +265,8 @@ class PluginsRouterGroup(group.RouterGroup):
|
||||
return self.http_status(400, -1, 'Missing asset_url parameter')
|
||||
|
||||
ctx = taskmgr.TaskContext.new()
|
||||
ctx.metadata['plugin_name'] = f'{owner}/{repo}'
|
||||
ctx.metadata['install_source'] = 'github'
|
||||
install_info = {
|
||||
'asset_url': asset_url,
|
||||
'owner': owner,
|
||||
@@ -295,12 +297,17 @@ class PluginsRouterGroup(group.RouterGroup):
|
||||
|
||||
data = await quart.request.json
|
||||
|
||||
plugin_author = data.get('plugin_author', '')
|
||||
plugin_name = data.get('plugin_name', '')
|
||||
|
||||
ctx = taskmgr.TaskContext.new()
|
||||
ctx.metadata['plugin_name'] = f'{plugin_author}/{plugin_name}'
|
||||
ctx.metadata['install_source'] = 'marketplace'
|
||||
wrapper = self.ap.task_mgr.create_user_task(
|
||||
self.ap.plugin_connector.install_plugin(PluginInstallSource.MARKETPLACE, data, task_context=ctx),
|
||||
kind='plugin-operation',
|
||||
name='plugin-install-marketplace',
|
||||
label=f'Installing plugin from marketplace ...{data}',
|
||||
label=f'Installing plugin from marketplace {plugin_author}/{plugin_name}',
|
||||
context=ctx,
|
||||
)
|
||||
|
||||
@@ -323,11 +330,13 @@ class PluginsRouterGroup(group.RouterGroup):
|
||||
}
|
||||
|
||||
ctx = taskmgr.TaskContext.new()
|
||||
ctx.metadata['plugin_name'] = file.filename or 'local plugin'
|
||||
ctx.metadata['install_source'] = 'local'
|
||||
wrapper = self.ap.task_mgr.create_user_task(
|
||||
self.ap.plugin_connector.install_plugin(PluginInstallSource.LOCAL, data, task_context=ctx),
|
||||
kind='plugin-operation',
|
||||
name='plugin-install-local',
|
||||
label=f'Installing plugin from local ...{file.filename}',
|
||||
label=f'Installing plugin from local {file.filename}',
|
||||
context=ctx,
|
||||
)
|
||||
|
||||
|
||||
@@ -118,11 +118,14 @@ class SystemRouterGroup(group.RouterGroup):
|
||||
@self.route('/tasks', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||
async def _() -> str:
|
||||
task_type = quart.request.args.get('type')
|
||||
task_kind = quart.request.args.get('kind')
|
||||
|
||||
if task_type == '':
|
||||
task_type = None
|
||||
if task_kind == '':
|
||||
task_kind = None
|
||||
|
||||
return self.success(data=self.ap.task_mgr.get_tasks_dict(task_type))
|
||||
return self.success(data=self.ap.task_mgr.get_tasks_dict(task_type, task_kind))
|
||||
|
||||
@self.route('/tasks/<task_id>', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||
async def _(task_id: str) -> str:
|
||||
|
||||
@@ -17,9 +17,13 @@ class TaskContext:
|
||||
log: str
|
||||
"""Log"""
|
||||
|
||||
metadata: dict
|
||||
"""Structured metadata for progress reporting"""
|
||||
|
||||
def __init__(self):
|
||||
self.current_action = 'default'
|
||||
self.log = ''
|
||||
self.metadata = {}
|
||||
|
||||
def _log(self, msg: str):
|
||||
self.log += msg + '\n'
|
||||
@@ -38,7 +42,7 @@ class TaskContext:
|
||||
self._log(f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | {self.current_action} | {msg}')
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {'current_action': self.current_action, 'log': self.log}
|
||||
return {'current_action': self.current_action, 'log': self.log, 'metadata': self.metadata}
|
||||
|
||||
@staticmethod
|
||||
def new() -> TaskContext:
|
||||
@@ -211,9 +215,14 @@ class AsyncTaskManager:
|
||||
def get_tasks_dict(
|
||||
self,
|
||||
type: str = None,
|
||||
kind: str = None,
|
||||
) -> dict:
|
||||
return {
|
||||
'tasks': [t.to_dict() for t in self.tasks if type is None or t.task_type == type],
|
||||
'tasks': [
|
||||
t.to_dict()
|
||||
for t in self.tasks
|
||||
if (type is None or t.task_type == type) and (kind is None or t.kind == kind)
|
||||
],
|
||||
'id_index': TaskWrapper._id_index,
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,9 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import io
|
||||
import time
|
||||
import zipfile
|
||||
from typing import Any
|
||||
import typing
|
||||
import os
|
||||
@@ -192,6 +195,30 @@ class PluginRuntimeConnector:
|
||||
|
||||
return await self.handler.ping()
|
||||
|
||||
def _extract_deps_metadata(
|
||||
self,
|
||||
file_bytes: bytes,
|
||||
task_context: taskmgr.TaskContext | None,
|
||||
):
|
||||
"""Extract dependency count from requirements.txt inside plugin zip."""
|
||||
if task_context is None:
|
||||
return
|
||||
try:
|
||||
with zipfile.ZipFile(io.BytesIO(file_bytes)) as zf:
|
||||
for name in zf.namelist():
|
||||
if name.endswith('requirements.txt'):
|
||||
content = zf.read(name).decode('utf-8', errors='ignore')
|
||||
deps = [
|
||||
line.strip()
|
||||
for line in content.splitlines()
|
||||
if line.strip() and not line.strip().startswith('#')
|
||||
]
|
||||
task_context.metadata['deps_total'] = len(deps)
|
||||
task_context.metadata['deps_list'] = deps
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def install_plugin(
|
||||
self,
|
||||
install_source: PluginInstallSource,
|
||||
@@ -201,23 +228,44 @@ class PluginRuntimeConnector:
|
||||
if install_source == PluginInstallSource.LOCAL:
|
||||
# transfer file before install
|
||||
file_bytes = install_info['plugin_file']
|
||||
self._extract_deps_metadata(file_bytes, task_context)
|
||||
file_key = await self.handler.send_file(file_bytes, 'lbpkg')
|
||||
install_info['plugin_file_key'] = file_key
|
||||
del install_info['plugin_file']
|
||||
self.ap.logger.info(f'Transfered file {file_key} to plugin runtime')
|
||||
elif install_source == PluginInstallSource.GITHUB:
|
||||
# download and transfer file
|
||||
# download and transfer file with streaming progress
|
||||
try:
|
||||
async with httpx.AsyncClient(
|
||||
trust_env=True,
|
||||
follow_redirects=True,
|
||||
timeout=20,
|
||||
timeout=60,
|
||||
) as client:
|
||||
response = await client.get(
|
||||
install_info['asset_url'],
|
||||
)
|
||||
response.raise_for_status()
|
||||
file_bytes = response.content
|
||||
async with client.stream('GET', install_info['asset_url']) as response:
|
||||
response.raise_for_status()
|
||||
total = int(response.headers.get('content-length', 0))
|
||||
downloaded = 0
|
||||
chunks: list[bytes] = []
|
||||
start_time = time.time()
|
||||
|
||||
if task_context is not None:
|
||||
task_context.set_current_action('downloading plugin package')
|
||||
task_context.metadata['download_total'] = total
|
||||
task_context.metadata['download_current'] = 0
|
||||
task_context.metadata['download_speed'] = 0
|
||||
|
||||
async for chunk in response.aiter_bytes(chunk_size=8192):
|
||||
chunks.append(chunk)
|
||||
downloaded += len(chunk)
|
||||
|
||||
if task_context is not None:
|
||||
elapsed = time.time() - start_time
|
||||
task_context.metadata['download_current'] = downloaded
|
||||
task_context.metadata['download_total'] = total
|
||||
task_context.metadata['download_speed'] = downloaded / elapsed if elapsed > 0 else 0
|
||||
|
||||
file_bytes = b''.join(chunks)
|
||||
self._extract_deps_metadata(file_bytes, task_context)
|
||||
file_key = await self.handler.send_file(file_bytes, 'lbpkg')
|
||||
install_info['plugin_file_key'] = file_key
|
||||
self.ap.logger.info(f'Transfered file {file_key} to plugin runtime')
|
||||
@@ -236,6 +284,11 @@ class PluginRuntimeConnector:
|
||||
if task_context is not None:
|
||||
task_context.trace(trace)
|
||||
|
||||
# Forward structured metadata from runtime
|
||||
metadata = ret.get('metadata', None)
|
||||
if metadata is not None and task_context is not None:
|
||||
task_context.metadata.update(metadata)
|
||||
|
||||
async def upgrade_plugin(
|
||||
self,
|
||||
plugin_author: str,
|
||||
|
||||
Reference in New Issue
Block a user