diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 30464312..1a294d8c 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -565,6 +565,16 @@ class RuntimeConnectionHandler(handler.Handler): except Exception as e: return _make_rag_error_response(e, 'FileServiceError', storage_path=storage_path) + @self.action(PluginToRuntimeAction.LIST_PARSERS) + async def list_parsers(data: dict[str, Any]) -> handler.ActionResponse: + """Plugin requests host to list available parser plugins.""" + mime_type = data.get('mime_type') + try: + parsers = await self.ap.knowledge_service.list_parsers(mime_type) + return handler.ActionResponse.success(data={'parsers': parsers}) + except Exception as e: + return _make_rag_error_response(e, 'ParserDiscoveryError', mime_type=mime_type) + @self.action(PluginToRuntimeAction.INVOKE_PARSER) async def invoke_parser(data: dict[str, Any]) -> handler.ActionResponse: """Plugin requests host to invoke a parser plugin.""" @@ -589,6 +599,94 @@ class RuntimeConnectionHandler(handler.Handler): except Exception as e: return _make_rag_error_response(e, 'ParserError') + # ================= Knowledge Base Query APIs ================= + + @self.action(PluginToRuntimeAction.LIST_PIPELINE_KNOWLEDGE_BASES) + async def list_pipeline_knowledge_bases(data: dict[str, Any]) -> handler.ActionResponse: + """List knowledge bases configured for the current query's pipeline.""" + query_id = data['query_id'] + + if query_id not in self.ap.query_pool.cached_queries: + return handler.ActionResponse.error( + message=f'Query with query_id {query_id} not found', + ) + + query = self.ap.query_pool.cached_queries[query_id] + + kb_uuids = [] + if query.pipeline_config: + local_agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {}) + kb_uuids = local_agent_config.get('knowledge-bases', []) + # Backward compatibility + if not kb_uuids: + old_kb_uuid = local_agent_config.get('knowledge-base', '') + if old_kb_uuid and old_kb_uuid != '__none__': + kb_uuids = [old_kb_uuid] + + knowledge_bases = [] + for kb_uuid in kb_uuids: + kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid) + if kb: + knowledge_bases.append( + { + 'uuid': kb.get_uuid(), + 'name': kb.get_name(), + 'description': kb.knowledge_base_entity.description or '', + } + ) + + return handler.ActionResponse.success(data={'knowledge_bases': knowledge_bases}) + + @self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE_BASE) + async def retrieve_knowledge_base(data: dict[str, Any]) -> handler.ActionResponse: + """Retrieve documents from a knowledge base within the pipeline's scope.""" + query_id = data['query_id'] + kb_id = data['kb_id'] + query_text = data['query_text'] + top_k = data.get('top_k', 5) + filters = data.get('filters', {}) + + if query_id not in self.ap.query_pool.cached_queries: + return handler.ActionResponse.error( + message=f'Query with query_id {query_id} not found', + ) + + query = self.ap.query_pool.cached_queries[query_id] + + # Validate kb_id is in pipeline's allowed list + allowed_kb_uuids = [] + if query.pipeline_config: + local_agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {}) + allowed_kb_uuids = local_agent_config.get('knowledge-bases', []) + if not allowed_kb_uuids: + old_kb_uuid = local_agent_config.get('knowledge-base', '') + if old_kb_uuid and old_kb_uuid != '__none__': + allowed_kb_uuids = [old_kb_uuid] + + if kb_id not in allowed_kb_uuids: + return handler.ActionResponse.error( + message=f'Knowledge base {kb_id} is not configured for this pipeline', + ) + + kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_id) + if not kb: + return handler.ActionResponse.error( + message=f'Knowledge base {kb_id} not found', + ) + + try: + entries = await kb.retrieve( + query_text, + settings={ + 'top_k': top_k, + 'filters': filters, + }, + ) + results = [entry.model_dump(mode='json') for entry in entries] + return handler.ActionResponse.success(data={'results': results}) + except Exception as e: + return _make_rag_error_response(e, 'RetrievalError', kb_id=kb_id) + @self.action(CommonAction.PING) async def ping(data: dict[str, Any]) -> handler.ActionResponse: """Ping""" diff --git a/src/langbot/pkg/provider/runners/localagent.py b/src/langbot/pkg/provider/runners/localagent.py index 7d45a1f2..62a534a1 100644 --- a/src/langbot/pkg/provider/runners/localagent.py +++ b/src/langbot/pkg/provider/runners/localagent.py @@ -168,6 +168,7 @@ class LocalAgentRunner(runner.RequestRunner): result = await kb.retrieve( user_message_text, settings={ + 'bot_uuid': query.bot_uuid or '', 'sender_id': str(query.sender_id), 'session_name': f'{query.session.launcher_type.value}_{query.session.launcher_id}', },