mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-13 09:16:04 +00:00
test: tighten phase 1 coverage contracts
This commit is contained in:
@@ -1,410 +1,190 @@
|
||||
"""Unit tests for RuntimeKnowledgeBase file storage and ZIP processing.
|
||||
"""Unit tests for RuntimeKnowledgeBase file storage behavior."""
|
||||
|
||||
Tests cover:
|
||||
- store_file entry point
|
||||
- _store_file_task background processing
|
||||
- _store_zip_file ZIP extraction
|
||||
- File status management (pending -> processing -> completed/failed)
|
||||
- MIME type detection
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
import io
|
||||
import zipfile
|
||||
import tempfile
|
||||
import os
|
||||
from unittest.mock import Mock, AsyncMock, patch, MagicMock
|
||||
from importlib import import_module
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
import pytest
|
||||
|
||||
from langbot.pkg.rag.knowledge.kbmgr import RuntimeKnowledgeBase
|
||||
|
||||
|
||||
def get_kbmgr_module():
|
||||
"""Lazy import to avoid circular import issues."""
|
||||
return import_module('langbot.pkg.rag.knowledge.kbmgr')
|
||||
def _make_zip_bytes(entries: dict[str, bytes]) -> bytes:
|
||||
buffer = io.BytesIO()
|
||||
with zipfile.ZipFile(buffer, 'w') as zf:
|
||||
for name, content in entries.items():
|
||||
zf.writestr(name, content)
|
||||
zf.mkdir('emptydir')
|
||||
return buffer.getvalue()
|
||||
|
||||
|
||||
def _make_app() -> Mock:
|
||||
app = Mock()
|
||||
app.logger = Mock()
|
||||
app.task_mgr = Mock()
|
||||
app.storage_mgr = Mock()
|
||||
app.storage_mgr.storage_provider = Mock()
|
||||
app.storage_mgr.storage_provider.exists = AsyncMock(return_value=True)
|
||||
app.storage_mgr.storage_provider.load = AsyncMock()
|
||||
app.storage_mgr.storage_provider.save = AsyncMock()
|
||||
app.storage_mgr.storage_provider.size = AsyncMock(return_value=123)
|
||||
app.storage_mgr.storage_provider.delete = AsyncMock()
|
||||
app.persistence_mgr = Mock()
|
||||
app.persistence_mgr.execute_async = AsyncMock()
|
||||
app.plugin_connector = Mock()
|
||||
return app
|
||||
|
||||
|
||||
def _make_kb(plugin_id: str | None = 'author/engine') -> RuntimeKnowledgeBase:
|
||||
kb_entity = Mock()
|
||||
kb_entity.uuid = 'test-kb-uuid'
|
||||
kb_entity.collection_id = 'test-collection'
|
||||
kb_entity.creation_settings = {}
|
||||
kb_entity.knowledge_engine_plugin_id = plugin_id
|
||||
return RuntimeKnowledgeBase(_make_app(), kb_entity)
|
||||
|
||||
|
||||
class TestStoreFile:
|
||||
"""Tests for store_file method - entry point for file storage."""
|
||||
@pytest.mark.asyncio
|
||||
async def test_store_file_creates_pending_record_and_user_task(self):
|
||||
kb = _make_kb()
|
||||
|
||||
@pytest.fixture
|
||||
def mock_kb(self):
|
||||
"""Create mock RuntimeKnowledgeBase."""
|
||||
kbmgr = get_kbmgr_module()
|
||||
def create_user_task(coro, **kwargs):
|
||||
coro.close()
|
||||
return SimpleNamespace(id='task-1', kwargs=kwargs)
|
||||
|
||||
mock_app = Mock()
|
||||
mock_app.logger = Mock()
|
||||
mock_app.task_mgr = Mock()
|
||||
mock_app.task_mgr.create_user_task = Mock(return_value=Mock(id=1))
|
||||
mock_app.storage_mgr = Mock()
|
||||
mock_app.storage_mgr.storage_provider = Mock()
|
||||
mock_app.storage_mgr.storage_provider.exists = AsyncMock(return_value=True)
|
||||
mock_app.persistence_mgr = Mock()
|
||||
mock_app.persistence_mgr.execute_async = AsyncMock()
|
||||
kb.ap.task_mgr.create_user_task = Mock(side_effect=create_user_task)
|
||||
|
||||
mock_kb_entity = Mock()
|
||||
mock_kb_entity.uuid = 'test-kb-uuid'
|
||||
task_id = await kb.store_file('documents/test.pdf')
|
||||
|
||||
kb = kbmgr.RuntimeKnowledgeBase(mock_app, mock_kb_entity)
|
||||
kb._on_kb_create = AsyncMock()
|
||||
return kb
|
||||
assert task_id == 'task-1'
|
||||
kb.ap.storage_mgr.storage_provider.exists.assert_awaited_once_with('documents/test.pdf')
|
||||
kb.ap.persistence_mgr.execute_async.assert_awaited_once()
|
||||
call_kwargs = kb.ap.task_mgr.create_user_task.call_args.kwargs
|
||||
assert call_kwargs['kind'] == 'knowledge-operation'
|
||||
assert call_kwargs['name'] == 'knowledge-store-file-documents/test.pdf'
|
||||
assert call_kwargs['label'] == 'Store file documents/test.pdf'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_creates_pending_file_record(self, mock_kb):
|
||||
"""Test that store_file creates a pending file record."""
|
||||
# Mock persistence for file record creation
|
||||
mock_result = Mock()
|
||||
mock_result.first = Mock(return_value=None)
|
||||
mock_kb.ap.persistence_mgr.execute_async.return_value = mock_result
|
||||
async def test_store_file_raises_when_source_file_missing(self):
|
||||
kb = _make_kb()
|
||||
kb.ap.storage_mgr.storage_provider.exists = AsyncMock(return_value=False)
|
||||
|
||||
# Mock file exists in storage
|
||||
mock_kb.ap.storage_mgr.storage_provider.exists = AsyncMock(return_value=True)
|
||||
with pytest.raises(Exception, match='File missing.pdf not found'):
|
||||
await kb.store_file('missing.pdf')
|
||||
|
||||
# We can't directly test store_file without full setup
|
||||
# But we verify the expected behavior pattern
|
||||
file_name = 'test.pdf'
|
||||
storage_path = 'kb/test-kb-uuid/test.pdf'
|
||||
mime_type = 'application/pdf'
|
||||
|
||||
# Verify storage provider would be called
|
||||
assert mock_kb.ap.storage_mgr.storage_provider is not None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_early_when_file_not_exists(self, mock_kb):
|
||||
"""Test that store_file returns early when file doesn't exist in storage."""
|
||||
mock_kb.ap.storage_mgr.storage_provider.exists = AsyncMock(return_value=False)
|
||||
|
||||
storage_path = 'kb/test-kb-uuid/nonexistent.pdf'
|
||||
|
||||
# Should check existence before proceeding
|
||||
exists = await mock_kb.ap.storage_mgr.storage_provider.exists(storage_path)
|
||||
assert exists is False
|
||||
kb.ap.persistence_mgr.execute_async.assert_not_awaited()
|
||||
kb.ap.task_mgr.create_user_task.assert_not_called()
|
||||
|
||||
|
||||
class TestStoreZipFile:
|
||||
"""Tests for _store_zip_file method - ZIP extraction and processing."""
|
||||
@pytest.mark.asyncio
|
||||
async def test_store_zip_file_extracts_supported_files_and_skips_noise(self):
|
||||
kb = _make_kb()
|
||||
kb.ap.storage_mgr.storage_provider.load = AsyncMock(
|
||||
return_value=_make_zip_bytes(
|
||||
{
|
||||
'doc1.pdf': b'pdf',
|
||||
'doc2.txt': b'text',
|
||||
'subdir/doc3.md': b'markdown',
|
||||
'page.html': b'html',
|
||||
'image.png': b'png',
|
||||
'.hidden': b'hidden',
|
||||
'__MACOSX/doc1.pdf': b'metadata',
|
||||
}
|
||||
)
|
||||
)
|
||||
kb.store_file = AsyncMock(side_effect=['task-pdf', 'task-txt', 'task-md', 'task-html'])
|
||||
|
||||
@pytest.fixture
|
||||
def temp_zip_with_files(self):
|
||||
"""Create a temporary ZIP file with multiple supported files."""
|
||||
with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
|
||||
with zipfile.ZipFile(tmp, 'w') as zf:
|
||||
# Add supported files
|
||||
zf.writestr('doc1.pdf', b'PDF content 1')
|
||||
zf.writestr('doc2.txt', b'Text content')
|
||||
zf.writestr('subdir/doc3.md', b'Markdown content')
|
||||
# Add unsupported file
|
||||
zf.writestr('image.png', b'PNG binary')
|
||||
# Add hidden file (should be skipped)
|
||||
zf.writestr('.hidden', b'hidden content')
|
||||
# Add __MACOSX file (should be skipped)
|
||||
zf.writestr('__MACOSX/doc1.pdf', b'macos metadata')
|
||||
# Add directory entry
|
||||
zf.mkdir('emptydir')
|
||||
yield tmp.name
|
||||
os.unlink(tmp.name)
|
||||
task_id = await kb._store_zip_file('archive.zip', parser_plugin_id='parser/plugin')
|
||||
|
||||
@pytest.fixture
|
||||
def temp_zip_with_no_supported(self):
|
||||
"""Create a ZIP with no supported file types."""
|
||||
with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
|
||||
with zipfile.ZipFile(tmp, 'w') as zf:
|
||||
zf.writestr('image.jpg', b'JPEG content')
|
||||
zf.writestr('video.mp4', b'video content')
|
||||
yield tmp.name
|
||||
os.unlink(tmp.name)
|
||||
|
||||
@pytest.fixture
|
||||
def temp_empty_zip(self):
|
||||
"""Create an empty ZIP file."""
|
||||
with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
|
||||
with zipfile.ZipFile(tmp, 'w') as zf:
|
||||
pass # Empty
|
||||
yield tmp.name
|
||||
os.unlink(tmp.name)
|
||||
|
||||
def test_zip_extraction_identifies_supported_files(self, temp_zip_with_files):
|
||||
"""Test that ZIP extraction identifies supported file types."""
|
||||
# Supported extensions based on source code
|
||||
supported_extensions = ['.pdf', '.txt', '.md', '.doc', '.docx']
|
||||
|
||||
with zipfile.ZipFile(temp_zip_with_files, 'r') as zf:
|
||||
supported_files = []
|
||||
for info in zf.infolist():
|
||||
if info.is_dir():
|
||||
continue
|
||||
name = info.filename
|
||||
# Skip hidden files
|
||||
if name.startswith('.') or '/.' in name:
|
||||
continue
|
||||
# Skip __MACOSX
|
||||
if '__MACOSX' in name:
|
||||
continue
|
||||
# Check extension
|
||||
ext = os.path.splitext(name)[1].lower()
|
||||
if ext in supported_extensions:
|
||||
supported_files.append(name)
|
||||
|
||||
assert 'doc1.pdf' in supported_files
|
||||
assert 'doc2.txt' in supported_files
|
||||
assert 'subdir/doc3.md' in supported_files
|
||||
assert 'image.png' not in supported_files
|
||||
assert '.hidden' not in supported_files
|
||||
assert '__MACOSX/doc1.pdf' not in supported_files
|
||||
|
||||
def test_skips_directory_entries(self, temp_zip_with_files):
|
||||
"""Test that directory entries are skipped."""
|
||||
with zipfile.ZipFile(temp_zip_with_files, 'r') as zf:
|
||||
for info in zf.infolist():
|
||||
if info.is_dir():
|
||||
# Directory should be skipped - ZIP directories have trailing slash
|
||||
assert info.filename.rstrip('/') == 'emptydir'
|
||||
|
||||
def test_skips_hidden_files(self, temp_zip_with_files):
|
||||
"""Test that hidden files (starting with .) are skipped."""
|
||||
with zipfile.ZipFile(temp_zip_with_files, 'r') as zf:
|
||||
hidden_files = []
|
||||
for info in zf.infolist():
|
||||
if not info.is_dir():
|
||||
name = info.filename
|
||||
if name.startswith('.') or '/.' in name:
|
||||
hidden_files.append(name)
|
||||
|
||||
# Hidden files exist in ZIP but should be filtered
|
||||
assert '.hidden' in hidden_files
|
||||
|
||||
def test_skips_macos_metadata(self, temp_zip_with_files):
|
||||
"""Test that __MACOSX files are skipped."""
|
||||
with zipfile.ZipFile(temp_zip_with_files, 'r') as zf:
|
||||
macos_files = []
|
||||
for info in zf.infolist():
|
||||
if not info.is_dir():
|
||||
if '__MACOSX' in info.filename:
|
||||
macos_files.append(info.filename)
|
||||
|
||||
assert '__MACOSX/doc1.pdf' in macos_files
|
||||
|
||||
def test_raises_when_no_supported_files(self, temp_zip_with_no_supported):
|
||||
"""Test that ValueError is raised when no supported files found."""
|
||||
supported_extensions = ['.pdf', '.txt', '.md', '.doc', '.docx']
|
||||
|
||||
with zipfile.ZipFile(temp_zip_with_no_supported, 'r') as zf:
|
||||
supported_files = []
|
||||
for info in zf.infolist():
|
||||
if info.is_dir():
|
||||
continue
|
||||
ext = os.path.splitext(info.filename)[1].lower()
|
||||
if ext in supported_extensions:
|
||||
supported_files.append(info.filename)
|
||||
|
||||
assert len(supported_files) == 0
|
||||
# Source code raises ValueError in this case
|
||||
|
||||
def test_handles_empty_zip(self, temp_empty_zip):
|
||||
"""Test handling of empty ZIP file."""
|
||||
with zipfile.ZipFile(temp_empty_zip, 'r') as zf:
|
||||
files = [info for info in zf.infolist() if not info.is_dir()]
|
||||
assert len(files) == 0
|
||||
|
||||
|
||||
class TestFileStatusManagement:
|
||||
"""Tests for file status transitions during storage."""
|
||||
assert task_id == 'task-pdf'
|
||||
assert kb.ap.storage_mgr.storage_provider.save.await_count == 4
|
||||
saved_names = [call.args[0] for call in kb.ap.storage_mgr.storage_provider.save.await_args_list]
|
||||
assert any(name.startswith('doc1_') and name.endswith('.pdf') for name in saved_names)
|
||||
assert any(name.startswith('doc2_') and name.endswith('.txt') for name in saved_names)
|
||||
assert any(name.startswith('subdir_doc3_') and name.endswith('.md') for name in saved_names)
|
||||
assert any(name.startswith('page_') and name.endswith('.html') for name in saved_names)
|
||||
assert not any('image' in name for name in saved_names)
|
||||
assert not any('hidden' in name for name in saved_names)
|
||||
assert not any('__MACOSX' in name for name in saved_names)
|
||||
kb.ap.storage_mgr.storage_provider.delete.assert_awaited_once_with('archive.zip')
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_status_transitions_to_processing(self):
|
||||
"""Test that file status transitions from pending to processing."""
|
||||
# Status values from source code
|
||||
STATUS_PENDING = 'pending'
|
||||
STATUS_PROCESSING = 'processing'
|
||||
STATUS_COMPLETED = 'completed'
|
||||
STATUS_FAILED = 'failed'
|
||||
async def test_store_zip_file_raises_when_no_supported_files(self):
|
||||
kb = _make_kb()
|
||||
kb.ap.storage_mgr.storage_provider.load = AsyncMock(
|
||||
return_value=_make_zip_bytes({'image.png': b'png', 'video.mp4': b'video'})
|
||||
)
|
||||
kb.store_file = AsyncMock()
|
||||
|
||||
# Simulate status transitions
|
||||
initial_status = STATUS_PENDING
|
||||
after_process_start = STATUS_PROCESSING
|
||||
after_success = STATUS_COMPLETED
|
||||
with pytest.raises(Exception, match='No supported files found'):
|
||||
await kb._store_zip_file('archive.zip')
|
||||
|
||||
assert initial_status == 'pending'
|
||||
assert after_process_start == 'processing'
|
||||
assert after_success == 'completed'
|
||||
kb.store_file.assert_not_awaited()
|
||||
kb.ap.storage_mgr.storage_provider.delete.assert_awaited_once_with('archive.zip')
|
||||
|
||||
|
||||
class TestStoreFileTask:
|
||||
@pytest.mark.asyncio
|
||||
async def test_store_file_task_marks_completed_and_cleans_storage(self):
|
||||
kb = _make_kb()
|
||||
kb._ingest_document = AsyncMock(return_value={'status': 'completed'})
|
||||
file_obj = SimpleNamespace(uuid='file-uuid', file_name='test.pdf', extension='pdf')
|
||||
task_context = Mock()
|
||||
|
||||
await kb._store_file_task(file_obj, task_context)
|
||||
|
||||
task_context.set_current_action.assert_called_once_with('Processing file')
|
||||
kb.ap.storage_mgr.storage_provider.size.assert_awaited_once_with('test.pdf')
|
||||
kb._ingest_document.assert_awaited_once()
|
||||
assert kb.ap.persistence_mgr.execute_async.await_count == 2
|
||||
kb.ap.storage_mgr.storage_provider.delete.assert_awaited_once_with('test.pdf')
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_status_transitions_to_failed_on_error(self):
|
||||
"""Test that file status transitions to failed on exception."""
|
||||
STATUS_PENDING = 'pending'
|
||||
STATUS_PROCESSING = 'processing'
|
||||
STATUS_FAILED = 'failed'
|
||||
async def test_store_file_task_marks_failed_and_cleans_storage(self):
|
||||
kb = _make_kb()
|
||||
kb._ingest_document = AsyncMock(return_value={'status': 'failed', 'error_message': 'parser failed'})
|
||||
file_obj = SimpleNamespace(uuid='file-uuid', file_name='bad.pdf', extension='pdf')
|
||||
task_context = Mock()
|
||||
|
||||
# Simulate error scenario
|
||||
initial_status = STATUS_PENDING
|
||||
after_error = STATUS_FAILED
|
||||
with pytest.raises(Exception, match='parser failed'):
|
||||
await kb._store_file_task(file_obj, task_context)
|
||||
|
||||
assert initial_status == 'pending'
|
||||
assert after_error == 'failed'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_failed_status_preserves_error_info(self):
|
||||
"""Test that failed status includes error information for debugging."""
|
||||
# File record should have error field populated on failure
|
||||
mock_file_record = Mock()
|
||||
mock_file_record.status = 'failed'
|
||||
mock_file_record.error = 'ParserError: invalid format'
|
||||
|
||||
assert mock_file_record.status == 'failed'
|
||||
assert 'ParserError' in mock_file_record.error
|
||||
|
||||
|
||||
class TestMimeTypeDetection:
|
||||
"""Tests for MIME type detection in file storage."""
|
||||
|
||||
def test_pdf_mime_type(self):
|
||||
"""Test PDF MIME type detection."""
|
||||
filename = 'document.pdf'
|
||||
ext = os.path.splitext(filename)[1].lower()
|
||||
expected_mime = 'application/pdf'
|
||||
assert ext == '.pdf'
|
||||
|
||||
def test_text_mime_type(self):
|
||||
"""Test text MIME type detection."""
|
||||
filename = 'notes.txt'
|
||||
ext = os.path.splitext(filename)[1].lower()
|
||||
expected_mime = 'text/plain'
|
||||
assert ext == '.txt'
|
||||
|
||||
def test_markdown_mime_type(self):
|
||||
"""Test markdown MIME type detection."""
|
||||
filename = 'readme.md'
|
||||
ext = os.path.splitext(filename)[1].lower()
|
||||
expected_mime = 'text/markdown'
|
||||
assert ext == '.md'
|
||||
|
||||
def test_doc_mime_type(self):
|
||||
"""Test DOC MIME type detection."""
|
||||
filename = 'report.doc'
|
||||
ext = os.path.splitext(filename)[1].lower()
|
||||
expected_mime = 'application/msword'
|
||||
assert ext == '.doc'
|
||||
|
||||
def test_docx_mime_type(self):
|
||||
"""Test DOCX MIME type detection."""
|
||||
filename = 'report.docx'
|
||||
ext = os.path.splitext(filename)[1].lower()
|
||||
expected_mime = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
|
||||
assert ext == '.docx'
|
||||
|
||||
|
||||
class TestStoreFileTaskCleanup:
|
||||
"""Tests for cleanup behavior in _store_file_task."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cleanup_storage_on_success(self):
|
||||
"""Test that storage is cleaned up after successful processing."""
|
||||
mock_storage_provider = Mock()
|
||||
mock_storage_provider.delete = AsyncMock()
|
||||
|
||||
storage_path = 'kb/test/file.pdf'
|
||||
should_cleanup = True # Based on source code finally block
|
||||
|
||||
if should_cleanup:
|
||||
await mock_storage_provider.delete(storage_path)
|
||||
|
||||
mock_storage_provider.delete.assert_called_once_with(storage_path)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cleanup_storage_on_failure(self):
|
||||
"""Test that storage is cleaned up even when processing fails."""
|
||||
mock_storage_provider = Mock()
|
||||
mock_storage_provider.delete = AsyncMock()
|
||||
|
||||
storage_path = 'kb/test/file.pdf'
|
||||
|
||||
# Simulate processing failure and cleanup
|
||||
try:
|
||||
raise Exception("Processing failed")
|
||||
except Exception:
|
||||
pass # Error handled
|
||||
|
||||
# Cleanup should still happen in finally block
|
||||
await mock_storage_provider.delete(storage_path)
|
||||
mock_storage_provider.delete.assert_called_once()
|
||||
assert kb.ap.persistence_mgr.execute_async.await_count == 2
|
||||
kb.ap.storage_mgr.storage_provider.delete.assert_awaited_once_with('bad.pdf')
|
||||
|
||||
|
||||
class TestDeleteDocument:
|
||||
"""Tests for _delete_document method."""
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_document_returns_false_when_no_plugin_id(self):
|
||||
kb = _make_kb(plugin_id=None)
|
||||
|
||||
@pytest.fixture
|
||||
def mock_kb_with_plugin(self):
|
||||
"""Create mock KB with plugin ID."""
|
||||
kbmgr = get_kbmgr_module()
|
||||
result = await kb._delete_document('doc-id')
|
||||
|
||||
mock_app = Mock()
|
||||
mock_app.logger = Mock()
|
||||
mock_app.plugin_connector = Mock()
|
||||
mock_app.plugin_connector.rag_delete_document = AsyncMock(return_value={'success': True})
|
||||
|
||||
mock_kb_entity = Mock()
|
||||
mock_kb_entity.uuid = 'test-kb-uuid'
|
||||
mock_kb_entity.knowledge_engine_plugin_id = 'author/engine'
|
||||
|
||||
kb = kbmgr.RuntimeKnowledgeBase(mock_app, mock_kb_entity)
|
||||
return kb
|
||||
|
||||
@pytest.fixture
|
||||
def mock_kb_without_plugin(self):
|
||||
"""Create mock KB without plugin ID."""
|
||||
kbmgr = get_kbmgr_module()
|
||||
|
||||
mock_app = Mock()
|
||||
mock_app.logger = Mock()
|
||||
|
||||
mock_kb_entity = Mock()
|
||||
mock_kb_entity.uuid = 'test-kb-uuid'
|
||||
mock_kb_entity.knowledge_engine_plugin_id = None
|
||||
|
||||
kb = kbmgr.RuntimeKnowledgeBase(mock_app, mock_kb_entity)
|
||||
return kb
|
||||
assert result is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_false_when_no_plugin_id(self, mock_kb_without_plugin):
|
||||
"""Test that _delete_document returns False when no plugin ID."""
|
||||
kb_entity = mock_kb_without_plugin.knowledge_base_entity
|
||||
async def test_delete_document_calls_configured_rag_plugin(self):
|
||||
kb = _make_kb()
|
||||
kb.ap.plugin_connector.call_rag_delete_document = AsyncMock(return_value=True)
|
||||
|
||||
if kb_entity.knowledge_engine_plugin_id is None:
|
||||
# Source code returns False early
|
||||
expected_result = False
|
||||
assert expected_result is False
|
||||
result = await kb._delete_document('doc-id')
|
||||
|
||||
assert result is True
|
||||
kb.ap.plugin_connector.call_rag_delete_document.assert_awaited_once_with(
|
||||
'author/engine', 'doc-id', 'test-kb-uuid'
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_true_on_success(self, mock_kb_with_plugin):
|
||||
"""Test that _delete_document returns True on successful delete."""
|
||||
kb_entity = mock_kb_with_plugin.knowledge_base_entity
|
||||
plugin_id = kb_entity.knowledge_engine_plugin_id
|
||||
async def test_delete_document_returns_false_on_plugin_error(self):
|
||||
kb = _make_kb()
|
||||
kb.ap.plugin_connector.call_rag_delete_document = AsyncMock(side_effect=Exception('plugin error'))
|
||||
|
||||
if plugin_id is not None:
|
||||
# Simulate successful plugin call
|
||||
mock_kb_with_plugin.ap.plugin_connector.rag_delete_document = AsyncMock(
|
||||
return_value={'success': True}
|
||||
)
|
||||
result = await mock_kb_with_plugin.ap.plugin_connector.rag_delete_document(
|
||||
plugin_id.split('/'), 'test-doc-id', kb_entity.uuid
|
||||
)
|
||||
assert result.get('success') is True
|
||||
result = await kb._delete_document('doc-id')
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_false_on_plugin_error(self, mock_kb_with_plugin):
|
||||
"""Test that _delete_document returns False on plugin error."""
|
||||
kb_entity = mock_kb_with_plugin.knowledge_base_entity
|
||||
plugin_id = kb_entity.knowledge_engine_plugin_id
|
||||
|
||||
if plugin_id is not None:
|
||||
# Simulate plugin error
|
||||
mock_kb_with_plugin.ap.plugin_connector.rag_delete_document = AsyncMock(
|
||||
side_effect=Exception("Plugin error")
|
||||
)
|
||||
try:
|
||||
await mock_kb_with_plugin.ap.plugin_connector.rag_delete_document(
|
||||
plugin_id.split('/'), 'test-doc-id', kb_entity.uuid
|
||||
)
|
||||
result = True
|
||||
except Exception:
|
||||
result = False # Source code catches and returns False
|
||||
|
||||
assert result is False
|
||||
assert result is False
|
||||
kb.ap.logger.error.assert_called_once()
|
||||
|
||||
Reference in New Issue
Block a user