From c9fc64360f378d4bfc149f92e93368d98723f1bd Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Mon, 23 Mar 2026 21:06:23 +0800 Subject: [PATCH] feat(plugin): add unrestricted knowledge base query handlers Add handlers for LIST_KNOWLEDGE_BASES and RETRIEVE_KNOWLEDGE actions that allow plugins to list and retrieve from any knowledge base without pipeline scope restrictions, complementing the existing pipeline-scoped handlers. --- src/langbot/pkg/plugin/handler.py | 41 +++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 645b28de..5d84ac34 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -613,6 +613,47 @@ class RuntimeConnectionHandler(handler.Handler): # ================= Knowledge Base Query APIs ================= + @self.action(PluginToRuntimeAction.LIST_KNOWLEDGE_BASES) + async def list_knowledge_bases(data: dict[str, Any]) -> handler.ActionResponse: + """List all knowledge bases available in the LangBot instance (unrestricted).""" + knowledge_bases = [] + for kb_uuid, kb in self.ap.rag_mgr.knowledge_bases.items(): + knowledge_bases.append( + { + 'uuid': kb.get_uuid(), + 'name': kb.get_name(), + 'description': kb.knowledge_base_entity.description or '', + } + ) + return handler.ActionResponse.success(data={'knowledge_bases': knowledge_bases}) + + @self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE) + async def retrieve_knowledge(data: dict[str, Any]) -> handler.ActionResponse: + """Retrieve documents from any knowledge base (unrestricted).""" + kb_id = data['kb_id'] + query_text = data['query_text'] + top_k = data.get('top_k', 5) + filters = data.get('filters', {}) + + kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_id) + if not kb: + return handler.ActionResponse.error( + message=f'Knowledge base {kb_id} not found', + ) + + try: + entries = await kb.retrieve( + query_text, + settings={ + 'top_k': top_k, + 'filters': filters, + }, + ) + results = [entry.model_dump(mode='json') for entry in entries] + return handler.ActionResponse.success(data={'results': results}) + except Exception as e: + return _make_rag_error_response(e, 'RetrievalError', kb_id=kb_id) + @self.action(PluginToRuntimeAction.LIST_PIPELINE_KNOWLEDGE_BASES) async def list_pipeline_knowledge_bases(data: dict[str, Any]) -> handler.ActionResponse: """List knowledge bases configured for the current query's pipeline."""