feat: external knowledge bases (#1783)

* Initial plan

* Add backend support for external knowledge bases

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Add frontend support for external knowledge bases with tabs UI

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Add i18n translations for all languages (Traditional Chinese and Japanese)

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Update knowledge base tab list styling to match plugins page

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* perf: margin-top for kb page

* refactor: switch RetrievalResultEntry to langbot_plugin pkg ones

* feat: knowledge retriever listing and creating

* stash

* refactor: unify sync mechanism for polymorphic components

* feat: use unified retireval result struct in retrieval test page

* chore: remove unused methods

* feat: retriever icon displaying

* feat: localagent retrieval with external kbs

* chore: bump version of langbot-plugin to 0.2.0b1

* fix: i18n

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>
Co-authored-by: Junyan Qin <rockchinq@gmail.com>
This commit is contained in:
Copilot
2025-11-27 23:19:43 +08:00
committed by GitHub
parent 3c04eeaff9
commit a8481e43f0
33 changed files with 1924 additions and 161 deletions

View File

@@ -0,0 +1,61 @@
import quart
from ... import group
@group.group_class('external_knowledge_base', '/api/v1/knowledge/external-bases')
class ExternalKnowledgeBaseRouterGroup(group.RouterGroup):
async def initialize(self) -> None:
@self.route('/retrievers', methods=['GET'])
async def list_knowledge_retrievers() -> quart.Response:
"""List all available knowledge retrievers from plugins."""
retrievers = await self.ap.plugin_connector.list_knowledge_retrievers()
return self.success(data={'retrievers': retrievers})
@self.route('', methods=['POST', 'GET'])
async def handle_external_knowledge_bases() -> quart.Response:
if quart.request.method == 'GET':
external_kbs = await self.ap.external_kb_service.get_external_knowledge_bases()
return self.success(data={'bases': external_kbs})
elif quart.request.method == 'POST':
json_data = await quart.request.json
kb_uuid = await self.ap.external_kb_service.create_external_knowledge_base(json_data)
return self.success(data={'uuid': kb_uuid})
return self.http_status(405, -1, 'Method not allowed')
@self.route(
'/<kb_uuid>',
methods=['GET', 'DELETE', 'PUT'],
)
async def handle_specific_external_knowledge_base(kb_uuid: str) -> quart.Response:
if quart.request.method == 'GET':
external_kb = await self.ap.external_kb_service.get_external_knowledge_base(kb_uuid)
if external_kb is None:
return self.http_status(404, -1, 'external knowledge base not found')
return self.success(
data={
'base': external_kb,
}
)
elif quart.request.method == 'PUT':
json_data = await quart.request.json
await self.ap.external_kb_service.update_external_knowledge_base(kb_uuid, json_data)
return self.success({})
elif quart.request.method == 'DELETE':
await self.ap.external_kb_service.delete_external_knowledge_base(kb_uuid)
return self.success({})
@self.route(
'/<kb_uuid>/retrieve',
methods=['POST'],
)
async def retrieve_external_knowledge_base(kb_uuid: str) -> str:
json_data = await quart.request.json
query = json_data.get('query')
results = await self.ap.external_kb_service.retrieve_external_knowledge_base(kb_uuid, query)
return self.success(data={'results': results})

View File

@@ -0,0 +1,80 @@
from __future__ import annotations
from ....core import app
import sqlalchemy
from langbot.pkg.entity.persistence import rag as persistence_rag
import uuid
class ExternalKBService:
"""External KB service"""
ap: app.Application
def __init__(self, ap: app.Application) -> None:
self.ap = ap
# External Knowledge Base methods
async def get_external_knowledge_bases(self) -> list[dict]:
result = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(persistence_rag.ExternalKnowledgeBase))
external_kbs = result.all()
return [
self.ap.persistence_mgr.serialize_model(persistence_rag.ExternalKnowledgeBase, external_kb)
for external_kb in external_kbs
]
async def get_external_knowledge_base(self, kb_uuid: str) -> dict | None:
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(persistence_rag.ExternalKnowledgeBase).where(
persistence_rag.ExternalKnowledgeBase.uuid == kb_uuid
)
)
external_kb = result.first()
if external_kb is None:
return None
return self.ap.persistence_mgr.serialize_model(persistence_rag.ExternalKnowledgeBase, external_kb)
async def create_external_knowledge_base(self, kb_data: dict) -> str:
kb_data['uuid'] = str(uuid.uuid4())
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_rag.ExternalKnowledgeBase).values(kb_data)
)
kb = await self.get_external_knowledge_base(kb_data['uuid'])
await self.ap.rag_mgr.load_external_knowledge_base(kb)
return kb_data['uuid']
async def retrieve_external_knowledge_base(self, kb_uuid: str, query: str) -> list[dict]:
"""Retrieve external knowledge base"""
runtime_kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
if runtime_kb is None:
raise Exception('Knowledge base not found')
return [
result.model_dump() for result in await runtime_kb.retrieve(query, 5)
] # top_k is just a placeholder for external knowledge base
async def update_external_knowledge_base(self, kb_uuid: str, kb_data: dict) -> None:
if 'uuid' in kb_data:
del kb_data['uuid']
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_rag.ExternalKnowledgeBase)
.values(kb_data)
.where(persistence_rag.ExternalKnowledgeBase.uuid == kb_uuid)
)
await self.ap.rag_mgr.remove_knowledge_base_from_runtime(kb_uuid)
kb = await self.get_external_knowledge_base(kb_uuid)
await self.ap.rag_mgr.load_external_knowledge_base(kb)
async def delete_external_knowledge_base(self, kb_uuid: str) -> None:
await self.ap.rag_mgr.delete_knowledge_base(kb_uuid)
await self.ap.persistence_mgr.execute_async(
sqlalchemy.delete(persistence_rag.ExternalKnowledgeBase).where(
persistence_rag.ExternalKnowledgeBase.uuid == kb_uuid
)
)

