feat(rag): add data-only migration option and fix dialog width

Add option to migrate knowledge base data without auto-installing
the LangRAG plugin (for offline/intranet environments). Also
narrow the migration dialog to match other confirmation dialogs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
youhuanghe
2026-03-08 15:05:05 +00:00
parent 8da52b6dc7
commit 1b37dababa
7 changed files with 109 additions and 89 deletions

View File

@@ -2,6 +2,7 @@ import asyncio
import json
import httpx
import quart
import sqlalchemy
from ... import group
@@ -52,55 +53,56 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup):
)
return result.first() is not None
async def _execute_rag_migration(self, task_context: taskmgr.TaskContext):
"""Execute RAG migration: install langrag plugin and restore backup data."""
async def _execute_rag_migration(self, task_context: taskmgr.TaskContext, install_plugin: bool = True):
"""Execute RAG migration: optionally install langrag plugin and restore backup data."""
warnings = []
# Step 1: Install langrag plugin from marketplace
task_context.trace('Installing LangRAG plugin from marketplace...', action='install-plugin')
try:
# Query marketplace for latest version
space_url = self.ap.instance_config.data.get('space', {}).get('url', DEFAULT_SPACE_URL).rstrip('/')
async with httpx.AsyncClient(trust_env=True, timeout=15) as client:
resp = await client.get(
f'{space_url}/api/v1/marketplace/plugins/{LANGRAG_PLUGIN_AUTHOR}/{LANGRAG_PLUGIN_NAME}'
)
resp.raise_for_status()
plugin_data = resp.json().get('data', {}).get('plugin', {})
plugin_version = plugin_data.get('latest_version')
if not plugin_version:
raise Exception('Could not determine latest LangRAG version from marketplace')
install_info = {
'plugin_author': LANGRAG_PLUGIN_AUTHOR,
'plugin_name': LANGRAG_PLUGIN_NAME,
'plugin_version': plugin_version,
}
await self.ap.plugin_connector.install_plugin(
PluginInstallSource.MARKETPLACE, install_info, task_context=task_context
)
except Exception as e:
# Plugin may already be installed
self.ap.logger.warning(f'LangRAG plugin install returned: {e}')
task_context.trace(f'Plugin install note: {e}')
# Step 2: Wait for the plugin to be available
task_context.trace('Waiting for LangRAG plugin to become available...', action='wait-plugin')
max_retries = 30
for i in range(max_retries):
if install_plugin:
# Step 1: Install langrag plugin from marketplace
task_context.trace('Installing LangRAG plugin from marketplace...', action='install-plugin')
try:
engines = await self.ap.plugin_connector.list_knowledge_engines()
engine_ids = [e.get('plugin_id') for e in engines]
if LANGRAG_PLUGIN_ID in engine_ids:
task_context.trace('LangRAG plugin is ready.')
break
except Exception:
pass
if i == max_retries - 1:
raise Exception(
f'LangRAG plugin ({LANGRAG_PLUGIN_ID}) did not become available after {max_retries} retries'
# Query marketplace for latest version
space_url = self.ap.instance_config.data.get('space', {}).get('url', DEFAULT_SPACE_URL).rstrip('/')
async with httpx.AsyncClient(trust_env=True, timeout=15) as client:
resp = await client.get(
f'{space_url}/api/v1/marketplace/plugins/{LANGRAG_PLUGIN_AUTHOR}/{LANGRAG_PLUGIN_NAME}'
)
resp.raise_for_status()
plugin_data = resp.json().get('data', {}).get('plugin', {})
plugin_version = plugin_data.get('latest_version')
if not plugin_version:
raise Exception('Could not determine latest LangRAG version from marketplace')
install_info = {
'plugin_author': LANGRAG_PLUGIN_AUTHOR,
'plugin_name': LANGRAG_PLUGIN_NAME,
'plugin_version': plugin_version,
}
await self.ap.plugin_connector.install_plugin(
PluginInstallSource.MARKETPLACE, install_info, task_context=task_context
)
await asyncio.sleep(2)
except Exception as e:
# Plugin may already be installed
self.ap.logger.warning(f'LangRAG plugin install returned: {e}')
task_context.trace(f'Plugin install note: {e}')
# Step 2: Wait for the plugin to be available
task_context.trace('Waiting for LangRAG plugin to become available...', action='wait-plugin')
max_retries = 30
for i in range(max_retries):
try:
engines = await self.ap.plugin_connector.list_knowledge_engines()
engine_ids = [e.get('plugin_id') for e in engines]
if LANGRAG_PLUGIN_ID in engine_ids:
task_context.trace('LangRAG plugin is ready.')
break
except Exception:
pass
if i == max_retries - 1:
raise Exception(
f'LangRAG plugin ({LANGRAG_PLUGIN_ID}) did not become available after {max_retries} retries'
)
await asyncio.sleep(2)
# Step 3: Restore internal knowledge bases from backup
task_context.trace('Restoring internal knowledge bases...', action='restore-internal')
@@ -283,9 +285,12 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup):
if not needed:
return self.http_status(400, -1, 'RAG migration is not needed')
data = await quart.request.get_json(silent=True) or {}
install_plugin = data.get('install_plugin', True)
ctx = taskmgr.TaskContext.new()
wrapper = self.ap.task_mgr.create_user_task(
self._execute_rag_migration(task_context=ctx),
self._execute_rag_migration(task_context=ctx, install_plugin=install_plugin),
kind='rag-migration',
name='rag-migration-execute',
label='Migrating knowledge bases to plugin architecture',