mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-14 01:36:03 +00:00
feat: add external plugin auto download
This commit is contained in:
@@ -69,56 +69,99 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup):
|
|||||||
)
|
)
|
||||||
return result.first() is not None
|
return result.first() is not None
|
||||||
|
|
||||||
|
async def _install_plugin_from_marketplace(
|
||||||
|
self, plugin_id: str, task_context: taskmgr.TaskContext, space_url: str
|
||||||
|
) -> None:
|
||||||
|
"""Install a single plugin from the marketplace."""
|
||||||
|
p_author, p_name = plugin_id.split('/', 1)
|
||||||
|
self.ap.logger.info(f'RAG migration: installing plugin {plugin_id} from marketplace...')
|
||||||
|
task_context.trace(f'Installing plugin {plugin_id} from marketplace...')
|
||||||
|
|
||||||
|
async with httpx.AsyncClient(trust_env=True, timeout=15) as client:
|
||||||
|
resp = await client.get(f'{space_url}/api/v1/marketplace/plugins/{p_author}/{p_name}')
|
||||||
|
resp.raise_for_status()
|
||||||
|
p_data = resp.json().get('data', {}).get('plugin', {})
|
||||||
|
p_version = p_data.get('latest_version')
|
||||||
|
if not p_version:
|
||||||
|
raise Exception(f'Could not determine latest version for {plugin_id}')
|
||||||
|
|
||||||
|
await self.ap.plugin_connector.install_plugin(
|
||||||
|
PluginInstallSource.MARKETPLACE,
|
||||||
|
{
|
||||||
|
'plugin_author': p_author,
|
||||||
|
'plugin_name': p_name,
|
||||||
|
'plugin_version': p_version,
|
||||||
|
},
|
||||||
|
task_context=task_context,
|
||||||
|
)
|
||||||
|
self.ap.logger.info(f'RAG migration: plugin {plugin_id} install request sent.')
|
||||||
|
|
||||||
async def _execute_rag_migration(self, task_context: taskmgr.TaskContext, install_plugin: bool = True):
|
async def _execute_rag_migration(self, task_context: taskmgr.TaskContext, install_plugin: bool = True):
|
||||||
"""Execute RAG migration: optionally install langrag plugin and restore backup data."""
|
"""Execute RAG migration: install required plugins and restore backup data."""
|
||||||
warnings = []
|
warnings = []
|
||||||
|
|
||||||
|
# Collect all plugins we need: LangRAG (always) + connector plugins (from external KBs)
|
||||||
|
needed_plugins: dict[str, str] = {
|
||||||
|
LANGRAG_PLUGIN_ID: LANGRAG_PLUGIN_NAME,
|
||||||
|
}
|
||||||
|
|
||||||
|
has_external = await self._table_exists('external_knowledge_bases')
|
||||||
|
if has_external:
|
||||||
|
result = await self.ap.persistence_mgr.execute_async(
|
||||||
|
sqlalchemy.text('SELECT DISTINCT plugin_author, plugin_name FROM external_knowledge_bases;')
|
||||||
|
)
|
||||||
|
for row in result.fetchall():
|
||||||
|
plugin_author = row[0] or ''
|
||||||
|
plugin_name = row[1] or ''
|
||||||
|
mapped_name = EXTERNAL_PLUGIN_NAME_MAPPING.get(plugin_name, plugin_name)
|
||||||
|
plugin_id = f'{plugin_author}/{mapped_name}'
|
||||||
|
if plugin_id not in needed_plugins:
|
||||||
|
needed_plugins[plugin_id] = mapped_name
|
||||||
|
|
||||||
|
self.ap.logger.info(f'RAG migration: plugins needed: {list(needed_plugins.keys())}')
|
||||||
|
|
||||||
if install_plugin:
|
if install_plugin:
|
||||||
# Step 1: Install langrag plugin from marketplace
|
# Step 1: Install all required plugins from marketplace
|
||||||
task_context.trace('Installing LangRAG plugin from marketplace...', action='install-plugin')
|
task_context.trace('Installing required plugins...', action='install-plugin')
|
||||||
try:
|
space_url = self.ap.instance_config.data.get('space', {}).get('url', DEFAULT_SPACE_URL).rstrip('/')
|
||||||
# Query marketplace for latest version
|
|
||||||
space_url = self.ap.instance_config.data.get('space', {}).get('url', DEFAULT_SPACE_URL).rstrip('/')
|
|
||||||
async with httpx.AsyncClient(trust_env=True, timeout=15) as client:
|
|
||||||
resp = await client.get(
|
|
||||||
f'{space_url}/api/v1/marketplace/plugins/{LANGRAG_PLUGIN_AUTHOR}/{LANGRAG_PLUGIN_NAME}'
|
|
||||||
)
|
|
||||||
resp.raise_for_status()
|
|
||||||
plugin_data = resp.json().get('data', {}).get('plugin', {})
|
|
||||||
plugin_version = plugin_data.get('latest_version')
|
|
||||||
if not plugin_version:
|
|
||||||
raise Exception('Could not determine latest LangRAG version from marketplace')
|
|
||||||
|
|
||||||
install_info = {
|
for plugin_id in needed_plugins:
|
||||||
'plugin_author': LANGRAG_PLUGIN_AUTHOR,
|
try:
|
||||||
'plugin_name': LANGRAG_PLUGIN_NAME,
|
await self._install_plugin_from_marketplace(plugin_id, task_context, space_url)
|
||||||
'plugin_version': plugin_version,
|
except Exception as e:
|
||||||
}
|
self.ap.logger.warning(f'RAG migration: plugin {plugin_id} install returned: {e}')
|
||||||
await self.ap.plugin_connector.install_plugin(
|
task_context.trace(f'Plugin install note ({plugin_id}): {e}')
|
||||||
PluginInstallSource.MARKETPLACE, install_info, task_context=task_context
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
# Plugin may already be installed
|
|
||||||
self.ap.logger.warning(f'LangRAG plugin install returned: {e}')
|
|
||||||
task_context.trace(f'Plugin install note: {e}')
|
|
||||||
|
|
||||||
# Step 2: Wait for the plugin to be available
|
# Step 2: Wait for all plugins to become available as knowledge engines
|
||||||
task_context.trace('Waiting for LangRAG plugin to become available...', action='wait-plugin')
|
task_context.trace(
|
||||||
|
f'Waiting for plugins to become available: {list(needed_plugins.keys())}...',
|
||||||
|
action='wait-plugin',
|
||||||
|
)
|
||||||
max_retries = 30
|
max_retries = 30
|
||||||
|
engine_id_set: set[str] = set()
|
||||||
for i in range(max_retries):
|
for i in range(max_retries):
|
||||||
try:
|
try:
|
||||||
engines = await self.ap.plugin_connector.list_knowledge_engines()
|
engines = await self.ap.plugin_connector.list_knowledge_engines()
|
||||||
engine_ids = [e.get('plugin_id') for e in engines]
|
engine_id_set = {e.get('plugin_id') for e in engines}
|
||||||
if LANGRAG_PLUGIN_ID in engine_ids:
|
|
||||||
task_context.trace('LangRAG plugin is ready.')
|
|
||||||
break
|
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
if all(pid in engine_id_set for pid in needed_plugins):
|
||||||
|
self.ap.logger.info(f'RAG migration: all plugins ready: {engine_id_set}')
|
||||||
|
task_context.trace('All required plugins are ready.')
|
||||||
|
break
|
||||||
if i == max_retries - 1:
|
if i == max_retries - 1:
|
||||||
raise Exception(
|
still_missing = [pid for pid in needed_plugins if pid not in engine_id_set]
|
||||||
f'LangRAG plugin ({LANGRAG_PLUGIN_ID}) did not become available after {max_retries} retries'
|
warning = f'Plugin(s) {still_missing} did not become available after {max_retries} retries'
|
||||||
)
|
self.ap.logger.warning(f'RAG migration: {warning}')
|
||||||
|
warnings.append(warning)
|
||||||
|
task_context.trace(warning)
|
||||||
await asyncio.sleep(2)
|
await asyncio.sleep(2)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
engines = await self.ap.plugin_connector.list_knowledge_engines()
|
||||||
|
engine_id_set = {e.get('plugin_id') for e in engines}
|
||||||
|
except Exception:
|
||||||
|
engine_id_set = set()
|
||||||
|
|
||||||
# Step 3: Restore internal knowledge bases from backup
|
# Step 3: Restore internal knowledge bases from backup
|
||||||
task_context.trace('Restoring internal knowledge bases...', action='restore-internal')
|
task_context.trace('Restoring internal knowledge bases...', action='restore-internal')
|
||||||
@@ -143,7 +186,6 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup):
|
|||||||
creation_settings = json.dumps({'embedding_model_uuid': embedding_model_uuid})
|
creation_settings = json.dumps({'embedding_model_uuid': embedding_model_uuid})
|
||||||
retrieval_settings = json.dumps({'top_k': top_k})
|
retrieval_settings = json.dumps({'top_k': top_k})
|
||||||
|
|
||||||
# Insert into knowledge_bases with the same UUID
|
|
||||||
await self.ap.persistence_mgr.execute_async(
|
await self.ap.persistence_mgr.execute_async(
|
||||||
sqlalchemy.text(
|
sqlalchemy.text(
|
||||||
'INSERT INTO knowledge_bases '
|
'INSERT INTO knowledge_bases '
|
||||||
@@ -165,7 +207,6 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Notify langrag plugin to recognize this KB
|
|
||||||
try:
|
try:
|
||||||
config = {'embedding_model_uuid': embedding_model_uuid}
|
config = {'embedding_model_uuid': embedding_model_uuid}
|
||||||
await self.ap.plugin_connector.rag_on_kb_create(LANGRAG_PLUGIN_ID, kb_uuid, config)
|
await self.ap.plugin_connector.rag_on_kb_create(LANGRAG_PLUGIN_ID, kb_uuid, config)
|
||||||
@@ -175,26 +216,21 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup):
|
|||||||
warnings.append(warning)
|
warnings.append(warning)
|
||||||
task_context.trace(warning)
|
task_context.trace(warning)
|
||||||
|
|
||||||
# Reload all knowledge bases into runtime
|
|
||||||
await self.ap.rag_mgr.load_knowledge_bases_from_db()
|
await self.ap.rag_mgr.load_knowledge_bases_from_db()
|
||||||
|
|
||||||
# Step 4: Restore external knowledge bases (read from preserved original table)
|
# Step 4: Restore external knowledge bases
|
||||||
task_context.trace('Restoring external knowledge bases...', action='restore-external')
|
task_context.trace('Restoring external knowledge bases...', action='restore-external')
|
||||||
if await self._table_exists('external_knowledge_bases'):
|
if has_external:
|
||||||
result = await self.ap.persistence_mgr.execute_async(
|
result = await self.ap.persistence_mgr.execute_async(
|
||||||
sqlalchemy.text('SELECT * FROM external_knowledge_bases;')
|
sqlalchemy.text('SELECT * FROM external_knowledge_bases;')
|
||||||
)
|
)
|
||||||
rows = result.fetchall()
|
rows = result.fetchall()
|
||||||
columns = result.keys()
|
columns = result.keys()
|
||||||
|
|
||||||
# Get current available engines for matching
|
self.ap.logger.info(
|
||||||
try:
|
f'RAG migration: {len(rows)} external KB(s) to restore. Available engines: {engine_id_set}'
|
||||||
engines = await self.ap.plugin_connector.list_knowledge_engines()
|
)
|
||||||
engine_id_set = {e.get('plugin_id') for e in engines}
|
task_context.trace(f'Found {len(rows)} external KB(s). Available engines: {engine_id_set}')
|
||||||
except Exception:
|
|
||||||
engine_id_set = set()
|
|
||||||
|
|
||||||
task_context.trace(f'Found {len(rows)} external KB(s) to restore. Available engines: {engine_id_set}')
|
|
||||||
|
|
||||||
for row in rows:
|
for row in rows:
|
||||||
row_dict = dict(zip(columns, row))
|
row_dict = dict(zip(columns, row))
|
||||||
@@ -207,40 +243,28 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup):
|
|||||||
retriever_config = row_dict.get('retriever_config', {})
|
retriever_config = row_dict.get('retriever_config', {})
|
||||||
created_at = row_dict.get('created_at')
|
created_at = row_dict.get('created_at')
|
||||||
|
|
||||||
# Map old Retriever plugin name to new Connector plugin name
|
|
||||||
mapped_plugin_name = EXTERNAL_PLUGIN_NAME_MAPPING.get(plugin_name, plugin_name)
|
mapped_plugin_name = EXTERNAL_PLUGIN_NAME_MAPPING.get(plugin_name, plugin_name)
|
||||||
external_plugin_id = f'{plugin_author}/{mapped_plugin_name}'
|
external_plugin_id = f'{plugin_author}/{mapped_plugin_name}'
|
||||||
|
|
||||||
task_context.trace(
|
self.ap.logger.info(
|
||||||
f'Processing external KB "{name}" ({kb_uuid}): '
|
f'RAG migration: processing external KB "{name}" ({kb_uuid}), '
|
||||||
f'old_plugin={plugin_author}/{plugin_name}, mapped={external_plugin_id}'
|
f'plugin: {plugin_author}/{plugin_name} -> {external_plugin_id}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# Parse retriever_config
|
|
||||||
if isinstance(retriever_config, str):
|
if isinstance(retriever_config, str):
|
||||||
try:
|
try:
|
||||||
retriever_config = json.loads(retriever_config)
|
retriever_config = json.loads(retriever_config)
|
||||||
except (json.JSONDecodeError, TypeError):
|
except (json.JSONDecodeError, TypeError):
|
||||||
retriever_config = {}
|
retriever_config = {}
|
||||||
|
|
||||||
# Split retriever_config into creation_settings and retrieval_settings
|
|
||||||
creation_fields = EXTERNAL_PLUGIN_CREATION_FIELDS.get(external_plugin_id)
|
creation_fields = EXTERNAL_PLUGIN_CREATION_FIELDS.get(external_plugin_id)
|
||||||
if creation_fields is None:
|
if creation_fields is None:
|
||||||
# All fields go to creation_settings (e.g. FastGPT has no retrieval_schema)
|
|
||||||
creation_settings_dict = retriever_config
|
creation_settings_dict = retriever_config
|
||||||
retrieval_settings_dict = {}
|
retrieval_settings_dict = {}
|
||||||
else:
|
else:
|
||||||
creation_settings_dict = {k: v for k, v in retriever_config.items() if k in creation_fields}
|
creation_settings_dict = {k: v for k, v in retriever_config.items() if k in creation_fields}
|
||||||
retrieval_settings_dict = {k: v for k, v in retriever_config.items() if k not in creation_fields}
|
retrieval_settings_dict = {k: v for k, v in retriever_config.items() if k not in creation_fields}
|
||||||
|
|
||||||
task_context.trace(
|
|
||||||
f'Inserting external KB "{name}" into knowledge_bases table '
|
|
||||||
f'(plugin_id={external_plugin_id}, creation_settings={json.dumps(creation_settings_dict)}, '
|
|
||||||
f'retrieval_settings={json.dumps(retrieval_settings_dict)})'
|
|
||||||
)
|
|
||||||
|
|
||||||
# Always insert the KB record so it shows in frontend,
|
|
||||||
# even if the connector plugin is not yet installed.
|
|
||||||
await self.ap.persistence_mgr.execute_async(
|
await self.ap.persistence_mgr.execute_async(
|
||||||
sqlalchemy.text(
|
sqlalchemy.text(
|
||||||
'INSERT INTO knowledge_bases '
|
'INSERT INTO knowledge_bases '
|
||||||
@@ -280,7 +304,6 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup):
|
|||||||
warnings.append(warning)
|
warnings.append(warning)
|
||||||
task_context.trace(warning)
|
task_context.trace(warning)
|
||||||
|
|
||||||
# Reload again after external KBs
|
|
||||||
await self.ap.rag_mgr.load_knowledge_bases_from_db()
|
await self.ap.rag_mgr.load_knowledge_bases_from_db()
|
||||||
|
|
||||||
# Step 5: Clear migration flag
|
# Step 5: Clear migration flag
|
||||||
|
|||||||
Reference in New Issue
Block a user