View File

@@ -71,6 +71,9 @@ class KnowledgeService:
runtime_kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
if runtime_kb is None:
raise Exception('Knowledge base not found')
# Only internal KBs support file storage
if runtime_kb.get_type() != 'internal':
raise Exception('Only internal knowledge bases support file storage')
return await runtime_kb.store_file(file_id)
async def retrieve_knowledge_base(self, kb_uuid: str, query: str) -> list[dict]:
@@ -78,9 +81,16 @@ class KnowledgeService:
runtime_kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
if runtime_kb is None:
raise Exception('Knowledge base not found')
return [
result.model_dump() for result in await runtime_kb.retrieve(query, runtime_kb.knowledge_base_entity.top_k)
]
# Get top_k based on KB type
if runtime_kb.get_type() == 'internal':
top_k = runtime_kb.knowledge_base_entity.top_k
elif runtime_kb.get_type() == 'external':
top_k = runtime_kb.external_kb_entity.top_k
else:
top_k = 5 # default fallback
return [result.model_dump() for result in await runtime_kb.retrieve(query, top_k)]
async def get_files_by_knowledge_base(self, kb_uuid: str) -> list[dict]:
"""获取知识库文件"""
@@ -95,6 +105,9 @@ class KnowledgeService:
runtime_kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
if runtime_kb is None:
raise Exception('Knowledge base not found')
# Only internal KBs support file deletion
if runtime_kb.get_type() != 'internal':
raise Exception('Only internal knowledge bases support file deletion')
await runtime_kb.delete_file(file_id)
async def delete_knowledge_base(self, kb_uuid: str) -> None:

View File

@@ -26,6 +26,7 @@ from ..api.http.service import knowledge as knowledge_service
from ..api.http.service import mcp as mcp_service
from ..api.http.service import apikey as apikey_service
from ..api.http.service import webhook as webhook_service
from ..api.http.service import external_kb as external_kb_service
from ..discover import engine as discover_engine
from ..storage import mgr as storagemgr
from ..utils import logcache
@@ -123,6 +124,8 @@ class Application:
knowledge_service: knowledge_service.KnowledgeService = None
external_kb_service: external_kb_service.ExternalKBService = None
mcp_service: mcp_service.MCPService = None
apikey_service: apikey_service.ApiKeyService = None

View File

