Files
LangBot/tests/unit_tests/rag/test_file_storage.py
2026-05-16 10:30:17 +08:00

191 lines
7.4 KiB
Python

"""Unit tests for RuntimeKnowledgeBase file storage behavior."""
from __future__ import annotations
import io
import zipfile
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
from langbot.pkg.rag.knowledge.kbmgr import RuntimeKnowledgeBase
def _make_zip_bytes(entries: dict[str, bytes]) -> bytes:
    """Build an in-memory ZIP archive and return its raw bytes.

    Args:
        entries: Mapping of archive member name to member content. Members
            are written in the mapping's iteration order.

    Returns:
        The serialized archive. Besides the requested members it always
        contains one empty directory entry named ``emptydir/``.
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w') as zf:
        for name, content in entries.items():
            zf.writestr(name, content)
        # ``ZipFile.mkdir`` only exists on Python 3.11+. Writing a member whose
        # name ends with '/' stores a directory entry on older versions too.
        zf.writestr('emptydir/', b'')
    return buffer.getvalue()
def _make_app() -> Mock:
    """Create a mock application object for constructing a knowledge base.

    The storage provider and persistence manager expose ``AsyncMock``
    methods so the code under test can ``await`` them.

    Returns:
        A fresh ``Mock`` with ``logger``, ``task_mgr``, ``storage_mgr``,
        ``persistence_mgr`` and ``plugin_connector`` attributes.
    """
    app = Mock()
    app.logger = Mock()
    app.task_mgr = Mock()

    # Storage defaults: every file exists and reports a size of 123.
    app.storage_mgr = Mock()
    app.storage_mgr.storage_provider = Mock()
    provider = app.storage_mgr.storage_provider
    provider.exists = AsyncMock(return_value=True)
    provider.load = AsyncMock()
    provider.save = AsyncMock()
    provider.size = AsyncMock(return_value=123)
    provider.delete = AsyncMock()

    app.persistence_mgr = Mock()
    app.persistence_mgr.execute_async = AsyncMock()
    app.plugin_connector = Mock()
    return app
def _make_kb(plugin_id: str | None = 'author/engine') -> RuntimeKnowledgeBase:
    """Create a ``RuntimeKnowledgeBase`` backed by mocks.

    Args:
        plugin_id: Value assigned to the mock entity's
            ``knowledge_engine_plugin_id``. Pass ``None`` to build a
            knowledge base without a configured engine plugin.

    Returns:
        A ``RuntimeKnowledgeBase`` built from a new mock application and a
        mock entity with fixed ``uuid`` and ``collection_id`` values.
    """
    kb_entity = Mock()
    kb_entity.uuid = 'test-kb-uuid'
    kb_entity.collection_id = 'test-collection'
    kb_entity.creation_settings = {}
    kb_entity.knowledge_engine_plugin_id = plugin_id
    return RuntimeKnowledgeBase(_make_app(), kb_entity)
class TestStoreFile:
    """Tests for ``RuntimeKnowledgeBase.store_file``."""

    @pytest.mark.asyncio
    async def test_store_file_creates_pending_record_and_user_task(self):
        """Storing an existing file persists once and registers a user task."""
        kb = _make_kb()

        def create_user_task(coro, **kwargs):
            # Close the coroutine right away: it is never scheduled here, and
            # closing avoids a "coroutine was never awaited" warning.
            coro.close()
            return SimpleNamespace(id='task-1', kwargs=kwargs)

        kb.ap.task_mgr.create_user_task = Mock(side_effect=create_user_task)

        task_id = await kb.store_file('documents/test.pdf')

        assert task_id == 'task-1'
        kb.ap.storage_mgr.storage_provider.exists.assert_awaited_once_with('documents/test.pdf')
        kb.ap.persistence_mgr.execute_async.assert_awaited_once()

        call_kwargs = kb.ap.task_mgr.create_user_task.call_args.kwargs
        assert call_kwargs['kind'] == 'knowledge-operation'
        assert call_kwargs['name'] == 'knowledge-store-file-documents/test.pdf'
        assert call_kwargs['label'] == 'Store file documents/test.pdf'

    @pytest.mark.asyncio
    async def test_store_file_raises_when_source_file_missing(self):
        """A missing source file raises before any record or task is created."""
        kb = _make_kb()
        kb.ap.storage_mgr.storage_provider.exists = AsyncMock(return_value=False)

        # ``match`` is a regular expression, so the dot is escaped to match
        # the file name literally.
        with pytest.raises(Exception, match=r'File missing\.pdf not found'):
            await kb.store_file('missing.pdf')

        # These checks sit outside the ``with`` body: statements after the
        # raising call inside it would never run.
        kb.ap.persistence_mgr.execute_async.assert_not_awaited()
        kb.ap.task_mgr.create_user_task.assert_not_called()
class TestStoreZipFile:
    """Tests for ``RuntimeKnowledgeBase._store_zip_file``."""

    @pytest.mark.asyncio
    async def test_store_zip_file_extracts_supported_files_and_skips_noise(self):
        """Only the pdf/txt/md/html members are saved; the archive is deleted."""
        kb = _make_kb()
        kb.ap.storage_mgr.storage_provider.load = AsyncMock(
            return_value=_make_zip_bytes(
                {
                    'doc1.pdf': b'pdf',
                    'doc2.txt': b'text',
                    'subdir/doc3.md': b'markdown',
                    'page.html': b'html',
                    'image.png': b'png',
                    '.hidden': b'hidden',
                    '__MACOSX/doc1.pdf': b'metadata',
                }
            )
        )
        # One task id per member expected to be stored, in archive order.
        kb.store_file = AsyncMock(side_effect=['task-pdf', 'task-txt', 'task-md', 'task-html'])

        task_id = await kb._store_zip_file('archive.zip', parser_plugin_id='parser/plugin')

        # The id returned is the one produced for the first stored member.
        assert task_id == 'task-pdf'
        assert kb.ap.storage_mgr.storage_provider.save.await_count == 4

        saved_names = [call.args[0] for call in kb.ap.storage_mgr.storage_provider.save.await_args_list]
        # Saved names keep the stem as a prefix and the original extension;
        # whatever sits between them is not asserted.
        assert any(name.startswith('doc1_') and name.endswith('.pdf') for name in saved_names)
        assert any(name.startswith('doc2_') and name.endswith('.txt') for name in saved_names)
        assert any(name.startswith('subdir_doc3_') and name.endswith('.md') for name in saved_names)
        assert any(name.startswith('page_') and name.endswith('.html') for name in saved_names)
        assert not any('image' in name for name in saved_names)
        assert not any('hidden' in name for name in saved_names)
        assert not any('__MACOSX' in name for name in saved_names)
        kb.ap.storage_mgr.storage_provider.delete.assert_awaited_once_with('archive.zip')

    @pytest.mark.asyncio
    async def test_store_zip_file_raises_when_no_supported_files(self):
        """An archive without supported members raises and is still deleted."""
        kb = _make_kb()
        kb.ap.storage_mgr.storage_provider.load = AsyncMock(
            return_value=_make_zip_bytes({'image.png': b'png', 'video.mp4': b'video'})
        )
        kb.store_file = AsyncMock()

        with pytest.raises(Exception, match='No supported files found'):
            await kb._store_zip_file('archive.zip')

        # Checked outside the ``with`` body so the assertions actually run.
        kb.store_file.assert_not_awaited()
        kb.ap.storage_mgr.storage_provider.delete.assert_awaited_once_with('archive.zip')
class TestStoreFileTask:
    """Tests for ``RuntimeKnowledgeBase._store_file_task``."""

    @pytest.mark.asyncio
    async def test_store_file_task_marks_completed_and_cleans_storage(self):
        """Successful ingestion issues two persistence calls and deletes the file."""
        kb = _make_kb()
        kb._ingest_document = AsyncMock(return_value={'status': 'completed'})
        file_obj = SimpleNamespace(uuid='file-uuid', file_name='test.pdf', extension='pdf')
        task_context = Mock()

        await kb._store_file_task(file_obj, task_context)

        task_context.set_current_action.assert_called_once_with('Processing file')
        kb.ap.storage_mgr.storage_provider.size.assert_awaited_once_with('test.pdf')
        kb._ingest_document.assert_awaited_once()
        assert kb.ap.persistence_mgr.execute_async.await_count == 2
        kb.ap.storage_mgr.storage_provider.delete.assert_awaited_once_with('test.pdf')

    @pytest.mark.asyncio
    async def test_store_file_task_marks_failed_and_cleans_storage(self):
        """A failed ingestion result raises, yet the stored file is still deleted."""
        kb = _make_kb()
        kb._ingest_document = AsyncMock(return_value={'status': 'failed', 'error_message': 'parser failed'})
        file_obj = SimpleNamespace(uuid='file-uuid', file_name='bad.pdf', extension='pdf')
        task_context = Mock()

        with pytest.raises(Exception, match='parser failed'):
            await kb._store_file_task(file_obj, task_context)

        # Checked outside the ``with`` body so the assertions actually run.
        assert kb.ap.persistence_mgr.execute_async.await_count == 2
        kb.ap.storage_mgr.storage_provider.delete.assert_awaited_once_with('bad.pdf')
class TestDeleteDocument:
    """Tests for ``RuntimeKnowledgeBase._delete_document``."""

    @pytest.mark.asyncio
    async def test_delete_document_returns_false_when_no_plugin_id(self):
        """Without a configured engine plugin id the call returns ``False``."""
        kb = _make_kb(plugin_id=None)

        result = await kb._delete_document('doc-id')

        assert result is False

    @pytest.mark.asyncio
    async def test_delete_document_calls_configured_rag_plugin(self):
        """The connector receives the plugin id, document id and KB uuid."""
        kb = _make_kb()
        kb.ap.plugin_connector.call_rag_delete_document = AsyncMock(return_value=True)

        result = await kb._delete_document('doc-id')

        assert result is True
        kb.ap.plugin_connector.call_rag_delete_document.assert_awaited_once_with(
            'author/engine', 'doc-id', 'test-kb-uuid'
        )

    @pytest.mark.asyncio
    async def test_delete_document_returns_false_on_plugin_error(self):
        """A connector exception is logged once and reported as ``False``."""
        kb = _make_kb()
        kb.ap.plugin_connector.call_rag_delete_document = AsyncMock(side_effect=Exception('plugin error'))

        result = await kb._delete_document('doc-id')

        assert result is False
        kb.ap.logger.error.assert_called_once()