diff --git a/src/langbot/pkg/api/http/controller/groups/knowledge/migration.py b/src/langbot/pkg/api/http/controller/groups/knowledge/migration.py index 86f6355b..2db835d8 100644 --- a/src/langbot/pkg/api/http/controller/groups/knowledge/migration.py +++ b/src/langbot/pkg/api/http/controller/groups/knowledge/migration.py @@ -69,56 +69,99 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup): ) return result.first() is not None + async def _install_plugin_from_marketplace( + self, plugin_id: str, task_context: taskmgr.TaskContext, space_url: str + ) -> None: + """Install a single plugin from the marketplace.""" + p_author, p_name = plugin_id.split('/', 1) + self.ap.logger.info(f'RAG migration: installing plugin {plugin_id} from marketplace...') + task_context.trace(f'Installing plugin {plugin_id} from marketplace...') + + async with httpx.AsyncClient(trust_env=True, timeout=15) as client: + resp = await client.get(f'{space_url}/api/v1/marketplace/plugins/{p_author}/{p_name}') + resp.raise_for_status() + p_data = resp.json().get('data', {}).get('plugin', {}) + p_version = p_data.get('latest_version') + if not p_version: + raise Exception(f'Could not determine latest version for {plugin_id}') + + await self.ap.plugin_connector.install_plugin( + PluginInstallSource.MARKETPLACE, + { + 'plugin_author': p_author, + 'plugin_name': p_name, + 'plugin_version': p_version, + }, + task_context=task_context, + ) + self.ap.logger.info(f'RAG migration: plugin {plugin_id} install request sent.') + async def _execute_rag_migration(self, task_context: taskmgr.TaskContext, install_plugin: bool = True): - """Execute RAG migration: optionally install langrag plugin and restore backup data.""" + """Execute RAG migration: install required plugins and restore backup data.""" warnings = [] + # Collect all plugins we need: LangRAG (always) + connector plugins (from external KBs) + needed_plugins: dict[str, str] = { + LANGRAG_PLUGIN_ID: LANGRAG_PLUGIN_NAME, + } + + has_external = await self._table_exists('external_knowledge_bases') + if has_external: + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.text('SELECT DISTINCT plugin_author, plugin_name FROM external_knowledge_bases;') + ) + for row in result.fetchall(): + plugin_author = row[0] or '' + plugin_name = row[1] or '' + mapped_name = EXTERNAL_PLUGIN_NAME_MAPPING.get(plugin_name, plugin_name) + plugin_id = f'{plugin_author}/{mapped_name}' + if plugin_id not in needed_plugins: + needed_plugins[plugin_id] = mapped_name + + self.ap.logger.info(f'RAG migration: plugins needed: {list(needed_plugins.keys())}') + if install_plugin: - # Step 1: Install langrag plugin from marketplace - task_context.trace('Installing LangRAG plugin from marketplace...', action='install-plugin') - try: - # Query marketplace for latest version - space_url = self.ap.instance_config.data.get('space', {}).get('url', DEFAULT_SPACE_URL).rstrip('/') - async with httpx.AsyncClient(trust_env=True, timeout=15) as client: - resp = await client.get( - f'{space_url}/api/v1/marketplace/plugins/{LANGRAG_PLUGIN_AUTHOR}/{LANGRAG_PLUGIN_NAME}' - ) - resp.raise_for_status() - plugin_data = resp.json().get('data', {}).get('plugin', {}) - plugin_version = plugin_data.get('latest_version') - if not plugin_version: - raise Exception('Could not determine latest LangRAG version from marketplace') + # Step 1: Install all required plugins from marketplace + task_context.trace('Installing required plugins...', action='install-plugin') + space_url = self.ap.instance_config.data.get('space', {}).get('url', DEFAULT_SPACE_URL).rstrip('/') - install_info = { - 'plugin_author': LANGRAG_PLUGIN_AUTHOR, - 'plugin_name': LANGRAG_PLUGIN_NAME, - 'plugin_version': plugin_version, - } - await self.ap.plugin_connector.install_plugin( - PluginInstallSource.MARKETPLACE, install_info, task_context=task_context - ) - except Exception as e: - # Plugin may already be installed - self.ap.logger.warning(f'LangRAG plugin install returned: {e}') - task_context.trace(f'Plugin install note: {e}') + for plugin_id in needed_plugins: + try: + await self._install_plugin_from_marketplace(plugin_id, task_context, space_url) + except Exception as e: + self.ap.logger.warning(f'RAG migration: plugin {plugin_id} install returned: {e}') + task_context.trace(f'Plugin install note ({plugin_id}): {e}') - # Step 2: Wait for the plugin to be available - task_context.trace('Waiting for LangRAG plugin to become available...', action='wait-plugin') + # Step 2: Wait for all plugins to become available as knowledge engines + task_context.trace( + f'Waiting for plugins to become available: {list(needed_plugins.keys())}...', + action='wait-plugin', + ) max_retries = 30 + engine_id_set: set[str] = set() for i in range(max_retries): try: engines = await self.ap.plugin_connector.list_knowledge_engines() - engine_ids = [e.get('plugin_id') for e in engines] - if LANGRAG_PLUGIN_ID in engine_ids: - task_context.trace('LangRAG plugin is ready.') - break + engine_id_set = {e.get('plugin_id') for e in engines} except Exception: pass + if all(pid in engine_id_set for pid in needed_plugins): + self.ap.logger.info(f'RAG migration: all plugins ready: {engine_id_set}') + task_context.trace('All required plugins are ready.') + break if i == max_retries - 1: - raise Exception( - f'LangRAG plugin ({LANGRAG_PLUGIN_ID}) did not become available after {max_retries} retries' - ) + still_missing = [pid for pid in needed_plugins if pid not in engine_id_set] + warning = f'Plugin(s) {still_missing} did not become available after {max_retries} retries' + self.ap.logger.warning(f'RAG migration: {warning}') + warnings.append(warning) + task_context.trace(warning) await asyncio.sleep(2) + else: + try: + engines = await self.ap.plugin_connector.list_knowledge_engines() + engine_id_set = {e.get('plugin_id') for e in engines} + except Exception: + engine_id_set = set() # Step 3: Restore internal knowledge bases from backup task_context.trace('Restoring internal knowledge bases...', action='restore-internal') @@ -143,7 +186,6 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup): creation_settings = json.dumps({'embedding_model_uuid': embedding_model_uuid}) retrieval_settings = json.dumps({'top_k': top_k}) - # Insert into knowledge_bases with the same UUID await self.ap.persistence_mgr.execute_async( sqlalchemy.text( 'INSERT INTO knowledge_bases ' @@ -165,7 +207,6 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup): ) ) - # Notify langrag plugin to recognize this KB try: config = {'embedding_model_uuid': embedding_model_uuid} await self.ap.plugin_connector.rag_on_kb_create(LANGRAG_PLUGIN_ID, kb_uuid, config) @@ -175,26 +216,21 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup): warnings.append(warning) task_context.trace(warning) - # Reload all knowledge bases into runtime await self.ap.rag_mgr.load_knowledge_bases_from_db() - # Step 4: Restore external knowledge bases (read from preserved original table) + # Step 4: Restore external knowledge bases task_context.trace('Restoring external knowledge bases...', action='restore-external') - if await self._table_exists('external_knowledge_bases'): + if has_external: result = await self.ap.persistence_mgr.execute_async( sqlalchemy.text('SELECT * FROM external_knowledge_bases;') ) rows = result.fetchall() columns = result.keys() - # Get current available engines for matching - try: - engines = await self.ap.plugin_connector.list_knowledge_engines() - engine_id_set = {e.get('plugin_id') for e in engines} - except Exception: - engine_id_set = set() - - task_context.trace(f'Found {len(rows)} external KB(s) to restore. Available engines: {engine_id_set}') + self.ap.logger.info( + f'RAG migration: {len(rows)} external KB(s) to restore. Available engines: {engine_id_set}' + ) + task_context.trace(f'Found {len(rows)} external KB(s). Available engines: {engine_id_set}') for row in rows: row_dict = dict(zip(columns, row)) @@ -207,40 +243,28 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup): retriever_config = row_dict.get('retriever_config', {}) created_at = row_dict.get('created_at') - # Map old Retriever plugin name to new Connector plugin name mapped_plugin_name = EXTERNAL_PLUGIN_NAME_MAPPING.get(plugin_name, plugin_name) external_plugin_id = f'{plugin_author}/{mapped_plugin_name}' - task_context.trace( - f'Processing external KB "{name}" ({kb_uuid}): ' - f'old_plugin={plugin_author}/{plugin_name}, mapped={external_plugin_id}' + self.ap.logger.info( + f'RAG migration: processing external KB "{name}" ({kb_uuid}), ' + f'plugin: {plugin_author}/{plugin_name} -> {external_plugin_id}' ) - # Parse retriever_config if isinstance(retriever_config, str): try: retriever_config = json.loads(retriever_config) except (json.JSONDecodeError, TypeError): retriever_config = {} - # Split retriever_config into creation_settings and retrieval_settings creation_fields = EXTERNAL_PLUGIN_CREATION_FIELDS.get(external_plugin_id) if creation_fields is None: - # All fields go to creation_settings (e.g. FastGPT has no retrieval_schema) creation_settings_dict = retriever_config retrieval_settings_dict = {} else: creation_settings_dict = {k: v for k, v in retriever_config.items() if k in creation_fields} retrieval_settings_dict = {k: v for k, v in retriever_config.items() if k not in creation_fields} - task_context.trace( - f'Inserting external KB "{name}" into knowledge_bases table ' - f'(plugin_id={external_plugin_id}, creation_settings={json.dumps(creation_settings_dict)}, ' - f'retrieval_settings={json.dumps(retrieval_settings_dict)})' - ) - - # Always insert the KB record so it shows in frontend, - # even if the connector plugin is not yet installed. await self.ap.persistence_mgr.execute_async( sqlalchemy.text( 'INSERT INTO knowledge_bases ' @@ -280,7 +304,6 @@ class KnowledgeMigrationRouterGroup(group.RouterGroup): warnings.append(warning) task_context.trace(warning) - # Reload again after external KBs await self.ap.rag_mgr.load_knowledge_bases_from_db() # Step 5: Clear migration flag