@@ -23,6 +23,7 @@ from ...api.http.service import knowledge as knowledge_service
from ...api.http.service import mcp as mcp_service
from ...api.http.service import apikey as apikey_service
from ...api.http.service import webhook as webhook_service
from ...api.http.service import external_kb as external_kb_service
from ...discover import engine as discover_engine
from ...storage import mgr as storagemgr
from ...utils import logcache
@@ -63,14 +64,6 @@ class BuildAppStage(stage.BootingStage):
ap.persistence_mgr = persistence_mgr_inst
await persistence_mgr_inst.initialize()
async def runtime_disconnect_callback(connector: plugin_connector.PluginRuntimeConnector) -> None:
await asyncio.sleep(3)
await plugin_connector_inst.initialize()
plugin_connector_inst = plugin_connector.PluginRuntimeConnector(ap, runtime_disconnect_callback)
await plugin_connector_inst.initialize()
ap.plugin_connector = plugin_connector_inst
cmd_mgr_inst = cmdmgr.CommandManager(ap)
await cmd_mgr_inst.initialize()
ap.cmd_mgr = cmd_mgr_inst
@@ -130,6 +123,9 @@ class BuildAppStage(stage.BootingStage):
knowledge_service_inst = knowledge_service.KnowledgeService(ap)
ap.knowledge_service = knowledge_service_inst
external_kb_service_inst = external_kb_service.ExternalKBService(ap)
ap.external_kb_service = external_kb_service_inst
mcp_service_inst = mcp_service.MCPService(ap)
ap.mcp_service = mcp_service_inst
@@ -139,5 +135,13 @@ class BuildAppStage(stage.BootingStage):
webhook_service_inst = webhook_service.WebhookService(ap)
ap.webhook_service = webhook_service_inst
async def runtime_disconnect_callback(connector: plugin_connector.PluginRuntimeConnector) -> None:
await asyncio.sleep(3)
await plugin_connector_inst.initialize()
plugin_connector_inst = plugin_connector.PluginRuntimeConnector(ap, runtime_disconnect_callback)
await plugin_connector_inst.initialize()
ap.plugin_connector = plugin_connector_inst
ctrl = controller.Controller(ap)
ap.ctrl = ctrl

View File

@@ -1,20 +1,6 @@
import sqlalchemy
from .base import Base
# Base = declarative_base()
# DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///./rag_knowledge.db')
# print("Using database URL:", DATABASE_URL)
# engine = create_engine(DATABASE_URL, connect_args={'check_same_thread': False})
# SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# def create_db_and_tables():
# """Creates all database tables defined in the Base."""
# Base.metadata.create_all(bind=engine)
# print('Database tables created or already exist.')
class KnowledgeBase(Base):
__tablename__ = 'knowledge_bases'
@@ -43,8 +29,13 @@ class Chunk(Base):
text = sqlalchemy.Column(sqlalchemy.Text)
# class Vector(Base):
# __tablename__ = 'knowledge_base_vectors'
# uuid = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True, unique=True)
# chunk_id = sqlalchemy.Column(sqlalchemy.String, nullable=True)
# embedding = sqlalchemy.Column(sqlalchemy.LargeBinary)
class ExternalKnowledgeBase(Base):
__tablename__ = 'external_knowledge_bases'
uuid = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True, unique=True)
name = sqlalchemy.Column(sqlalchemy.String, index=True)
description = sqlalchemy.Column(sqlalchemy.Text)
plugin_author = sqlalchemy.Column(sqlalchemy.String, nullable=False)
plugin_name = sqlalchemy.Column(sqlalchemy.String, nullable=False)
retriever_name = sqlalchemy.Column(sqlalchemy.String, nullable=False)
retriever_config = sqlalchemy.Column(sqlalchemy.JSON, nullable=False, default={})
created_at = sqlalchemy.Column(sqlalchemy.DateTime, default=sqlalchemy.func.now())

View File

@@ -1,13 +0,0 @@
from __future__ import annotations
import pydantic
from typing import Any
class RetrieveResultEntry(pydantic.BaseModel):
id: str
metadata: dict[str, Any]
distance: float

View File

