diff --git a/src/langbot/pkg/api/http/controller/main.py b/src/langbot/pkg/api/http/controller/main.py index 1541a25e..b2eaa9b9 100644 --- a/src/langbot/pkg/api/http/controller/main.py +++ b/src/langbot/pkg/api/http/controller/main.py @@ -92,7 +92,11 @@ class HTTPController: @self.quart_app.route('/') async def index(): - return await quart.send_from_directory(frontend_path, 'index.html', mimetype='text/html') + response = await quart.send_from_directory(frontend_path, 'index.html', mimetype='text/html') + response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' + response.headers['Pragma'] = 'no-cache' + response.headers['Expires'] = '0' + return response @self.quart_app.route('/') async def static_file(path: str): diff --git a/src/langbot/pkg/vector/mgr.py b/src/langbot/pkg/vector/mgr.py index 551eeebb..817b2d7a 100644 --- a/src/langbot/pkg/vector/mgr.py +++ b/src/langbot/pkg/vector/mgr.py @@ -4,6 +4,8 @@ from ..core import app from .vdb import VectorDatabase from .vdbs.chroma import ChromaVectorDatabase from .vdbs.qdrant import QdrantVectorDatabase +from .vdbs.milvus import MilvusVectorDatabase +from .vdbs.pgvector_db import PgVectorDatabase class VectorDBManager: @@ -16,12 +18,47 @@ class VectorDBManager: async def initialize(self): kb_config = self.ap.instance_config.data.get('vdb') if kb_config: - if kb_config.get('use') == 'chroma': + vdb_type = kb_config.get('use') + + if vdb_type == 'chroma': self.vector_db = ChromaVectorDatabase(self.ap) self.ap.logger.info('Initialized Chroma vector database backend.') - elif kb_config.get('use') == 'qdrant': + + elif vdb_type == 'qdrant': self.vector_db = QdrantVectorDatabase(self.ap) self.ap.logger.info('Initialized Qdrant vector database backend.') + + elif vdb_type == 'milvus': + # Get Milvus configuration + milvus_config = kb_config.get('milvus', {}) + uri = milvus_config.get('uri', './data/milvus.db') + token = milvus_config.get('token') + self.vector_db = MilvusVectorDatabase(self.ap, uri=uri, token=token) + self.ap.logger.info('Initialized Milvus vector database backend.') + + elif vdb_type == 'pgvector': + # Get pgvector configuration + pgvector_config = kb_config.get('pgvector', {}) + connection_string = pgvector_config.get('connection_string') + if connection_string: + self.vector_db = PgVectorDatabase(self.ap, connection_string=connection_string) + else: + # Use individual parameters + host = pgvector_config.get('host', 'localhost') + port = pgvector_config.get('port', 5432) + database = pgvector_config.get('database', 'langbot') + user = pgvector_config.get('user', 'postgres') + password = pgvector_config.get('password', 'postgres') + self.vector_db = PgVectorDatabase( + self.ap, + host=host, + port=port, + database=database, + user=user, + password=password + ) + self.ap.logger.info('Initialized pgvector database backend.') + else: self.vector_db = ChromaVectorDatabase(self.ap) self.ap.logger.warning('No valid vector database backend configured, defaulting to Chroma.') diff --git a/src/langbot/pkg/vector/vdbs/milvus.py b/src/langbot/pkg/vector/vdbs/milvus.py new file mode 100644 index 00000000..d9f822cd --- /dev/null +++ b/src/langbot/pkg/vector/vdbs/milvus.py @@ -0,0 +1,249 @@ +from __future__ import annotations +import asyncio +from typing import Any, Dict +from pymilvus import MilvusClient, DataType +from langbot.pkg.vector.vdb import VectorDatabase +from langbot.pkg.core import app + + +class MilvusVectorDatabase(VectorDatabase): + """Milvus vector database implementation""" + + def __init__(self, ap: app.Application, uri: str = "milvus.db", token: str = None): + """Initialize Milvus vector database + + Args: + ap: Application instance + uri: Milvus connection URI. For local file: "milvus.db" + For remote server: "http://localhost:19530" + token: Optional authentication token for remote connections + """ + self.ap = ap + self.uri = uri + self.token = token + self.client = None + self._collections = {} + self._initialize_client() + + def _initialize_client(self): + """Initialize Milvus client connection""" + try: + if self.token: + self.client = MilvusClient(uri=self.uri, token=self.token) + else: + self.client = MilvusClient(uri=self.uri) + self.ap.logger.info(f"Connected to Milvus at {self.uri}") + except Exception as e: + self.ap.logger.error(f"Failed to connect to Milvus: {e}") + raise + + async def get_or_create_collection(self, collection: str): + """Get or create a Milvus collection + + Args: + collection: Collection name (corresponds to knowledge base UUID) + """ + if collection in self._collections: + return self._collections[collection] + + # Check if collection exists + has_collection = await asyncio.to_thread( + self.client.has_collection, collection_name=collection + ) + + if not has_collection: + # Create collection with custom schema to support string IDs + from pymilvus import CollectionSchema, FieldSchema, DataType + + fields = [ + FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=255), + FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1536), + FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="file_id", dtype=DataType.VARCHAR, max_length=255), + FieldSchema(name="chunk_uuid", dtype=DataType.VARCHAR, max_length=255), + ] + + schema = CollectionSchema(fields=fields, description="LangBot knowledge base vectors") + + await asyncio.to_thread( + self.client.create_collection, + collection_name=collection, + schema=schema, + metric_type="COSINE", + ) + + # Create index for vector field (required for loading/searching) + index_params = { + "metric_type": "COSINE", + "index_type": "AUTOINDEX", + "params": {} + } + await asyncio.to_thread( + self.client.create_index, + collection_name=collection, + field_name="vector", + index_params=index_params + ) + + self.ap.logger.info(f"Created Milvus collection '{collection}' with index") + else: + self.ap.logger.info(f"Milvus collection '{collection}' already exists") + + self._collections[collection] = collection + return collection + + async def add_embeddings( + self, + collection: str, + ids: list[str], + embeddings_list: list[list[float]], + metadatas: list[dict[str, Any]], + ) -> None: + """Add vector embeddings to Milvus collection + + Args: + collection: Collection name + ids: List of unique IDs for each vector + embeddings_list: List of embedding vectors + metadatas: List of metadata dictionaries for each vector + """ + await self.get_or_create_collection(collection) + + # Prepare data in Milvus format + data = [] + for i, vector_id in enumerate(ids): + entry = { + "id": vector_id, + "vector": embeddings_list[i], + } + # Add metadata fields + if metadatas and i < len(metadatas): + metadata = metadatas[i] + # Add common metadata fields + if "text" in metadata: + entry["text"] = metadata["text"] + if "file_id" in metadata: + entry["file_id"] = metadata["file_id"] + if "uuid" in metadata: + entry["chunk_uuid"] = metadata["uuid"] + data.append(entry) + + # Insert data into Milvus + await asyncio.to_thread( + self.client.insert, + collection_name=collection, + data=data + ) + + # Load collection for searching (Milvus requires this) + await asyncio.to_thread( + self.client.load_collection, + collection_name=collection + ) + + self.ap.logger.info(f"Added {len(ids)} embeddings to Milvus collection '{collection}'") + + async def search( + self, collection: str, query_embedding: list[float], k: int = 5 + ) -> Dict[str, Any]: + """Search for similar vectors in Milvus collection + + Args: + collection: Collection name + query_embedding: Query vector + k: Number of top results to return + + Returns: + Dictionary with search results in Chroma-compatible format + """ + await self.get_or_create_collection(collection) + + # Perform search + search_params = { + "metric_type": "COSINE", + "params": {} + } + + results = await asyncio.to_thread( + self.client.search, + collection_name=collection, + data=[query_embedding], + limit=k, + search_params=search_params, + output_fields=["text", "file_id", "chunk_uuid"] + ) + + # Convert results to Chroma-compatible format + # Milvus returns: [[ {id, distance, entity: {...}} ]] + ids = [] + distances = [] + metadatas = [] + + if results and len(results) > 0: + for hit in results[0]: + ids.append(hit.get("id", "")) + distances.append(hit.get("distance", 0.0)) + + # Build metadata from entity fields + entity = hit.get("entity", {}) + metadata = {} + if "text" in entity: + metadata["text"] = entity["text"] + if "file_id" in entity: + metadata["file_id"] = entity["file_id"] + if "chunk_uuid" in entity: + metadata["uuid"] = entity["chunk_uuid"] + metadatas.append(metadata) + + # Return in Chroma-compatible format (nested lists) + result = { + "ids": [ids], + "distances": [distances], + "metadatas": [metadatas] + } + + self.ap.logger.info( + f"Milvus search in '{collection}' returned {len(ids)} results" + ) + return result + + async def delete_by_file_id(self, collection: str, file_id: str) -> None: + """Delete vectors from collection by file_id + + Args: + collection: Collection name + file_id: File ID to filter deletion + """ + await self.get_or_create_collection(collection) + + # Delete entities matching the file_id + await asyncio.to_thread( + self.client.delete, + collection_name=collection, + filter=f'file_id == "{file_id}"' + ) + self.ap.logger.info( + f"Deleted embeddings from Milvus collection '{collection}' with file_id: {file_id}" + ) + + async def delete_collection(self, collection: str): + """Delete a Milvus collection + + Args: + collection: Collection name to delete + """ + if collection in self._collections: + del self._collections[collection] + + # Check if collection exists before attempting deletion + has_collection = await asyncio.to_thread( + self.client.has_collection, collection_name=collection + ) + + if has_collection: + await asyncio.to_thread( + self.client.drop_collection, collection_name=collection + ) + self.ap.logger.info(f"Deleted Milvus collection '{collection}'") + else: + self.ap.logger.warning(f"Milvus collection '{collection}' not found") diff --git a/src/langbot/pkg/vector/vdbs/pgvector_db.py b/src/langbot/pkg/vector/vdbs/pgvector_db.py new file mode 100644 index 00000000..2669902b --- /dev/null +++ b/src/langbot/pkg/vector/vdbs/pgvector_db.py @@ -0,0 +1,286 @@ +from __future__ import annotations +import asyncio +from typing import Any, Dict +from sqlalchemy import create_engine, text, Column, String, Text +from sqlalchemy.orm import declarative_base, sessionmaker, Session +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from pgvector.sqlalchemy import Vector +from langbot.pkg.vector.vdb import VectorDatabase +from langbot.pkg.core import app +import uuid + +Base = declarative_base() + + +class PgVectorEntry(Base): + """SQLAlchemy model for pgvector entries""" + __tablename__ = 'langbot_vectors' + + id = Column(String, primary_key=True) + collection = Column(String, index=True, nullable=False) + embedding = Column(Vector(1536)) # Default dimension, will be created dynamically + text = Column(Text) + file_id = Column(String, index=True) + chunk_uuid = Column(String) + + +class PgVectorDatabase(VectorDatabase): + """PostgreSQL with pgvector extension database implementation""" + + def __init__( + self, + ap: app.Application, + connection_string: str = None, + host: str = "localhost", + port: int = 5432, + database: str = "langbot", + user: str = "postgres", + password: str = "postgres" + ): + """Initialize pgvector database + + Args: + ap: Application instance + connection_string: Full PostgreSQL connection string (overrides other params) + host: PostgreSQL host + port: PostgreSQL port + database: Database name + user: Database user + password: Database password + """ + self.ap = ap + + # Build connection string if not provided + if connection_string: + self.connection_string = connection_string + else: + self.connection_string = ( + f"postgresql+psycopg://{user}:{password}@{host}:{port}/{database}" + ) + + self.async_connection_string = self.connection_string.replace( + "postgresql://", "postgresql+asyncpg://" + ).replace( + "postgresql+psycopg://", "postgresql+asyncpg://" + ) + + self.engine = None + self.async_engine = None + self.SessionLocal = None + self.AsyncSessionLocal = None + self._collections = set() + self._initialize_db() + + def _initialize_db(self): + """Initialize database connection and create tables""" + try: + # Create async engine for async operations + self.async_engine = create_async_engine( + self.async_connection_string, + echo=False, + pool_pre_ping=True + ) + self.AsyncSessionLocal = async_sessionmaker( + self.async_engine, + class_=AsyncSession, + expire_on_commit=False + ) + + # Create sync engine for table creation + sync_connection_string = self.connection_string.replace( + "postgresql+asyncpg://", "postgresql+psycopg://" + ) + self.engine = create_engine(sync_connection_string, echo=False) + + # Create pgvector extension and tables + with self.engine.connect() as conn: + # Enable pgvector extension + conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + conn.commit() + + # Create tables + Base.metadata.create_all(self.engine) + + self.ap.logger.info(f"Connected to PostgreSQL with pgvector") + except Exception as e: + self.ap.logger.error(f"Failed to connect to PostgreSQL: {e}") + raise + + async def get_or_create_collection(self, collection: str): + """Get or create a collection (logical grouping in pgvector) + + Args: + collection: Collection name (knowledge base UUID) + """ + # In pgvector, collections are logical - we just track them + if collection not in self._collections: + self._collections.add(collection) + self.ap.logger.info(f"Registered pgvector collection '{collection}'") + return collection + + async def add_embeddings( + self, + collection: str, + ids: list[str], + embeddings_list: list[list[float]], + metadatas: list[dict[str, Any]], + ) -> None: + """Add vector embeddings to pgvector + + Args: + collection: Collection name + ids: List of unique IDs for each vector + embeddings_list: List of embedding vectors + metadatas: List of metadata dictionaries + """ + await self.get_or_create_collection(collection) + + async with self.AsyncSessionLocal() as session: + try: + for i, vector_id in enumerate(ids): + metadata = metadatas[i] if i < len(metadatas) else {} + + entry = PgVectorEntry( + id=vector_id, + collection=collection, + embedding=embeddings_list[i], + text=metadata.get("text", ""), + file_id=metadata.get("file_id", ""), + chunk_uuid=metadata.get("uuid", "") + ) + session.add(entry) + + await session.commit() + self.ap.logger.info( + f"Added {len(ids)} embeddings to pgvector collection '{collection}'" + ) + except Exception as e: + await session.rollback() + self.ap.logger.error(f"Error adding embeddings to pgvector: {e}") + raise + + async def search( + self, collection: str, query_embedding: list[float], k: int = 5 + ) -> Dict[str, Any]: + """Search for similar vectors using cosine distance + + Args: + collection: Collection name + query_embedding: Query vector + k: Number of top results to return + + Returns: + Dictionary with search results in Chroma-compatible format + """ + await self.get_or_create_collection(collection) + + async with self.AsyncSessionLocal() as session: + try: + # Use cosine distance for similarity search + from sqlalchemy import select, func + + # Query for similar vectors + stmt = ( + select( + PgVectorEntry.id, + PgVectorEntry.text, + PgVectorEntry.file_id, + PgVectorEntry.chunk_uuid, + PgVectorEntry.embedding.cosine_distance(query_embedding).label('distance') + ) + .filter(PgVectorEntry.collection == collection) + .order_by(PgVectorEntry.embedding.cosine_distance(query_embedding)) + .limit(k) + ) + + result = await session.execute(stmt) + rows = result.fetchall() + + # Convert to Chroma-compatible format + ids = [] + distances = [] + metadatas = [] + + for row in rows: + ids.append(row.id) + distances.append(float(row.distance)) + metadatas.append({ + "text": row.text or "", + "file_id": row.file_id or "", + "uuid": row.chunk_uuid or "" + }) + + result_dict = { + "ids": [ids], + "distances": [distances], + "metadatas": [metadatas] + } + + self.ap.logger.info( + f"pgvector search in '{collection}' returned {len(ids)} results" + ) + return result_dict + + except Exception as e: + self.ap.logger.error(f"Error searching pgvector: {e}") + raise + + async def delete_by_file_id(self, collection: str, file_id: str) -> None: + """Delete vectors by file_id + + Args: + collection: Collection name + file_id: File ID to filter deletion + """ + await self.get_or_create_collection(collection) + + async with self.AsyncSessionLocal() as session: + try: + from sqlalchemy import delete + + stmt = delete(PgVectorEntry).where( + PgVectorEntry.collection == collection, + PgVectorEntry.file_id == file_id + ) + await session.execute(stmt) + await session.commit() + + self.ap.logger.info( + f"Deleted embeddings from pgvector collection '{collection}' with file_id: {file_id}" + ) + except Exception as e: + await session.rollback() + self.ap.logger.error(f"Error deleting from pgvector: {e}") + raise + + async def delete_collection(self, collection: str): + """Delete all vectors in a collection + + Args: + collection: Collection name to delete + """ + if collection in self._collections: + self._collections.remove(collection) + + async with self.AsyncSessionLocal() as session: + try: + from sqlalchemy import delete + + stmt = delete(PgVectorEntry).where( + PgVectorEntry.collection == collection + ) + await session.execute(stmt) + await session.commit() + + self.ap.logger.info(f"Deleted pgvector collection '{collection}'") + except Exception as e: + await session.rollback() + self.ap.logger.error(f"Error deleting pgvector collection: {e}") + raise + + async def close(self): + """Close database connections""" + if self.async_engine: + await self.async_engine.dispose() + if self.engine: + self.engine.dispose() diff --git a/src/langbot/templates/config.yaml b/src/langbot/templates/config.yaml index 449b245d..37a61bc8 100644 --- a/src/langbot/templates/config.yaml +++ b/src/langbot/templates/config.yaml @@ -36,6 +36,15 @@ vdb: host: localhost port: 6333 api_key: '' + milvus: + uri: 'http://127.0.0.1:19530' + token: '' + pgvector: + host: '127.0.0.1' + port: 5433 + database: 'langbot' + user: 'postgres' + password: 'postgres' storage: use: local s3: