LangBot/tests/unit_tests/rag/test_kbmgr.py
huanghuoguoguo 17bbc8bf10 Feat/test build (#2174)
* fix(ci): update unit-test workflow paths to match current source layout

Replace stale pkg/** filter with src/langbot/** and add uv.lock.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* docs(tests): update README to reflect current test layout

- Fix stale paths: tests/pipeline → tests/unit_tests/pipeline
- Update CI Python versions: 3.11, 3.12, 3.13
- Add test directory structure for box, config, platform, plugin, provider, storage
- Document pytest markers and uv commands
- Mention planned E2E tests

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add shared test factories package

Create tests/factories/ with reusable test factories:
- FakeApp: mock application with all dependencies
- Message chains: text_chain, mention_chain, image_chain
- Query factories: text_query, group_text_query, command_query, etc.

No test changes - maintains backward compatibility.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add fake provider factory

Add tests/factories/provider.py with:
- FakeProvider: deterministic fake LLM provider
- Error simulation: timeout, auth, rate-limit, malformed
- Request capture for assertions
- fake_model: mock model with attached provider
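
As an illustration of the idea only (the real tests/factories/provider.py is not shown here), a deterministic fake with request capture and error simulation might look like the sketch below. The class shape, the `invoke` method, the `fail_with` parameter, and `FakeProviderError` are assumptions made for this example.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


class FakeProviderError(Exception):
    """Hypothetical error type raised for simulated provider failures."""


@dataclass
class FakeProvider:
    """Deterministic stand-in for an LLM provider (illustrative sketch)."""

    reply: str = 'LANGBOT_FAKE_PONG'
    # Assumed failure modes: 'timeout', 'auth', 'rate-limit', 'malformed'.
    fail_with: Optional[str] = None
    requests: list = field(default_factory=list)

    async def invoke(self, messages: list) -> dict:
        # Capture every request so a test can assert on what was sent.
        self.requests.append({'messages': messages})
        if self.fail_with == 'timeout':
            raise asyncio.TimeoutError('simulated timeout')
        if self.fail_with == 'malformed':
            # Return a payload without the expected keys.
            return {'unexpected': True}
        if self.fail_with is not None:
            raise FakeProviderError(self.fail_with)
        return {'role': 'assistant', 'content': self.reply}
```

A test can then call `asyncio.run(FakeProvider().invoke([...]))` and inspect `requests` without any real provider key.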

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add fake platform factory

Add tests/factories/platform.py with:
- FakePlatform: simulated platform adapter
- Inbound message construction: friend/group/image
- Mention-bot flag simulation
- Outbound message capture for assertions
- Streaming output support simulation
- Send failure simulation

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add comprehensive message/query factories

Extend tests/factories/message.py with:
- file_query: file attachment query
- unsupported_query: unknown message segment
- voice_query: audio/voice query
- at_all_query: group @All mention
- query_with_session: query with session object
- query_with_config: query with custom pipeline config

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add fake message flow smoke test

Create tests/smoke/test_fake_message_flow.py:
- TestFakeMessageFlow: factory verification tests
- TestMessageFlowIntegration: minimal flow smoke test
- Tests FakeApp, FakeProvider, FakePlatform, query factories
- Verifies LANGBOT_FAKE_PONG marker response
- Captures outbound messages for assertions

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add developer test-quick command

Add scripts/test-quick.sh and Makefile with:
- test-quick: runs ruff check + unit tests + smoke tests
- No real provider keys or platform accounts required
- Suitable for local branch self-test

Update tests/README.md:
- Document test-quick command
- Document test factories package
- Add smoke tests and factories directory structure

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix(test): make test-quick reliable as developer gate

Fixes for D-001 acceptance issues:
1. test-quick.sh: use set -euo pipefail, uv run ruff, no tail pipe
2. Remove unused imports in factories (app.py, platform.py, provider.py)
3. Fix unused variable in smoke test
4. Add noqa: E402 to test_n8nsvapi.py lazy imports
5. Update smoke test docs: "minimal fake flow" not full pipeline

Now test-quick is a reliable gate: lint failures exit 1, test failures propagate.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(unit): add preproc and taskmgr unit tests

U-001: Pipeline Preprocessor tests
- Normal text message processing
- Empty message handling
- Image segment with/without vision model
- Model selection and fallback
- Variable extraction

U-004: Core Task Manager tests (pattern-based)
- Task creation and tracking patterns
- Task cancellation patterns
- Scope-based cancellation
- Task type filtering
- Pruning completed tasks
- Wait all tasks

Taskmgr tests use pattern-based approach to avoid circular import
in source code (taskmgr → app → http_controller → migration → taskmgr).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(unit): add config loader unit tests

U-005: Config Loader tests
- Valid YAML config loading
- Valid JSON config loading
- Invalid YAML/JSON error behavior
- Missing config file creation from template
- Template completion for missing keys
- ConfigManager load/dump operations
- Exists check for both YAML and JSON

All tests use tmp_path fixture, no real project config.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(unit): add chat and command handler pattern tests

U-002: Chat Handler tests (pattern-based)
- Normal message event emission pattern
- prevent_default handling
- User message alteration pattern
- Runner selection pattern
- Streaming/non-streaming response patterns
- Exception handling modes (show-error, show-hint, hide)
- Message history update pattern
- Telemetry payload pattern

U-003: Command Handler tests (pattern-based)
- Command parsing and text extraction
- Event creation pattern
- Privilege/admin check pattern
- Command result handling (text, error, image)
- prevent_default handling
- String truncation helper

Uses pattern-based testing to avoid circular import issues in source code.
Direct imports of handler modules trigger circular import chain.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* style: fix unused imports after ruff auto-fix

Remove unused imports in test files:
- test_config_loader.py: remove unused os
- test_taskmgr.py: remove unused Mock
- test_preproc.py: remove unused unsupported_query, image_chain

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(unit): improve taskmgr tests to test real classes

U-004 improved: Tests now import and test actual classes:
- TaskContext: new(), trace(), to_dict(), placeholder()
- TaskWrapper: task creation, context, exception/result capture, cancel, to_dict
- AsyncTaskManager: create_task, create_user_task, cancel_task, cancel_by_scope
- Task pruning behavior

Uses pre-mocking technique:
- Mock langbot.pkg.core.app before import (breaks circular chain)
- Mock langbot.pkg.core.entities with proper Enum

All 24 tests now test real class behavior, not patterns.
taskmgr.py coverage should improve significantly.
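
The pre-mocking technique can be sketched roughly as follows. The module names `demo_app` and `demo_taskmgr` are hypothetical stand-ins created in a temporary directory for this example; they are not the real LangBot modules.

```python
import importlib
import pathlib
import sys
import tempfile
import types

# Hypothetical stand-ins: 'demo_app' plays the module that starts the import
# cycle, 'demo_taskmgr' plays the module under test.
tmp_dir = tempfile.mkdtemp()
pathlib.Path(tmp_dir, 'demo_taskmgr.py').write_text(
    'from demo_app import Application\n'
    'def describe():\n'
    '    return Application.__name__\n'
)
sys.path.insert(0, tmp_dir)
importlib.invalidate_caches()

# Pre-mock: register a stub BEFORE the import so the real module never loads.
stub = types.ModuleType('demo_app')
stub.Application = type('StubApplication', (), {})
sys.modules['demo_app'] = stub

# The import now resolves 'demo_app' to the stub instead of following the chain.
demo_taskmgr = importlib.import_module('demo_taskmgr')
```

Because the stub is left in `sys.modules`, this form is best confined to a fixture that cleans up afterwards.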

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor(test): consolidate FakeApp and add sys.modules isolation utility

- Extract tests/utils/import_isolation.py with isolated_sys_modules context manager
- Extend tests/factories/app.py FakeApp with handler-specific attributes
- Refactor test_chat_handler.py to use centralized FakeApp and cached imports
- Refactor test_command_handler.py with mock_execute_factory fixture
- Refactor test_smoke.py to move import-time sys.modules manipulation into fixture
- Add SQLite migration integration tests (G-002)
- Add HTTP API smoke integration tests (G-005)
- Update CI workflow to call pytest for SQLite migrations (G-004)
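
A minimal sketch of what a sys.modules isolation context manager could look like is given below. Only the name isolated_sys_modules comes from this commit; the `stubs` parameter and the restore strategy are assumptions, and the actual tests/utils/import_isolation.py may differ.

```python
import contextlib
import sys
import types


@contextlib.contextmanager
def isolated_sys_modules(stubs=None):
    """Snapshot sys.modules, apply optional stubs, and restore on exit."""
    saved = dict(sys.modules)
    try:
        sys.modules.update(stubs or {})
        yield sys.modules
    finally:
        # Drop anything registered inside the block, then put back originals.
        for name in set(sys.modules) - set(saved):
            del sys.modules[name]
        sys.modules.update(saved)


# Usage: the stub is visible only inside the block.
fake = types.ModuleType('demo_fake_mod')  # hypothetical module name
with isolated_sys_modules({'demo_fake_mod': fake}):
    inside = sys.modules['demo_fake_mod'] is fake
leaked = 'demo_fake_mod' in sys.modules
```

Keeping the manipulation inside a context manager means an import-time stub cannot leak into unrelated tests.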

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add developer quality gate consolidation (G-007)

- Add scripts/test-integration-fast.sh for fast integration tests
- Add scripts/test-coverage.sh with 12% baseline threshold
- Update Makefile with test-integration-fast, test-coverage, test-all-local
- Update CI workflow with integration and coverage jobs
- Add smoke marker to pytest.ini
- Update tests/README.md with quality gate layers documentation
- Add tests/integration/pipeline/ for pipeline stage-chain tests

Quality gate layers:
- Quick: ruff + unit + smoke (~2 min)
- Fast Integration: SQLite/API/Pipeline (~3 min)
- Coverage: 12% threshold gate (~8 min)
- Full Local: all three combined

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add PostgreSQL migration slow integration tests (G-003)

- Add tests/integration/persistence/test_migrations_postgres.py
- All tests marked with @pytest.mark.slow
- Tests skip when TEST_POSTGRES_URL is not set (no local PostgreSQL)
- Database isolation via clean_tables and clean_alembic_version fixtures
- Update CI workflow to use pytest instead of inline Python script
- Remove TODO(G-003) comment
- Update tests/README.md with PostgreSQL test documentation

Covered scenarios:
- Baseline stamp sets revision
- Upgrade from baseline to head
- Upgrade idempotent
- Get current on unstamped DB returns None

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): Phase 1.5 coverage expansion - COV-001 to COV-013

Coverage baseline raised from 13.65% to 26% (+12.35%)
Gate raised from 12% to 18%

Tasks completed:
- COV-001: Command system unit tests (100% coverage)
- COV-002: API service unit tests batch 1 (user/apikey/model/provider)
- COV-003: Provider model manager unit tests
- COV-004: Pipeline remaining stage tests (aggregator/cntfilter/longtext/msgtrun)
- COV-005: Storage and utils coverage pass
- COV-006: Gate ratchet 12%→15%
- COV-007: Gate ratchet 15%→18%
- COV-008: API service batch 2 (bot/pipeline/webhook/space/maintenance/mcp)
- COV-009: Blocked - API controller circular import issue documented
- COV-010: Plugin runtime unit tests (+0.08%)
- COV-011: RAG and vector unit tests (+0.68%)
- COV-012: Core boot and migration unit tests
- COV-013: Provider requester logic unit tests (+0.62%)

Key additions:
- tests/utils/import_isolation.py: sys.modules isolation for circular imports
- Provider requester mock tests: proved HTTP-dependent code can be tested locally
- Vector filter utilities: 100% coverage on pure functions
- API services: fake persistence pattern for unit testing
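
One way to read the fake persistence pattern is sketched below, assuming a service that only talks to `persistence_mgr.execute_async` (the attribute mocked in test_kbmgr.py). `DemoUserService`, `count_users`, and `make_fake_app` are hypothetical names for illustration.

```python
import asyncio
from unittest.mock import AsyncMock, Mock


class DemoUserService:
    """Hypothetical service that depends only on app.persistence_mgr."""

    def __init__(self, app):
        self.ap = app

    async def count_users(self) -> int:
        result = await self.ap.persistence_mgr.execute_async('SELECT COUNT(*) FROM users')
        return result.scalar()


def make_fake_app(scalar_value):
    """Build an app whose persistence layer returns a canned result."""
    app = Mock()
    app.persistence_mgr = Mock()
    app.persistence_mgr.execute_async = AsyncMock(
        return_value=Mock(scalar=Mock(return_value=scalar_value))
    )
    return app


app = make_fake_app(3)
user_count = asyncio.run(DemoUserService(app).count_users())
```

The service logic runs for real while no database is touched, and the mock records the awaited call for assertions.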

Blocked issue COV-009 documented in langbot-test-plan/1.5/issues/

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(phase1): add unit tests for telemetry, plugin, rag, persistence

Add initial unit tests for Phase 1 of test coverage improvement:
- telemetry: test initialization, payload sanitization, early returns (14.3% → 62.9%)
- plugin: test _parse_plugin_id static method
- rag: test _to_i18n_name static method
- persistence: test serialize_model with datetime handling

Overall core coverage: 41.9% → 42.2%

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(phase2): add unit tests for core, persistence, plugin, utils

- Add test_handler_helpers.py for plugin handler helpers (7 tests)
- Add test_mgr_methods.py for persistence manager (5 tests)
- Add test_app_config_validation.py for core app config (12 tests)
- Add test_knowledge_service.py for API knowledge service (22 tests)
- Add test_kbmgr.py for RAG knowledge base manager (39 tests)
- Add test_survey_manager.py for survey manager (22 tests)
- Add test_connector_methods.py for plugin connector (24 tests)
- Add test_funcschema.py for utils function schema (9 tests)
- Add test_platform.py for utils platform detection (7 tests)
- Add test_extract_deps.py for plugin deps extraction (7 tests)
- Add test_database_decorator.py for persistence decorator (7 tests)
- Add test_load_config.py for core config loading (19 tests)
- Add COVERAGE_EXCLUSIONS.md documenting external adapter exclusions
- Fix test_chat_session_limit.py path for portability

Coverage: core 28% → 30%, persistence 24% → 24.4%, plugin 27% → 28%
Total: 1082 tests passed, core module coverage 45.5%

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(integration): add API controller integration tests

- Add test_pipelines.py (10 tests) covering pipelines CRUD operations
  - GET/POST/PUT/DELETE on /api/v1/pipelines
  - Extensions endpoint
  - Metadata endpoint
  - Coverage: pipelines controller 27% → 80%

- Add test_providers.py (10 tests) covering provider/model management
  - Provider CRUD with model counts
  - LLM model CRUD
  - Coverage: providers controller 23% → 81%, models 29% → 45%

Tests use Quart TestClient with mocked services for real HTTP behavior
without external dependencies.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(integration): add knowledge, bots, and model endpoints tests

- Add test_knowledge.py (10 tests) covering knowledge base management
  - CRUD operations on /api/v1/knowledge/bases
  - Files management endpoints
  - Retrieve endpoint with validation
  - Coverage: knowledge/base.py 26% → 91%

- Add test_bots.py (9 tests) covering bot management
  - CRUD operations on /api/v1/platform/bots
  - Logs endpoint
  - Send message endpoint with validation
  - Coverage: platform/bots.py 24% → 87%

- Extend test_providers.py (+4 tests) for embedding/rerank models
  - Embedding models CRUD
  - Rerank models CRUD
  - Coverage: provider/models.py 29% → 60%

Total integration tests: 53 (smoke 12 + pipelines 10 + providers 14 + knowledge 10 + bots 9)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(integration): add embed and monitoring endpoint tests

Add integration tests for embed widget and monitoring API endpoints:
- test_embed.py: 15 tests for widget.js, logo, turnstile, messages, reset, feedback
- test_monitoring.py: 15 tests for overview, messages, llm-calls, sessions, errors, export

Coverage improvements:
- embed.py: 17% → 56%
- monitoring.py: 17% → 93%

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(e2e): add minimal startup E2E tests

Add E2E tests for LangBot startup flow:
- tests/e2e/utils/config_factory.py: minimal config generation
- tests/e2e/utils/process_manager.py: LangBot subprocess management
- tests/e2e/conftest.py: E2E fixtures (session-scoped process)
- tests/e2e/test_startup.py: 12 tests for startup verification

Tests verify:
- boot.py + stages execution
- database initialization (SQLite)
- API availability
- migrations applied

Uses embedded databases (SQLite, Chroma) - no external dependencies.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(quality): fix fake tests and add missing coverage

P0 fixes:
- telemetry: rewrite fake tests with real behavior verification (25 tests)
- config: delete copied-source tests, use proper imports (2 deleted)
- persistence: fix try-except pass to verify specific errors

P1 fixes:
- pipeline: add real FixedWindowAlgo tests instead of mocks (12 tests)
- provider: add SessionManager and ToolManager tests (25 tests)
- storage: add S3StorageProvider tests with moto mock (16 tests)
- plugin: add handler action tests for setting inheritance (15 tests)
- rag: add file storage and ZIP processing tests (21 tests)
- vector: add VDB filter conversion tests (30 tests)

P2 fixes:
- pipeline/msgtrun: strengthen assertions for exact message count
- api: add response structure validation in integration tests

New test files:
- provider/test_session_manager.py
- provider/test_tool_manager.py
- storage/test_s3storage.py
- plugin/test_handler_actions.py
- rag/test_file_storage.py
- vector/test_vdb_filter_conversion.py

Source code bugs documented:
- provider: TokenManager.next_token() ZeroDivisionError
- telemetry: send_tasks class variable shared state
- command: empty command IndexError, unused parameters
- utils: funcschema KeyError
- entity: vector.py independent declarative_base

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* docs(test): update coverage stats and test structure

- Update coverage from 22% to 30%
- Add new test files to structure:
  - provider: session_manager, tool_manager
  - storage: s3storage
  - plugin: handler_actions
  - rag: file_storage
  - vector: vdb_filter_conversion
  - telemetry: rewritten tests
- Update module coverage percentages

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test: add 105 new unit tests for untested core functionality

Add comprehensive tests for B-class issues (core functionality untested):

Pipeline:
- test_pool.py: QueryPool ID generation, caching, async context (12 tests)
- test_ratelimit.py: Fixed timing-sensitive test tolerance
- test_pipelinemgr.py: Use real Pydantic StageProcessResult instead of Mock

Utils:
- test_version.py: Version comparison functions (20 tests)
- test_logcache.py: Log page management and retrieval (18 tests)
- test_httpclient.py: HTTP session pool management (10 tests)
- test_proxy.py: Proxy configuration from env and config (10 tests)
- test_image.py: URL parsing and base64 extraction (12 tests)
- test_pkgmgr.py: Pip command generation (8 tests)

Discover:
- test_engine.py: I18nString, Metadata, Component manifest (15 tests)

Test count: 1193 → 1298 (+105 tests)

Note: Some B-class issues cannot be tested due to circular import bugs
filed as GitHub issues #2175 (pipeline) and #2176 (persistence).

* test: tighten phase 1 coverage contracts

* test: align ci integration isolation

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-16 12:05:54 +08:00


"""Unit tests for RAG knowledge base manager.

Tests cover:
- RAGManager CRUD operations
- RuntimeKnowledgeBase getters
- Knowledge engine enrichment
- KB loading and removal
"""

from __future__ import annotations

import pytest
import uuid
from unittest.mock import Mock, AsyncMock
from importlib import import_module


def get_rag_module():
    """Lazy import to avoid circular import issues."""
    return import_module('langbot.pkg.rag.knowledge.kbmgr')


def create_mock_app():
    """Create mock Application for testing."""
    mock_app = Mock()
    mock_app.logger = Mock()
    mock_app.persistence_mgr = AsyncMock()
    mock_app.persistence_mgr.execute_async = AsyncMock()
    mock_app.persistence_mgr.serialize_model = Mock(return_value={})
    mock_app.plugin_connector = AsyncMock()
    mock_app.plugin_connector.is_enable_plugin = True
    mock_app.storage_mgr = Mock()
    mock_app.storage_mgr.storage_provider = AsyncMock()
    mock_app.task_mgr = AsyncMock()
    mock_app.task_mgr.create_user_task = Mock(return_value=Mock(id=1))
    return mock_app


def create_mock_kb_entity():
    """Create mock KnowledgeBase entity."""
    mock_kb = Mock()
    mock_kb.uuid = str(uuid.uuid4())
    mock_kb.name = 'Test KB'
    mock_kb.description = 'Test description'
    mock_kb.knowledge_engine_plugin_id = 'author/engine'
    mock_kb.collection_id = mock_kb.uuid
    mock_kb.creation_settings = {}
    mock_kb.retrieval_settings = {}
    return mock_kb


class TestRAGManagerCreateKnowledgeBase:
    """Tests for create_knowledge_base method."""

    @pytest.mark.asyncio
    async def test_creates_kb_with_valid_engine(self):
        """Test creates KB when engine plugin exists."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        # Mock valid engine list
        mock_app.plugin_connector.list_knowledge_engines = AsyncMock(
            return_value=[{'plugin_id': 'author/engine', 'name': 'Engine'}]
        )
        mock_app.persistence_mgr.execute_async = AsyncMock()
        mock_app.plugin_connector.rag_on_kb_create = AsyncMock()
        manager = rag_module.RAGManager(mock_app)
        kb = await manager.create_knowledge_base(
            name='Test KB',
            knowledge_engine_plugin_id='author/engine',
            creation_settings={'model': 'test'},
        )
        assert kb.name == 'Test KB'
        assert kb.knowledge_engine_plugin_id == 'author/engine'

    @pytest.mark.asyncio
    async def test_raises_when_engine_not_found(self):
        """Test raises ValueError when engine plugin not found."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        # Mock empty engine list
        mock_app.plugin_connector.list_knowledge_engines = AsyncMock(return_value=[])
        manager = rag_module.RAGManager(mock_app)
        with pytest.raises(ValueError) as exc_info:
            await manager.create_knowledge_base(
                name='Test KB',
                knowledge_engine_plugin_id='unknown/engine',
                creation_settings={},
            )
        assert 'not found' in str(exc_info.value)

    @pytest.mark.asyncio
    async def test_rollback_on_plugin_create_failure(self):
        """Test that DB entry is rolled back when plugin create fails."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_app.plugin_connector.list_knowledge_engines = AsyncMock(
            return_value=[{'plugin_id': 'author/engine'}]
        )
        mock_app.persistence_mgr.execute_async = AsyncMock()
        mock_app.plugin_connector.rag_on_kb_create = AsyncMock(
            side_effect=Exception('Plugin error')
        )
        manager = rag_module.RAGManager(mock_app)
        with pytest.raises(Exception):
            await manager.create_knowledge_base(
                name='Test KB',
                knowledge_engine_plugin_id='author/engine',
                creation_settings={},
            )
        # After rollback the failed KB must not remain in the runtime map.
        # Note: this checks runtime state only, not the DB delete call itself.
        assert len(manager.knowledge_bases) == 0

    @pytest.mark.asyncio
    async def test_sets_default_retrieval_settings(self):
        """Test that retrieval_settings=None defaults to {}."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_app.plugin_connector.list_knowledge_engines = AsyncMock(
            return_value=[{'plugin_id': 'author/engine'}]
        )
        mock_app.persistence_mgr.execute_async = AsyncMock()
        mock_app.plugin_connector.rag_on_kb_create = AsyncMock()
        manager = rag_module.RAGManager(mock_app)
        kb = await manager.create_knowledge_base(
            name='Test KB',
            knowledge_engine_plugin_id='author/engine',
            creation_settings={},
            retrieval_settings=None,
        )
        assert kb.retrieval_settings == {}

    @pytest.mark.asyncio
    async def test_skips_validation_when_plugin_disabled(self):
        """Test that engine validation is skipped when plugin disabled."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_app.plugin_connector.is_enable_plugin = False
        mock_app.persistence_mgr.execute_async = AsyncMock()
        mock_app.plugin_connector.rag_on_kb_create = AsyncMock()
        manager = rag_module.RAGManager(mock_app)
        # Should not raise even though engine list would be empty
        kb = await manager.create_knowledge_base(
            name='Test KB',
            knowledge_engine_plugin_id='any/engine',
            creation_settings={},
        )
        assert kb.knowledge_engine_plugin_id == 'any/engine'


class TestRuntimeKnowledgeBaseOnKBCreate:
    """Tests for _on_kb_create method."""

    @pytest.mark.asyncio
    async def test_calls_plugin_on_create(self):
        """Test that plugin is notified on KB create."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        mock_kb.creation_settings = {'model': 'test'}
        mock_app.plugin_connector.rag_on_kb_create = AsyncMock()
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        await runtime_kb._on_kb_create()
        mock_app.plugin_connector.rag_on_kb_create.assert_called_once_with(
            'author/engine', mock_kb.uuid, {'model': 'test'}
        )

    @pytest.mark.asyncio
    async def test_skips_when_no_plugin_id(self):
        """Test that create notification is skipped when no plugin."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        mock_kb.knowledge_engine_plugin_id = None
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        await runtime_kb._on_kb_create()
        mock_app.plugin_connector.rag_on_kb_create.assert_not_called()

    @pytest.mark.asyncio
    async def test_raises_on_plugin_error(self):
        """Test that exception is raised when plugin fails."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        mock_app.plugin_connector.rag_on_kb_create = AsyncMock(
            side_effect=Exception('Plugin failed')
        )
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        with pytest.raises(Exception):
            await runtime_kb._on_kb_create()


class TestRuntimeKnowledgeBaseDeleteFile:
    """Tests for delete_file method."""

    @pytest.mark.asyncio
    async def test_delete_file_calls_plugin_and_db(self):
        """Test that delete_file calls plugin and removes DB record."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        mock_app.plugin_connector.call_rag_delete_document = AsyncMock(return_value=True)
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        await runtime_kb.delete_file('file-uuid')
        mock_app.plugin_connector.call_rag_delete_document.assert_called_once()
        mock_app.persistence_mgr.execute_async.assert_called()


class TestRuntimeKnowledgeBaseIngestDocument:
    """Tests for _ingest_document method."""

    @pytest.mark.asyncio
    async def test_ingest_calls_plugin(self):
        """Test that ingest calls plugin connector."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        mock_app.plugin_connector.call_rag_ingest = AsyncMock(
            return_value={'status': 'success'}
        )
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        result = await runtime_kb._ingest_document(
            {'filename': 'test.pdf'},
            'storage/path',
        )
        assert result['status'] == 'success'
        mock_app.plugin_connector.call_rag_ingest.assert_called_once()

    @pytest.mark.asyncio
    async def test_ingest_raises_when_no_plugin_id(self):
        """Test that ValueError is raised when no plugin ID."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        mock_kb.knowledge_engine_plugin_id = None
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        with pytest.raises(ValueError) as exc_info:
            await runtime_kb._ingest_document({'filename': 'test.pdf'}, 'path')
        assert 'Plugin ID required' in str(exc_info.value)


class TestRAGManagerLoadKnowledgeBasesFromDB:
    """Tests for load_knowledge_bases_from_db method."""

    @pytest.mark.asyncio
    async def test_loads_all_kbs_from_db(self):
        """Test that all KBs are loaded from database."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb1 = create_mock_kb_entity()
        mock_kb2 = create_mock_kb_entity()
        mock_app.persistence_mgr.execute_async = AsyncMock(
            return_value=Mock(all=Mock(return_value=[mock_kb1, mock_kb2]))
        )
        manager = rag_module.RAGManager(mock_app)
        await manager.load_knowledge_bases_from_db()
        assert len(manager.knowledge_bases) == 2

    @pytest.mark.asyncio
    async def test_handles_load_error_gracefully(self):
        """Test that load errors are logged but not raised."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        # KB that will cause initialize to fail
        mock_kb = create_mock_kb_entity()
        mock_app.persistence_mgr.execute_async = AsyncMock(
            return_value=Mock(all=Mock(return_value=[mock_kb]))
        )
        # Make initialize fail by having plugin_connector throw error
        mock_app.plugin_connector.rag_on_kb_create = AsyncMock(
            side_effect=Exception('Init failed')
        )
        manager = rag_module.RAGManager(mock_app)
        # Should not raise - errors are caught
        await manager.load_knowledge_bases_from_db()
        # KB should still be loaded (initialize just passes)
        # The error would come from runtime_kb.initialize which we can't easily mock
        # So we just verify it doesn't crash


class TestRuntimeKnowledgeBaseGetters:
    """Tests for RuntimeKnowledgeBase getter methods."""

    def test_get_uuid_returns_entity_uuid(self):
        """Test get_uuid returns KB entity UUID."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        assert runtime_kb.get_uuid() == mock_kb.uuid

    def test_get_name_returns_entity_name(self):
        """Test get_name returns KB entity name."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        assert runtime_kb.get_name() == mock_kb.name

    def test_get_knowledge_engine_plugin_id_returns_plugin_id(self):
        """Test get_knowledge_engine_plugin_id returns plugin ID."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        assert runtime_kb.get_knowledge_engine_plugin_id() == 'author/engine'

    def test_get_knowledge_engine_plugin_id_returns_empty_when_none(self):
        """Test returns empty string when plugin_id is None."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        mock_kb.knowledge_engine_plugin_id = None
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        assert runtime_kb.get_knowledge_engine_plugin_id() == ''


class TestRuntimeKnowledgeBaseRetrieve:
    """Tests for RuntimeKnowledgeBase retrieve method."""

    @pytest.mark.asyncio
    async def test_retrieve_merges_settings(self):
        """Test that retrieve merges stored and request settings."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        mock_kb.retrieval_settings = {'top_k': 10, 'model': 'default'}
        # Mock plugin connector response with valid RetrievalResultEntry fields
        # content must be list of ContentElement dicts
        mock_app.plugin_connector.call_rag_retrieve = AsyncMock(
            return_value={
                'results': [
                    {
                        'id': 'doc1',
                        'content': [{'type': 'text', 'text': 'test content'}],
                        'metadata': {},
                        'distance': 0.1,
                    }
                ]
            }
        )
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        # Override top_k in request
        results = await runtime_kb.retrieve('query text', settings={'top_k': 20})
        assert len(results) == 1
        # Check that merged settings were passed (top_k overridden)
        call_args = mock_app.plugin_connector.call_rag_retrieve.call_args
        assert call_args[0][1]['retrieval_settings']['top_k'] == 20

    @pytest.mark.asyncio
    async def test_retrieve_adds_default_top_k(self):
        """Test that default top_k=5 is added when not specified."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        mock_kb.retrieval_settings = {}
        mock_app.plugin_connector.call_rag_retrieve = AsyncMock(
            return_value={'results': []}
        )
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        await runtime_kb.retrieve('query text')
        call_args = mock_app.plugin_connector.call_rag_retrieve.call_args
        assert call_args[0][1]['retrieval_settings']['top_k'] == 5

    @pytest.mark.asyncio
    async def test_retrieve_converts_dict_to_entry(self):
        """Test that dict results are converted to RetrievalResultEntry."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        # Mock response with valid RetrievalResultEntry fields
        # content must be list of ContentElement dicts
        mock_app.plugin_connector.call_rag_retrieve = AsyncMock(
            return_value={
                'results': [
                    {
                        'id': 'doc1',
                        'content': [{'type': 'text', 'text': 'test content'}],
                        'metadata': {'source': 'file.pdf'},
                        'distance': 0.15,
                    }
                ]
            }
        )
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        results = await runtime_kb.retrieve('query')
        assert len(results) == 1
        # Result should be RetrievalResultEntry
        assert hasattr(results[0], 'content')
        assert results[0].id == 'doc1'


class TestRuntimeKnowledgeBaseDispose:
    """Tests for RuntimeKnowledgeBase dispose method."""

    @pytest.mark.asyncio
    async def test_dispose_calls_on_kb_delete(self):
        """Test that dispose calls _on_kb_delete."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        mock_app.plugin_connector.rag_on_kb_delete = AsyncMock()
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        await runtime_kb.dispose()
        mock_app.plugin_connector.rag_on_kb_delete.assert_called_once()

    @pytest.mark.asyncio
    async def test_dispose_skips_when_no_plugin_id(self):
        """Test that dispose skips when no plugin ID."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb = create_mock_kb_entity()
        mock_kb.knowledge_engine_plugin_id = None
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        await runtime_kb.dispose()
        # Should not call plugin connector
        mock_app.plugin_connector.rag_on_kb_delete.assert_not_called()


class TestRAGManagerInit:
    """Tests for RAGManager initialization."""

    def test_init_stores_app_reference(self):
        """Test that __init__ stores Application reference."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        assert manager.ap is mock_app

    def test_init_creates_empty_knowledge_bases_dict(self):
        """Test that knowledge_bases starts as empty dict."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        assert manager.knowledge_bases == {}


class TestRAGManagerGetKnowledgeBase:
    """Tests for RAGManager get methods."""

    @pytest.mark.asyncio
    async def test_get_knowledge_base_by_uuid_returns_runtime_kb(self):
        """Test get_knowledge_base_by_uuid returns loaded KB."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        mock_kb = create_mock_kb_entity()
        # Manually add to knowledge_bases
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        manager.knowledge_bases[mock_kb.uuid] = runtime_kb
        result = await manager.get_knowledge_base_by_uuid(mock_kb.uuid)
        assert result is runtime_kb

    @pytest.mark.asyncio
    async def test_get_knowledge_base_by_uuid_returns_none_when_not_found(self):
        """Test returns None when KB not in runtime."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        result = await manager.get_knowledge_base_by_uuid('nonexistent-uuid')
        assert result is None

    @pytest.mark.asyncio
    async def test_remove_knowledge_base_from_runtime(self):
        """Test remove_knowledge_base_from_runtime removes KB."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        mock_kb = create_mock_kb_entity()
        # Add to knowledge_bases
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        manager.knowledge_bases[mock_kb.uuid] = runtime_kb
        await manager.remove_knowledge_base_from_runtime(mock_kb.uuid)
        assert mock_kb.uuid not in manager.knowledge_bases
class TestRAGManagerEnrichKB:
    """Tests for _enrich_kb_dict method."""

    def test_enrich_adds_engine_info_from_map(self):
        """Test that engine info is added from engine_map."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        kb_dict = {'knowledge_engine_plugin_id': 'author/engine'}
        engine_map = {
            'author/engine': {
                'plugin_id': 'author/engine',
                'name': 'Test Engine',
                'capabilities': ['doc_ingestion', 'search'],
            }
        }
        manager._enrich_kb_dict(kb_dict, engine_map)
        assert 'knowledge_engine' in kb_dict
        assert kb_dict['knowledge_engine']['plugin_id'] == 'author/engine'
        assert kb_dict['knowledge_engine']['capabilities'] == ['doc_ingestion', 'search']

    def test_enrich_uses_fallback_when_engine_not_in_map(self):
        """Test that fallback info is used when engine not found."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        kb_dict = {'knowledge_engine_plugin_id': 'unknown/engine'}
        engine_map = {}
        manager._enrich_kb_dict(kb_dict, engine_map)
        assert 'knowledge_engine' in kb_dict
        assert kb_dict['knowledge_engine']['plugin_id'] == 'unknown/engine'
        assert kb_dict['knowledge_engine']['capabilities'] == []

    def test_enrich_uses_fallback_when_no_plugin_id(self):
        """Test that fallback is used when no plugin ID."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        kb_dict = {}
        engine_map = {}
        manager._enrich_kb_dict(kb_dict, engine_map)
        assert 'knowledge_engine' in kb_dict
        # Should have Internal (Legacy) name
        assert 'en_US' in kb_dict['knowledge_engine']['name']

    def test_enrich_converts_string_name_to_i18n(self):
        """Test that engine name is converted to i18n dict."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        kb_dict = {'knowledge_engine_plugin_id': 'author/engine'}
        engine_map = {
            'author/engine': {
                'plugin_id': 'author/engine',
                'name': 'Simple Name',  # String, not dict
                'capabilities': [],
            }
        }
        manager._enrich_kb_dict(kb_dict, engine_map)
        # Name should be converted to i18n dict
        engine_name = kb_dict['knowledge_engine']['name']
        assert isinstance(engine_name, dict)
        assert engine_name['en_US'] == 'Simple Name'
class TestRAGManagerDeleteKnowledgeBase:
    """Tests for delete_knowledge_base method."""

    @pytest.mark.asyncio
    async def test_delete_removes_from_runtime_and_disposes(self):
        """Test that delete removes KB and calls dispose."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        mock_kb = create_mock_kb_entity()
        # Add to knowledge_bases
        runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
        manager.knowledge_bases[mock_kb.uuid] = runtime_kb
        await manager.delete_knowledge_base(mock_kb.uuid)
        assert mock_kb.uuid not in manager.knowledge_bases

    @pytest.mark.asyncio
    async def test_delete_logs_warning_when_not_in_runtime(self):
        """Test that warning is logged when KB not in runtime."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        await manager.delete_knowledge_base('nonexistent-uuid')
        mock_app.logger.warning.assert_called_once()
class TestRAGManagerGetAllDetails:
    """Tests for get_all_knowledge_base_details method."""

    @pytest.mark.asyncio
    async def test_returns_empty_list_when_no_kbs(self):
        """Test returns empty list when no knowledge bases."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_app.persistence_mgr.execute_async = AsyncMock(
            return_value=Mock(all=Mock(return_value=[]))
        )
        manager = rag_module.RAGManager(mock_app)
        result = await manager.get_all_knowledge_base_details()
        assert result == []

    @pytest.mark.asyncio
    async def test_enriches_each_kb_with_engine_info(self):
        """Test that each KB is enriched with engine info."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        # Mock DB result
        mock_kb_row = Mock()
        mock_app.persistence_mgr.execute_async = AsyncMock(
            return_value=Mock(all=Mock(return_value=[mock_kb_row]))
        )
        mock_app.persistence_mgr.serialize_model = Mock(
            return_value={'uuid': 'kb1', 'knowledge_engine_plugin_id': 'author/engine'}
        )
        mock_app.plugin_connector.list_knowledge_engines = AsyncMock(
            return_value=[{'plugin_id': 'author/engine', 'name': 'Engine', 'capabilities': ['search']}]
        )
        manager = rag_module.RAGManager(mock_app)
        result = await manager.get_all_knowledge_base_details()
        assert len(result) == 1
        assert 'knowledge_engine' in result[0]
class TestRAGManagerGetDetails:
    """Tests for get_knowledge_base_details method."""

    @pytest.mark.asyncio
    async def test_returns_none_when_kb_not_found(self):
        """Test returns None when KB doesn't exist."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_app.persistence_mgr.execute_async = AsyncMock(
            return_value=Mock(first=Mock(return_value=None))
        )
        manager = rag_module.RAGManager(mock_app)
        result = await manager.get_knowledge_base_details('nonexistent')
        assert result is None

    @pytest.mark.asyncio
    async def test_returns_enriched_kb_dict(self):
        """Test returns enriched KB dict when found."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        mock_kb_row = Mock()
        mock_app.persistence_mgr.execute_async = AsyncMock(
            return_value=Mock(first=Mock(return_value=mock_kb_row))
        )
        mock_app.persistence_mgr.serialize_model = Mock(
            return_value={'uuid': 'kb1', 'knowledge_engine_plugin_id': 'author/engine'}
        )
        mock_app.plugin_connector.list_knowledge_engines = AsyncMock(
            return_value=[{'plugin_id': 'author/engine', 'name': 'Engine', 'capabilities': []}]
        )
        manager = rag_module.RAGManager(mock_app)
        result = await manager.get_knowledge_base_details('kb1')
        assert result is not None
        assert 'knowledge_engine' in result
class TestRAGManagerLoadKnowledgeBase:
    """Tests for load_knowledge_base method."""

    @pytest.mark.asyncio
    async def test_loads_kb_entity_into_runtime(self):
        """Test that KB entity is loaded into runtime."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        mock_kb = create_mock_kb_entity()
        result = await manager.load_knowledge_base(mock_kb)
        assert mock_kb.uuid in manager.knowledge_bases
        assert result.get_uuid() == mock_kb.uuid

    @pytest.mark.asyncio
    async def test_load_handles_dict_entity(self):
        """Test that dict entity is converted to KB object."""
        rag_module = get_rag_module()
        mock_app = create_mock_app()
        manager = rag_module.RAGManager(mock_app)
        kb_dict = {
            'uuid': 'kb-uuid',
            'name': 'Test',
            'knowledge_engine_plugin_id': 'author/engine',
            'knowledge_engine': {'name': 'should_be_filtered'},  # non-db field
        }
        await manager.load_knowledge_base(kb_dict)
        assert 'kb-uuid' in manager.knowledge_bases