@@ -7,6 +7,7 @@ import typing
import os
import sys
import httpx
import traceback
import sqlalchemy
from async_lru import alru_cache
from langbot_plugin.api.entities.builtin.pipeline.query import provider_session
@@ -101,6 +102,12 @@ class PluginRuntimeConnector:
self.handler_task = asyncio.create_task(self.handler.run())
_ = await self.handler.ping()
self.ap.logger.info('Connected to plugin runtime.')
# Sync polymorphic component instances after connection
try:
await self.sync_polymorphic_component_instances()
except Exception as e:
traceback.print_exc()
self.ap.logger.error(f'Failed to sync polymorphic component instances: {e}')
await self.handler_task
task: asyncio.Task | None = None
@@ -427,6 +434,31 @@ class PluginRuntimeConnector:
yield cmd_ret
# KnowledgeRetriever methods
async def list_knowledge_retrievers(self, bound_plugins: list[str] | None = None) -> list[dict[str, Any]]:
"""List all available KnowledgeRetriever components."""
if not self.is_enable_plugin:
return []
retrievers_data = await self.handler.list_knowledge_retrievers(include_plugins=bound_plugins)
return retrievers_data
async def retrieve_knowledge(
self,
plugin_author: str,
plugin_name: str,
retriever_name: str,
instance_id: str,
retrieval_context: dict[str, Any],
) -> list[dict[str, Any]]:
"""Retrieve knowledge using a KnowledgeRetriever instance."""
if not self.is_enable_plugin:
return []
return await self.handler.retrieve_knowledge(
plugin_author, plugin_name, retriever_name, instance_id, retrieval_context
)
def dispose(self):
# No need to consider the shutdown on Windows
# for Windows can kill processes and subprocesses chainly
@@ -438,3 +470,42 @@ class PluginRuntimeConnector:
if self.heartbeat_task is not None:
self.heartbeat_task.cancel()
self.heartbeat_task = None
async def sync_polymorphic_component_instances(self) -> dict[str, Any]:
"""Sync polymorphic component instances with runtime.
This collects all external knowledge bases from database and sends to runtime
to ensure instance integrity across restarts.
"""
if not self.is_enable_plugin:
return {}
# ===== external knowledge bases =====
external_kbs = await self.ap.external_kb_service.get_external_knowledge_bases()
# Build required_instances list
required_instances = []
for kb in external_kbs:
required_instances.append(
{
'instance_id': kb['uuid'],
'plugin_author': kb['plugin_author'],
'plugin_name': kb['plugin_name'],
'component_kind': 'KnowledgeRetriever',
'component_name': kb['retriever_name'],
'config': kb['retriever_config'],
}
)
self.ap.logger.info(f'Syncing {len(required_instances)} polymorphic component instances to runtime')
# Send to runtime
sync_result = await self.handler.sync_polymorphic_component_instances(required_instances)
self.ap.logger.info(
f'Sync complete: {len(sync_result.get("success_instances", []))} succeeded, '
f'{len(sync_result.get("failed_instances", []))} failed'
)
return sync_result

View File

@@ -713,3 +713,48 @@ class RuntimeConnectionHandler(handler.Handler):
async for ret in gen:
yield ret
# KnowledgeRetriever methods
async def list_knowledge_retrievers(self, include_plugins: list[str] | None = None) -> list[dict[str, Any]]:
"""List knowledge retrievers"""
result = await self.call_action(
LangBotToRuntimeAction.LIST_KNOWLEDGE_RETRIEVERS,
{
'include_plugins': include_plugins,
},
timeout=10,
)
return result['retrievers']
async def retrieve_knowledge(
self,
plugin_author: str,
plugin_name: str,
retriever_name: str,
instance_id: str,
retrieval_context: dict[str, Any],
) -> list[dict[str, Any]]:
"""Retrieve knowledge"""
result = await self.call_action(
LangBotToRuntimeAction.RETRIEVE_KNOWLEDGE,
{
'plugin_author': plugin_author,
'plugin_name': plugin_name,
'retriever_name': retriever_name,
'instance_id': instance_id,
'retrieval_context': retrieval_context,
},
timeout=30,
)
return result['retrieval_results']
async def sync_polymorphic_component_instances(self, required_instances: list[dict[str, Any]]) -> dict[str, Any]:
"""Sync polymorphic component instances with runtime"""
result = await self.call_action(
LangBotToRuntimeAction.SYNC_POLYMORPHIC_COMPONENT_INSTANCES,
{
'required_instances': required_instances,
},
timeout=30,
)
return result

View File

