mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-26 07:24:20 +00:00
feat(test): Phase 1.5 coverage expansion - COV-001 to COV-013
Coverage baseline raised from 13.65% to 26% (+12.35%) Gate raised from 12% to 18% Tasks completed: - COV-001: Command system unit tests (100% coverage) - COV-002: API service unit tests batch 1 (user/apikey/model/provider) - COV-003: Provider model manager unit tests - COV-004: Pipeline remaining stage tests (aggregator/cntfilter/longtext/msgtrun) - COV-005: Storage and utils coverage pass - COV-006: Gate ratchet 12%→15% - COV-007: Gate ratchet 15%→18% - COV-008: API service batch 2 (bot/pipeline/webhook/space/maintenance/mcp) - COV-009: Blocked - API controller circular import issue documented - COV-010: Plugin runtime unit tests (+0.08%) - COV-011: RAG and vector unit tests (+0.68%) - COV-012: Core boot and migration unit tests - COV-013: Provider requester logic unit tests (+0.62%) Key additions: - tests/utils/import_isolation.py: sys.modules isolation for circular imports - Provider requester mock tests: proved HTTP-dependent code can be tested locally - Vector filter utilities: 100% coverage on pure functions - API services: fake persistence pattern for unit testing Blocked issue COV-009 documented in langbot-test-plan/1.5/issues/ Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,210 @@
|
||||
"""Tests for vector filter utilities."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from langbot.pkg.vector.filter_utils import (
|
||||
SUPPORTED_OPS,
|
||||
normalize_filter,
|
||||
strip_unsupported_fields,
|
||||
)
|
||||
|
||||
|
||||
class TestNormalizeFilter:
|
||||
"""Tests for normalize_filter function."""
|
||||
|
||||
def test_normalize_filter_empty_dict(self):
|
||||
"""Empty dict returns empty list."""
|
||||
result = normalize_filter({})
|
||||
assert result == []
|
||||
|
||||
def test_normalize_filter_none(self):
|
||||
"""None returns empty list."""
|
||||
result = normalize_filter(None)
|
||||
assert result == []
|
||||
|
||||
def test_normalize_filter_implicit_eq(self):
|
||||
"""Bare value becomes implicit $eq."""
|
||||
result = normalize_filter({'file_id': 'abc123'})
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0] == ('file_id', '$eq', 'abc123')
|
||||
|
||||
def test_normalize_filter_explicit_eq(self):
|
||||
"""Explicit $eq operator."""
|
||||
result = normalize_filter({'file_id': {'$eq': 'abc123'}})
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0] == ('file_id', '$eq', 'abc123')
|
||||
|
||||
def test_normalize_filter_comparison_operators(self):
|
||||
"""Test comparison operators: $gt, $gte, $lt, $lte."""
|
||||
result = normalize_filter({'created_at': {'$gte': 1700000000}})
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0] == ('created_at', '$gte', 1700000000)
|
||||
|
||||
def test_normalize_filter_ne_operator(self):
|
||||
"""Test $ne operator."""
|
||||
result = normalize_filter({'status': {'$ne': 'deleted'}})
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0] == ('status', '$ne', 'deleted')
|
||||
|
||||
def test_normalize_filter_in_operator(self):
|
||||
"""Test $in operator with list value."""
|
||||
result = normalize_filter({'file_type': {'$in': ['pdf', 'docx', 'txt']}})
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0] == ('file_type', '$in', ['pdf', 'docx', 'txt'])
|
||||
|
||||
def test_normalize_filter_nin_operator(self):
|
||||
"""Test $nin operator."""
|
||||
result = normalize_filter({'status': {'$nin': ['deleted', 'archived']}})
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0] == ('status', '$nin', ['deleted', 'archived'])
|
||||
|
||||
def test_normalize_filter_multiple_conditions(self):
|
||||
"""Multiple top-level keys are AND-ed (returned as multiple triples)."""
|
||||
result = normalize_filter({
|
||||
'file_id': 'abc',
|
||||
'status': {'$ne': 'deleted'},
|
||||
'created_at': {'$gte': 1700000000}
|
||||
})
|
||||
|
||||
assert len(result) == 3
|
||||
# Order should match dict iteration order
|
||||
field_ops = [(field, op) for field, op, _ in result]
|
||||
assert ('file_id', '$eq') in field_ops
|
||||
assert ('status', '$ne') in field_ops
|
||||
assert ('created_at', '$gte') in field_ops
|
||||
|
||||
def test_normalize_filter_unsupported_operator_raises(self):
|
||||
"""Unsupported operator raises ValueError."""
|
||||
with pytest.raises(ValueError, match='Unsupported filter operator'):
|
||||
normalize_filter({'field': {'$regex': 'pattern'}})
|
||||
|
||||
def test_normalize_filter_all_supported_ops(self):
|
||||
"""Test all supported operators are recognized."""
|
||||
for op in SUPPORTED_OPS:
|
||||
if op in ('$in', '$nin'):
|
||||
filter_dict = {'field': {op: ['value1', 'value2']}}
|
||||
else:
|
||||
filter_dict = {'field': {op: 'value'}}
|
||||
|
||||
result = normalize_filter(filter_dict)
|
||||
assert len(result) == 1
|
||||
assert result[0][1] == op
|
||||
|
||||
|
||||
class TestStripUnsupportedFields:
|
||||
"""Tests for strip_unsupported_fields function."""
|
||||
|
||||
def test_strip_keeps_supported_fields(self):
|
||||
"""Fields in supported_fields are kept."""
|
||||
triples = [
|
||||
('file_id', '$eq', 'abc'),
|
||||
('chunk_uuid', '$ne', 'def'),
|
||||
]
|
||||
|
||||
result = strip_unsupported_fields(triples, {'file_id', 'chunk_uuid'})
|
||||
|
||||
assert len(result) == 2
|
||||
assert result == triples
|
||||
|
||||
def test_strip_removes_unsupported_fields(self):
|
||||
"""Fields not in supported_fields are removed."""
|
||||
triples = [
|
||||
('file_id', '$eq', 'abc'),
|
||||
('unknown_field', '$ne', 'def'),
|
||||
]
|
||||
|
||||
result = strip_unsupported_fields(triples, {'file_id'})
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0] == ('file_id', '$eq', 'abc')
|
||||
|
||||
def test_strip_empty_triples(self):
|
||||
"""Empty triples list returns empty list."""
|
||||
result = strip_unsupported_fields([], {'file_id'})
|
||||
assert result == []
|
||||
|
||||
def test_strip_all_unsupported(self):
|
||||
"""All fields unsupported returns empty list."""
|
||||
triples = [
|
||||
('unknown1', '$eq', 'a'),
|
||||
('unknown2', '$eq', 'b'),
|
||||
]
|
||||
|
||||
result = strip_unsupported_fields(triples, {'file_id'})
|
||||
|
||||
assert result == []
|
||||
|
||||
def test_strip_with_field_aliases(self):
|
||||
"""Field aliases are resolved before checking support."""
|
||||
triples = [
|
||||
('uuid', '$eq', 'abc'), # alias for chunk_uuid
|
||||
('file_id', '$eq', 'def'),
|
||||
]
|
||||
|
||||
result = strip_unsupported_fields(
|
||||
triples,
|
||||
{'file_id', 'chunk_uuid'},
|
||||
field_aliases={'uuid': 'chunk_uuid'}
|
||||
)
|
||||
|
||||
assert len(result) == 2
|
||||
# 'uuid' should be resolved to 'chunk_uuid'
|
||||
assert result[0] == ('chunk_uuid', '$eq', 'abc')
|
||||
assert result[1] == ('file_id', '$eq', 'def')
|
||||
|
||||
def test_strip_alias_not_in_supported(self):
|
||||
"""Alias resolved but still not in supported_fields is dropped."""
|
||||
triples = [
|
||||
('uuid', '$eq', 'abc'), # alias for chunk_uuid, but not supported
|
||||
]
|
||||
|
||||
result = strip_unsupported_fields(
|
||||
triples,
|
||||
{'file_id'}, # chunk_uuid not supported
|
||||
field_aliases={'uuid': 'chunk_uuid'}
|
||||
)
|
||||
|
||||
assert result == []
|
||||
|
||||
def test_strip_preserves_operator_and_value(self):
|
||||
"""Strip only affects field name, not operator or value."""
|
||||
triples = [
|
||||
('file_id', '$in', ['a', 'b', 'c']),
|
||||
]
|
||||
|
||||
result = strip_unsupported_fields(triples, {'file_id'})
|
||||
|
||||
assert result[0] == ('file_id', '$in', ['a', 'b', 'c'])
|
||||
|
||||
def test_strip_none_aliases(self):
|
||||
"""None field_aliases is treated as empty dict."""
|
||||
triples = [
|
||||
('file_id', '$eq', 'abc'),
|
||||
]
|
||||
|
||||
result = strip_unsupported_fields(triples, {'file_id'}, field_aliases=None)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0] == ('file_id', '$eq', 'abc')
|
||||
|
||||
|
||||
class TestSupportedOpsConstant:
|
||||
"""Tests for SUPPORTED_OPS constant."""
|
||||
|
||||
def test_supported_ops_contains_expected(self):
|
||||
"""SUPPORTED_OPS contains all expected operators."""
|
||||
expected = {'$eq', '$ne', '$gt', '$gte', '$lt', '$lte', '$in', '$nin'}
|
||||
assert SUPPORTED_OPS == expected
|
||||
|
||||
def test_supported_ops_is_frozenset(self):
|
||||
"""SUPPORTED_OPS is a frozenset for immutability."""
|
||||
from collections.abc import Set
|
||||
assert isinstance(SUPPORTED_OPS, Set)
|
||||
@@ -0,0 +1,338 @@
|
||||
"""Tests for VectorDBManager provider selection logic.
|
||||
|
||||
Tests the initialization logic that selects the appropriate VDB backend
|
||||
based on configuration, without actually creating real VDB instances.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from tests.utils.import_isolation import isolated_sys_modules
|
||||
|
||||
|
||||
class TestVectorDBManagerInitialization:
|
||||
"""Tests for VectorDBManager.initialize provider selection."""
|
||||
|
||||
def _create_mock_app(self, vdb_config: dict | None):
|
||||
"""Create mock app with vdb configuration."""
|
||||
mock_app = MagicMock()
|
||||
mock_app.instance_config = MagicMock()
|
||||
mock_app.instance_config.data = MagicMock()
|
||||
mock_app.instance_config.data.get = MagicMock(return_value=vdb_config)
|
||||
mock_app.logger = MagicMock()
|
||||
mock_app.logger.info = MagicMock()
|
||||
mock_app.logger.warning = MagicMock()
|
||||
return mock_app
|
||||
|
||||
def _make_vector_import_mocks(self):
|
||||
"""Create mocks for VDB backends to prevent real imports."""
|
||||
mocks = {}
|
||||
|
||||
# Mock core.app to break circular import
|
||||
mocks['langbot.pkg.core.app'] = MagicMock()
|
||||
|
||||
# Mock all VDB backend implementations
|
||||
for backend in ['chroma', 'qdrant', 'seekdb', 'milvus', 'pgvector_db']:
|
||||
mocks[f'langbot.pkg.vector.vdbs.{backend}'] = MagicMock()
|
||||
|
||||
return mocks
|
||||
|
||||
def test_initialize_no_config_defaults_to_chroma(self):
|
||||
"""No vdb config defaults to Chroma."""
|
||||
mock_app = self._create_mock_app(None)
|
||||
|
||||
mocks = self._make_vector_import_mocks()
|
||||
# Create mock Chroma class
|
||||
mock_chroma_class = MagicMock()
|
||||
mocks['langbot.pkg.vector.vdbs.chroma'].ChromaVectorDatabase = mock_chroma_class
|
||||
|
||||
with isolated_sys_modules(mocks):
|
||||
# Import after mocking
|
||||
from langbot.pkg.vector.mgr import VectorDBManager
|
||||
|
||||
mgr = VectorDBManager(mock_app)
|
||||
|
||||
# Run initialize synchronously for test
|
||||
import asyncio
|
||||
asyncio.get_event_loop().run_until_complete(mgr.initialize())
|
||||
|
||||
# Chroma should be instantiated
|
||||
mock_chroma_class.assert_called_once_with(mock_app)
|
||||
mock_app.logger.warning.assert_called()
|
||||
|
||||
def test_initialize_chroma_backend(self):
|
||||
"""Explicit chroma config uses Chroma backend."""
|
||||
vdb_config = {'use': 'chroma'}
|
||||
mock_app = self._create_mock_app(vdb_config)
|
||||
|
||||
mocks = self._make_vector_import_mocks()
|
||||
mock_chroma_class = MagicMock()
|
||||
mocks['langbot.pkg.vector.vdbs.chroma'].ChromaVectorDatabase = mock_chroma_class
|
||||
|
||||
with isolated_sys_modules(mocks):
|
||||
from langbot.pkg.vector.mgr import VectorDBManager
|
||||
|
||||
mgr = VectorDBManager(mock_app)
|
||||
|
||||
import asyncio
|
||||
asyncio.get_event_loop().run_until_complete(mgr.initialize())
|
||||
|
||||
mock_chroma_class.assert_called_once_with(mock_app)
|
||||
mock_app.logger.info.assert_called()
|
||||
|
||||
def test_initialize_qdrant_backend(self):
|
||||
"""Qdrant config uses Qdrant backend."""
|
||||
vdb_config = {'use': 'qdrant'}
|
||||
mock_app = self._create_mock_app(vdb_config)
|
||||
|
||||
mocks = self._make_vector_import_mocks()
|
||||
mock_qdrant_class = MagicMock()
|
||||
mocks['langbot.pkg.vector.vdbs.qdrant'].QdrantVectorDatabase = mock_qdrant_class
|
||||
|
||||
with isolated_sys_modules(mocks):
|
||||
from langbot.pkg.vector.mgr import VectorDBManager
|
||||
|
||||
mgr = VectorDBManager(mock_app)
|
||||
|
||||
import asyncio
|
||||
asyncio.get_event_loop().run_until_complete(mgr.initialize())
|
||||
|
||||
mock_qdrant_class.assert_called_once_with(mock_app)
|
||||
|
||||
def test_initialize_seekdb_backend(self):
|
||||
"""SeekDB config uses SeekDB backend."""
|
||||
vdb_config = {'use': 'seekdb'}
|
||||
mock_app = self._create_mock_app(vdb_config)
|
||||
|
||||
mocks = self._make_vector_import_mocks()
|
||||
mock_seekdb_class = MagicMock()
|
||||
mocks['langbot.pkg.vector.vdbs.seekdb'].SeekDBVectorDatabase = mock_seekdb_class
|
||||
|
||||
with isolated_sys_modules(mocks):
|
||||
from langbot.pkg.vector.mgr import VectorDBManager
|
||||
|
||||
mgr = VectorDBManager(mock_app)
|
||||
|
||||
import asyncio
|
||||
asyncio.get_event_loop().run_until_complete(mgr.initialize())
|
||||
|
||||
mock_seekdb_class.assert_called_once_with(mock_app)
|
||||
|
||||
def test_initialize_milvus_backend_with_uri(self):
|
||||
"""Milvus config with custom URI."""
|
||||
vdb_config = {
|
||||
'use': 'milvus',
|
||||
'milvus': {
|
||||
'uri': 'http://localhost:19530',
|
||||
'token': 'root:Milvus',
|
||||
'db_name': 'langbot_db'
|
||||
}
|
||||
}
|
||||
mock_app = self._create_mock_app(vdb_config)
|
||||
|
||||
mocks = self._make_vector_import_mocks()
|
||||
mock_milvus_class = MagicMock()
|
||||
mocks['langbot.pkg.vector.vdbs.milvus'].MilvusVectorDatabase = mock_milvus_class
|
||||
|
||||
with isolated_sys_modules(mocks):
|
||||
from langbot.pkg.vector.mgr import VectorDBManager
|
||||
|
||||
mgr = VectorDBManager(mock_app)
|
||||
|
||||
import asyncio
|
||||
asyncio.get_event_loop().run_until_complete(mgr.initialize())
|
||||
|
||||
mock_milvus_class.assert_called_once_with(
|
||||
mock_app,
|
||||
uri='http://localhost:19530',
|
||||
token='root:Milvus',
|
||||
db_name='langbot_db'
|
||||
)
|
||||
|
||||
def test_initialize_milvus_backend_defaults(self):
|
||||
"""Milvus defaults when config not fully specified."""
|
||||
vdb_config = {'use': 'milvus'}
|
||||
mock_app = self._create_mock_app(vdb_config)
|
||||
|
||||
mocks = self._make_vector_import_mocks()
|
||||
mock_milvus_class = MagicMock()
|
||||
mocks['langbot.pkg.vector.vdbs.milvus'].MilvusVectorDatabase = mock_milvus_class
|
||||
|
||||
with isolated_sys_modules(mocks):
|
||||
from langbot.pkg.vector.mgr import VectorDBManager
|
||||
|
||||
mgr = VectorDBManager(mock_app)
|
||||
|
||||
import asyncio
|
||||
asyncio.get_event_loop().run_until_complete(mgr.initialize())
|
||||
|
||||
# Should use default values
|
||||
mock_milvus_class.assert_called_once_with(
|
||||
mock_app,
|
||||
uri='./data/milvus.db',
|
||||
token=None,
|
||||
db_name='default'
|
||||
)
|
||||
|
||||
def test_initialize_pgvector_with_connection_string(self):
|
||||
"""pgvector with connection string."""
|
||||
vdb_config = {
|
||||
'use': 'pgvector',
|
||||
'pgvector': {
|
||||
'connection_string': 'postgresql://user:pass@host:5432/langbot'
|
||||
}
|
||||
}
|
||||
mock_app = self._create_mock_app(vdb_config)
|
||||
|
||||
mocks = self._make_vector_import_mocks()
|
||||
mock_pgvector_class = MagicMock()
|
||||
mocks['langbot.pkg.vector.vdbs.pgvector_db'].PgVectorDatabase = mock_pgvector_class
|
||||
|
||||
with isolated_sys_modules(mocks):
|
||||
from langbot.pkg.vector.mgr import VectorDBManager
|
||||
|
||||
mgr = VectorDBManager(mock_app)
|
||||
|
||||
import asyncio
|
||||
asyncio.get_event_loop().run_until_complete(mgr.initialize())
|
||||
|
||||
mock_pgvector_class.assert_called_once_with(
|
||||
mock_app,
|
||||
connection_string='postgresql://user:pass@host:5432/langbot'
|
||||
)
|
||||
|
||||
def test_initialize_pgvector_with_individual_params(self):
|
||||
"""pgvector with individual connection parameters."""
|
||||
vdb_config = {
|
||||
'use': 'pgvector',
|
||||
'pgvector': {
|
||||
'host': 'db.example.com',
|
||||
'port': 5433,
|
||||
'database': 'vectordb',
|
||||
'user': 'admin',
|
||||
'password': 'secret'
|
||||
}
|
||||
}
|
||||
mock_app = self._create_mock_app(vdb_config)
|
||||
|
||||
mocks = self._make_vector_import_mocks()
|
||||
mock_pgvector_class = MagicMock()
|
||||
mocks['langbot.pkg.vector.vdbs.pgvector_db'].PgVectorDatabase = mock_pgvector_class
|
||||
|
||||
with isolated_sys_modules(mocks):
|
||||
from langbot.pkg.vector.mgr import VectorDBManager
|
||||
|
||||
mgr = VectorDBManager(mock_app)
|
||||
|
||||
import asyncio
|
||||
asyncio.get_event_loop().run_until_complete(mgr.initialize())
|
||||
|
||||
mock_pgvector_class.assert_called_once_with(
|
||||
mock_app,
|
||||
host='db.example.com',
|
||||
port=5433,
|
||||
database='vectordb',
|
||||
user='admin',
|
||||
password='secret'
|
||||
)
|
||||
|
||||
def test_initialize_pgvector_defaults(self):
|
||||
"""pgvector defaults when no config params."""
|
||||
vdb_config = {'use': 'pgvector'}
|
||||
mock_app = self._create_mock_app(vdb_config)
|
||||
|
||||
mocks = self._make_vector_import_mocks()
|
||||
mock_pgvector_class = MagicMock()
|
||||
mocks['langbot.pkg.vector.vdbs.pgvector_db'].PgVectorDatabase = mock_pgvector_class
|
||||
|
||||
with isolated_sys_modules(mocks):
|
||||
from langbot.pkg.vector.mgr import VectorDBManager
|
||||
|
||||
mgr = VectorDBManager(mock_app)
|
||||
|
||||
import asyncio
|
||||
asyncio.get_event_loop().run_until_complete(mgr.initialize())
|
||||
|
||||
mock_pgvector_class.assert_called_once_with(
|
||||
mock_app,
|
||||
host='localhost',
|
||||
port=5432,
|
||||
database='langbot',
|
||||
user='postgres',
|
||||
password='postgres'
|
||||
)
|
||||
|
||||
def test_initialize_unknown_backend_defaults_to_chroma(self):
|
||||
"""Unknown vdb type defaults to Chroma with warning."""
|
||||
vdb_config = {'use': 'unknown_backend'}
|
||||
mock_app = self._create_mock_app(vdb_config)
|
||||
|
||||
mocks = self._make_vector_import_mocks()
|
||||
mock_chroma_class = MagicMock()
|
||||
mocks['langbot.pkg.vector.vdbs.chroma'].ChromaVectorDatabase = mock_chroma_class
|
||||
|
||||
with isolated_sys_modules(mocks):
|
||||
from langbot.pkg.vector.mgr import VectorDBManager
|
||||
|
||||
mgr = VectorDBManager(mock_app)
|
||||
|
||||
import asyncio
|
||||
asyncio.get_event_loop().run_until_complete(mgr.initialize())
|
||||
|
||||
mock_chroma_class.assert_called_once_with(mock_app)
|
||||
mock_app.logger.warning.assert_called()
|
||||
# Should warn about no valid backend
|
||||
warning_msg = mock_app.logger.warning.call_args[0][0]
|
||||
assert 'No valid' in warning_msg or 'defaulting' in warning_msg
|
||||
|
||||
|
||||
class TestVectorDBManagerProxies:
|
||||
"""Tests for VectorDBManager proxy methods."""
|
||||
|
||||
def test_get_supported_search_types_no_vector_db(self):
|
||||
"""get_supported_search_types returns vector when no vector_db."""
|
||||
mock_app = MagicMock()
|
||||
mock_app.instance_config = MagicMock()
|
||||
mock_app.instance_config.data = MagicMock()
|
||||
mock_app.instance_config.data.get = MagicMock(return_value=None)
|
||||
mock_app.logger = MagicMock()
|
||||
|
||||
mocks = {'langbot.pkg.core.app': MagicMock()}
|
||||
for backend in ['chroma', 'qdrant', 'seekdb', 'milvus', 'pgvector_db']:
|
||||
mocks[f'langbot.pkg.vector.vdbs.{backend}'] = MagicMock()
|
||||
|
||||
with isolated_sys_modules(mocks):
|
||||
from langbot.pkg.vector.mgr import VectorDBManager
|
||||
|
||||
mgr = VectorDBManager(mock_app)
|
||||
mgr.vector_db = None # Explicitly None
|
||||
|
||||
result = mgr.get_supported_search_types()
|
||||
assert result == ['vector']
|
||||
|
||||
def test_get_supported_search_types_with_vector_db(self):
|
||||
"""get_supported_search_types delegates to vector_db."""
|
||||
mock_app = MagicMock()
|
||||
|
||||
# Create mock vector_db with supported_search_types
|
||||
mock_vector_db = MagicMock()
|
||||
mock_vector_db.supported_search_types = MagicMock(
|
||||
return_value=[
|
||||
MagicMock(value='vector'),
|
||||
MagicMock(value='full_text'),
|
||||
]
|
||||
)
|
||||
|
||||
mocks = {'langbot.pkg.core.app': MagicMock()}
|
||||
for backend in ['chroma', 'qdrant', 'seekdb', 'milvus', 'pgvector_db']:
|
||||
mocks[f'langbot.pkg.vector.vdbs.{backend}'] = MagicMock()
|
||||
|
||||
with isolated_sys_modules(mocks):
|
||||
from langbot.pkg.vector.mgr import VectorDBManager
|
||||
|
||||
mgr = VectorDBManager(mock_app)
|
||||
mgr.vector_db = mock_vector_db
|
||||
|
||||
result = mgr.get_supported_search_types()
|
||||
assert result == ['vector', 'full_text']
|
||||
@@ -0,0 +1,173 @@
|
||||
"""Tests for VectorDatabase base class and SearchType enum."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import AsyncMock
|
||||
import pytest
|
||||
|
||||
from langbot.pkg.vector.vdb import SearchType, VectorDatabase
|
||||
|
||||
|
||||
class TestSearchType:
|
||||
"""Tests for SearchType enum."""
|
||||
|
||||
def test_search_type_values(self):
|
||||
"""Test SearchType enum values."""
|
||||
assert SearchType.VECTOR.value == 'vector'
|
||||
assert SearchType.FULL_TEXT.value == 'full_text'
|
||||
assert SearchType.HYBRID.value == 'hybrid'
|
||||
|
||||
def test_search_type_is_string_enum(self):
|
||||
"""SearchType is a string enum."""
|
||||
assert isinstance(SearchType.VECTOR, str)
|
||||
assert SearchType.VECTOR == 'vector'
|
||||
|
||||
def test_search_type_from_string(self):
|
||||
"""Can create SearchType from string."""
|
||||
assert SearchType('vector') == SearchType.VECTOR
|
||||
assert SearchType('full_text') == SearchType.FULL_TEXT
|
||||
assert SearchType('hybrid') == SearchType.HYBRID
|
||||
|
||||
|
||||
class TestVectorDatabaseAbstractMethods:
|
||||
"""Tests for VectorDatabase abstract methods."""
|
||||
|
||||
def test_vector_database_is_abstract(self):
|
||||
"""VectorDatabase is abstract and cannot be instantiated directly."""
|
||||
with pytest.raises(TypeError):
|
||||
VectorDatabase()
|
||||
|
||||
def test_abstract_methods_required(self):
|
||||
"""Subclass must implement all abstract methods."""
|
||||
class IncompleteVectorDB(VectorDatabase):
|
||||
pass
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
IncompleteVectorDB()
|
||||
|
||||
def test_supported_search_types_default(self):
|
||||
"""Default supported_search_types returns [VECTOR]."""
|
||||
class MinimalVectorDB(VectorDatabase):
|
||||
async def add_embeddings(self, collection, ids, embeddings_list, metadatas, documents=None):
|
||||
pass
|
||||
|
||||
async def search(self, collection, query_embedding, k=5, search_type='vector', query_text='', filter=None, vector_weight=None):
|
||||
pass
|
||||
|
||||
async def delete_by_file_id(self, collection, file_id):
|
||||
pass
|
||||
|
||||
async def delete_by_filter(self, collection, filter):
|
||||
pass
|
||||
|
||||
async def get_or_create_collection(self, collection):
|
||||
pass
|
||||
|
||||
async def delete_collection(self, collection):
|
||||
pass
|
||||
|
||||
db = MinimalVectorDB()
|
||||
assert db.supported_search_types() == [SearchType.VECTOR]
|
||||
|
||||
def test_list_by_filter_default_implementation(self):
|
||||
"""list_by_filter has default implementation returning empty."""
|
||||
class MinimalVectorDB(VectorDatabase):
|
||||
async def add_embeddings(self, collection, ids, embeddings_list, metadatas, documents=None):
|
||||
pass
|
||||
|
||||
async def search(self, collection, query_embedding, k=5, search_type='vector', query_text='', filter=None, vector_weight=None):
|
||||
pass
|
||||
|
||||
async def delete_by_file_id(self, collection, file_id):
|
||||
pass
|
||||
|
||||
async def delete_by_filter(self, collection, filter):
|
||||
pass
|
||||
|
||||
async def get_or_create_collection(self, collection):
|
||||
pass
|
||||
|
||||
async def delete_collection(self, collection):
|
||||
pass
|
||||
|
||||
db = MinimalVectorDB()
|
||||
# list_by_filter should return empty list and -1 for total
|
||||
import asyncio
|
||||
result = asyncio.get_event_loop().run_until_complete(
|
||||
db.list_by_filter('test_collection')
|
||||
)
|
||||
assert result == ([], -1)
|
||||
|
||||
|
||||
class TestVectorDatabaseInterface:
|
||||
"""Tests for VectorDatabase interface contracts."""
|
||||
|
||||
@pytest.fixture
|
||||
def mock_vector_db(self):
|
||||
"""Create a minimal mock VectorDatabase for testing."""
|
||||
class MockVectorDB(VectorDatabase):
|
||||
def __init__(self):
|
||||
self.add_embeddings = AsyncMock()
|
||||
self.search = AsyncMock(return_value={
|
||||
'ids': [['id1', 'id2']],
|
||||
'distances': [[0.1, 0.2]],
|
||||
'metadatas': [[{'key': 'val1'}, {'key': 'val2'}]]
|
||||
})
|
||||
self.delete_by_file_id = AsyncMock()
|
||||
self.delete_by_filter = AsyncMock(return_value=5)
|
||||
self.get_or_create_collection = AsyncMock()
|
||||
self.delete_collection = AsyncMock()
|
||||
|
||||
async def add_embeddings(self, collection, ids, embeddings_list, metadatas, documents=None):
|
||||
pass
|
||||
|
||||
async def search(self, collection, query_embedding, k=5, search_type='vector', query_text='', filter=None, vector_weight=None):
|
||||
pass
|
||||
|
||||
async def delete_by_file_id(self, collection, file_id):
|
||||
pass
|
||||
|
||||
async def delete_by_filter(self, collection, filter):
|
||||
pass
|
||||
|
||||
async def get_or_create_collection(self, collection):
|
||||
pass
|
||||
|
||||
async def delete_collection(self, collection):
|
||||
pass
|
||||
|
||||
return MockVectorDB()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_embeddings_signature(self, mock_vector_db):
|
||||
"""add_embeddings has expected signature."""
|
||||
await mock_vector_db.add_embeddings(
|
||||
collection='test',
|
||||
ids=['id1', 'id2'],
|
||||
embeddings_list=[[0.1, 0.2], [0.3, 0.4]],
|
||||
metadatas=[{'a': 1}, {'b': 2}],
|
||||
documents=['doc1', 'doc2']
|
||||
)
|
||||
mock_vector_db.add_embeddings.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_search_signature(self, mock_vector_db):
|
||||
"""search has expected signature with all optional params."""
|
||||
import numpy as np
|
||||
|
||||
await mock_vector_db.search(
|
||||
collection='test',
|
||||
query_embedding=np.array([0.1, 0.2]),
|
||||
k=10,
|
||||
search_type='hybrid',
|
||||
query_text='search text',
|
||||
filter={'file_id': 'abc'},
|
||||
vector_weight=0.7
|
||||
)
|
||||
mock_vector_db.search.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_by_filter_returns_int(self, mock_vector_db):
|
||||
"""delete_by_filter returns int count."""
|
||||
result = await mock_vector_db.delete_by_filter('test', {'file_id': 'abc'})
|
||||
assert isinstance(result, int)
|
||||
Reference in New Issue
Block a user