@@ -6,6 +6,7 @@ import typing
from .. import runner
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.rag.context as rag_context
rag_combined_prompt_template = """
@@ -63,7 +64,7 @@ class LocalAgentRunner(runner.RequestRunner):
if kb_uuids and user_message_text:
# only support text for now
all_results = []
all_results: list[rag_context.RetrievalResultEntry] = []
# Retrieve from each knowledge base
for kb_uuid in kb_uuids:
@@ -73,7 +74,15 @@ class LocalAgentRunner(runner.RequestRunner):
self.ap.logger.warning(f'Knowledge base {kb_uuid} not found, skipping')
continue
result = await kb.retrieve(user_message_text, kb.knowledge_base_entity.top_k)
# Get top_k based on KB type
if kb.get_type() == 'internal':
top_k = kb.knowledge_base_entity.top_k
elif kb.get_type() == 'external':
top_k = 5 # external kb's top_k is managed by plugin config
else:
top_k = 5 # default fallback
result = await kb.retrieve(user_message_text, top_k)
if result:
all_results.extend(result)
@@ -81,9 +90,14 @@ class LocalAgentRunner(runner.RequestRunner):
final_user_message_text = ''
if all_results:
rag_context = '\n\n'.join(
f'[{i + 1}] {entry.metadata.get("text", "")}' for i, entry in enumerate(all_results)
)
texts = []
idx = 1
for entry in all_results:
for content in entry.content:
if content.type == 'text' and content.text is not None:
texts.append(f'[{idx}] {content.text}')
idx += 1
rag_context = '\n\n'.join(texts)
final_user_message_text = rag_combined_prompt_template.format(
rag_context=rag_context, user_message=user_message_text
)

View File

@@ -0,0 +1,55 @@
"""Base classes and interfaces for knowledge bases"""
from __future__ import annotations
import abc
from langbot.pkg.core import app
from langbot_plugin.api.entities.builtin.rag import context as rag_context
class KnowledgeBaseInterface(metaclass=abc.ABCMeta):
"""Abstract interface for all knowledge base types"""
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
@abc.abstractmethod
async def initialize(self):
"""Initialize the knowledge base"""
pass
@abc.abstractmethod
async def retrieve(self, query: str, top_k: int) -> list[rag_context.RetrievalResultEntry]:
"""Retrieve relevant documents from the knowledge base
Args:
query: The query string
top_k: Number of top results to return
Returns:
List of retrieve result entries
"""
pass
@abc.abstractmethod
def get_uuid(self) -> str:
"""Get the UUID of the knowledge base"""
pass
@abc.abstractmethod
def get_name(self) -> str:
"""Get the name of the knowledge base"""
pass
@abc.abstractmethod
def get_type(self) -> str:
"""Get the type of knowledge base (internal/external)"""
pass
@abc.abstractmethod
async def dispose(self):
"""Clean up resources"""
pass

View File

@@ -0,0 +1,85 @@
"""External knowledge base implementation"""
from __future__ import annotations
from langbot.pkg.core import app
from langbot.pkg.entity.persistence import rag as persistence_rag
from langbot_plugin.api.entities.builtin.rag import context as rag_context
from .base import KnowledgeBaseInterface
class ExternalKnowledgeBase(KnowledgeBaseInterface):
"""External knowledge base that queries via HTTP API or plugin retriever"""
external_kb_entity: persistence_rag.ExternalKnowledgeBase
# Plugin retriever instance ID
retriever_instance_id: str | None
def __init__(self, ap: app.Application, external_kb_entity: persistence_rag.ExternalKnowledgeBase):
super().__init__(ap)
self.external_kb_entity = external_kb_entity
self.retriever_instance_id = None
async def initialize(self):
"""Initialize the external knowledge base"""
# Use KB UUID as instance ID
# Instance creation is now handled by the unified sync mechanism
# when LangBot connects to runtime
self.retriever_instance_id = self.external_kb_entity.uuid
self.ap.logger.info(
f'Initialized external KB {self.external_kb_entity.uuid}, instance will be created by sync mechanism'
)
async def retrieve(self, query: str, top_k: int = 5) -> list[rag_context.RetrievalResultEntry]:
"""Retrieve documents from external knowledge base via plugin retriever"""
if not self.retriever_instance_id:
self.ap.logger.error(f'No retriever instance for KB {self.external_kb_entity.uuid}')
return []
try:
results = await self.ap.plugin_connector.retrieve_knowledge(
self.external_kb_entity.plugin_author,
self.external_kb_entity.plugin_name,
self.external_kb_entity.retriever_name,
self.retriever_instance_id,
{'query': query},
)
# Convert plugin results to RetrievalResultEntry
retrieval_entries = []
for result in results:
retrieval_entries.append(rag_context.RetrievalResultEntry(**result))
return retrieval_entries
except Exception as e:
self.ap.logger.error(f'Plugin retriever error: {e}')
import traceback
traceback.print_exc()
return []
def get_uuid(self) -> str:
"""Get the UUID of the external knowledge base"""
return self.external_kb_entity.uuid
def get_name(self) -> str:
"""Get the name of the external knowledge base"""
return self.external_kb_entity.name
def get_type(self) -> str:
"""Get the type of knowledge base"""
return 'external'
async def dispose(self):
"""Clean up resources"""
# Trigger sync to immediately delete the instance from plugin process
# This ensures instance is cleaned up without waiting for next LangBot restart
try:
await self.ap.plugin_connector.sync_polymorphic_component_instances()
self.ap.logger.info(
f'Disposed external KB {self.external_kb_entity.uuid}, triggered sync to delete instance'
)
except Exception as e:
self.ap.logger.error(f'Failed to sync after disposing KB: {e}')

View File

@@ -10,10 +10,12 @@ from langbot.pkg.rag.knowledge.services.retriever import Retriever
import sqlalchemy
from langbot.pkg.entity.persistence import rag as persistence_rag
from langbot.pkg.core import taskmgr
from langbot.pkg.entity.rag import retriever as retriever_entities
from langbot_plugin.api.entities.builtin.rag import context as rag_context
from .base import KnowledgeBaseInterface
from .external import ExternalKnowledgeBase
class RuntimeKnowledgeBase:
class RuntimeKnowledgeBase(KnowledgeBaseInterface):
ap: app.Application
knowledge_base_entity: persistence_rag.KnowledgeBase
@@ -27,7 +29,7 @@ class RuntimeKnowledgeBase:
retriever: Retriever
def __init__(self, ap: app.Application, knowledge_base_entity: persistence_rag.KnowledgeBase):
self.ap = ap
super().__init__(ap)
self.knowledge_base_entity = knowledge_base_entity
self.parser = parser.FileParser(ap=self.ap)
self.chunker = chunker.Chunker(ap=self.ap)
@@ -187,7 +189,7 @@ class RuntimeKnowledgeBase:
return stored_file_tasks[0] if stored_file_tasks else ''
async def retrieve(self, query: str, top_k: int) -> list[retriever_entities.RetrieveResultEntry]:
async def retrieve(self, query: str, top_k: int) -> list[rag_context.RetrievalResultEntry]:
embedding_model = await self.ap.model_mgr.get_embedding_model_by_uuid(
self.knowledge_base_entity.embedding_model_uuid
)
@@ -206,6 +208,18 @@ class RuntimeKnowledgeBase:
sqlalchemy.delete(persistence_rag.File).where(persistence_rag.File.uuid == file_id)
)
def get_uuid(self) -> str:
"""Get the UUID of the knowledge base"""
return self.knowledge_base_entity.uuid
def get_name(self) -> str:
"""Get the name of the knowledge base"""
return self.knowledge_base_entity.name
def get_type(self) -> str:
"""Get the type of knowledge base"""
return 'internal'
async def dispose(self):
await self.ap.vector_db_mgr.vector_db.delete_collection(self.knowledge_base_entity.uuid)
@@ -213,7 +227,7 @@ class RuntimeKnowledgeBase:
class RAGManager:
ap: app.Application
knowledge_bases: list[RuntimeKnowledgeBase]
knowledge_bases: list[KnowledgeBaseInterface]
def __init__(self, ap: app.Application):
self.ap = ap
@@ -227,8 +241,8 @@ class RAGManager:
self.knowledge_bases = []
# Load internal knowledge bases
result = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(persistence_rag.KnowledgeBase))
knowledge_bases = result.all()
for knowledge_base in knowledge_bases:
@@ -239,6 +253,21 @@ class RAGManager:
f'Error loading knowledge base {knowledge_base.uuid}: {e}\n{traceback.format_exc()}'
)
# Load external knowledge bases
external_result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(persistence_rag.ExternalKnowledgeBase)
)
external_kbs = external_result.all()
for external_kb in external_kbs:
try:
# Don't trigger sync during batch loading - will sync once after LangBot connects to runtime
await self.load_external_knowledge_base(external_kb, trigger_sync=False)
except Exception as e:
self.ap.logger.error(
f'Error loading external knowledge base {external_kb.uuid}: {e}\n{traceback.format_exc()}'
)
async def load_knowledge_base(
self,
knowledge_base_entity: persistence_rag.KnowledgeBase | sqlalchemy.Row | dict,
@@ -256,21 +285,54 @@ class RAGManager:
return runtime_knowledge_base
async def get_knowledge_base_by_uuid(self, kb_uuid: str) -> RuntimeKnowledgeBase | None:
async def load_external_knowledge_base(
self,
external_kb_entity: persistence_rag.ExternalKnowledgeBase | sqlalchemy.Row | dict,
trigger_sync: bool = True,
) -> ExternalKnowledgeBase:
"""Load external knowledge base into runtime
Args:
external_kb_entity: External KB entity to load
trigger_sync: Whether to trigger sync after loading (default True for manual creation, False for batch loading)
"""
if isinstance(external_kb_entity, sqlalchemy.Row):
external_kb_entity = persistence_rag.ExternalKnowledgeBase(**external_kb_entity._mapping)
elif isinstance(external_kb_entity, dict):
external_kb_entity = persistence_rag.ExternalKnowledgeBase(**external_kb_entity)
external_kb = ExternalKnowledgeBase(ap=self.ap, external_kb_entity=external_kb_entity)
await external_kb.initialize()
self.knowledge_bases.append(external_kb)
# Trigger sync to create the instance immediately (for manual creation)
# Skip sync during batch loading from DB to avoid multiple sync calls
if trigger_sync:
try:
await self.ap.plugin_connector.sync_polymorphic_component_instances()
self.ap.logger.info(f'Triggered sync after loading external KB {external_kb_entity.uuid}')
except Exception as e:
self.ap.logger.error(f'Failed to sync after loading external KB: {e}')
return external_kb
async def get_knowledge_base_by_uuid(self, kb_uuid: str) -> KnowledgeBaseInterface | None:
for kb in self.knowledge_bases:
if kb.knowledge_base_entity.uuid == kb_uuid:
if kb.get_uuid() == kb_uuid:
return kb
return None
async def remove_knowledge_base_from_runtime(self, kb_uuid: str):
for kb in self.knowledge_bases:
if kb.knowledge_base_entity.uuid == kb_uuid:
if kb.get_uuid() == kb_uuid:
self.knowledge_bases.remove(kb)
return
async def delete_knowledge_base(self, kb_uuid: str):
for kb in self.knowledge_bases:
if kb.knowledge_base_entity.uuid == kb_uuid:
if kb.get_uuid() == kb_uuid:
await kb.dispose()
self.knowledge_bases.remove(kb)
return

View File

@@ -3,7 +3,8 @@ from __future__ import annotations
from . import base_service
from ....core import app
from ....provider.modelmgr.requester import RuntimeEmbeddingModel
from ....entity.rag import retriever as retriever_entities
from langbot_plugin.api.entities.builtin.rag import context as rag_context
from langbot_plugin.api.entities.builtin.provider.message import ContentElement
class Retriever(base_service.BaseService):
@@ -13,7 +14,7 @@ class Retriever(base_service.BaseService):
async def retrieve(
self, kb_id: str, query: str, embedding_model: RuntimeEmbeddingModel, k: int = 5
) -> list[retriever_entities.RetrieveResultEntry]:
) -> list[rag_context.RetrievalResultEntry]:
self.ap.logger.info(
f"Retrieving for query: '{query[:10]}' with k={k} using {embedding_model.model_entity.uuid}"
)
@@ -35,11 +36,12 @@ class Retriever(base_service.BaseService):
self.ap.logger.info('No relevant chunks found in vector database.')
return []
result: list[retriever_entities.RetrieveResultEntry] = []
result: list[rag_context.RetrievalResultEntry] = []
for i, id in enumerate(matched_vector_ids):
entry = retriever_entities.RetrieveResultEntry(
entry = rag_context.RetrievalResultEntry(
id=id,
content=[ContentElement.from_text(vector_metadatas[i].get('text', ''))],
metadata=vector_metadatas[i],
distance=distances[i],
)