Files
LangBot/tests/factories/app.py
Junyan Qin d1ddff9cdb test: reconcile master's unit tests with feat/sandbox refactors
The merge from master brought in new unit tests that target pre-refactor
APIs on feat/sandbox. Reconcile each:

- factories/app.py: FakeApp now exposes a Mock skill_mgr (with empty .skills
  dict + inert prompt-addition builder) and a Mock pipeline_service so the
  PreProcessor skill-index injection branch can run end-to-end in tests.

- pipeline/conftest.py: eagerly import langbot.pkg.pipeline.pipelinemgr so
  pipeline.stage is fully initialised before any individual stage test
  (preproc, longtext, ...) tries to lazy-load it. Without this preload,
  running test_preproc.py in isolation hit a circular-import error via the
  stage -> app -> pipelinemgr -> stage chain.

- provider/test_tool_manager.py: ToolManager now probes four loaders
  (native -> plugin -> mcp -> skill). Inject inert native + skill mocks in
  the execute_func_call fixture and assert all four shutdowns fire.

- utils/test_paths.py: drop the three cwd-dependent _check_if_source_install
  cases. The refactor walks Path(__file__).resolve().parents looking for
  pyproject.toml + main.py, so cwd no longer factors in and there's no
  file read to mock-fail. The positive case and caching test still apply.

- utils/test_version.py: delete entirely. is_newer and compare_version_str
  were removed when VersionManager was refactored to use the Space API for
  release checks (1b4107a9); the tests targeted a surface that no longer
  exists.
2026-05-21 00:04:34 +08:00

154 lines
5.0 KiB
Python

"""
Fake application factory for tests.
Provides a mock Application object with all dependencies needed by pipeline stages.
"""
from __future__ import annotations
from unittest.mock import AsyncMock, Mock
class FakeApp:
"""Mock Application object providing all basic dependencies needed by stages."""
def __init__(
self,
*,
command_prefix: list[str] = ['/', '!'],
command_enable: bool = True,
pipeline_concurrency: int = 10,
admins: list[str] | None = None,
**extra_attrs,
):
self.logger = self._create_mock_logger()
self.sess_mgr = self._create_mock_session_manager()
self.model_mgr = self._create_mock_model_manager()
self.tool_mgr = self._create_mock_tool_manager()
self.plugin_connector = self._create_mock_plugin_connector()
self.persistence_mgr = self._create_mock_persistence_manager()
self.query_pool = self._create_mock_query_pool()
self.instance_config = self._create_mock_instance_config(
command_prefix=command_prefix,
command_enable=command_enable,
pipeline_concurrency=pipeline_concurrency,
admins=admins or [],
)
self.task_mgr = self._create_mock_task_manager()
# Handler-specific optional attributes
self.telemetry = self._create_mock_telemetry()
self.survey = None
self.cmd_mgr = self._create_mock_cmd_mgr()
self.skill_mgr = self._create_mock_skill_mgr()
self.pipeline_service = self._create_mock_pipeline_service()
# Apply any extra attributes for specific test scenarios
for name, value in extra_attrs.items():
setattr(self, name, value)
# Captured outbound messages (for assertions)
self._outbound_messages: list = []
def _create_mock_logger(self):
logger = Mock()
logger.debug = Mock()
logger.info = Mock()
logger.error = Mock()
logger.warning = Mock()
return logger
def _create_mock_session_manager(self):
sess_mgr = AsyncMock()
sess_mgr.get_session = AsyncMock()
sess_mgr.get_conversation = AsyncMock()
return sess_mgr
def _create_mock_model_manager(self):
model_mgr = AsyncMock()
model_mgr.get_model_by_uuid = AsyncMock()
return model_mgr
def _create_mock_tool_manager(self):
tool_mgr = AsyncMock()
tool_mgr.get_all_tools = AsyncMock(return_value=[])
return tool_mgr
def _create_mock_plugin_connector(self):
plugin_connector = AsyncMock()
plugin_connector.emit_event = AsyncMock()
return plugin_connector
def _create_mock_persistence_manager(self):
persistence_mgr = AsyncMock()
persistence_mgr.execute_async = AsyncMock()
return persistence_mgr
def _create_mock_query_pool(self):
query_pool = Mock()
query_pool.cached_queries = {}
query_pool.queries = []
query_pool.condition = AsyncMock()
return query_pool
def _create_mock_instance_config(
self,
command_prefix: list[str],
command_enable: bool,
pipeline_concurrency: int,
admins: list[str],
):
instance_config = Mock()
instance_config.data = {
'command': {'prefix': command_prefix, 'enable': command_enable},
'concurrency': {'pipeline': pipeline_concurrency},
'admins': admins,
}
return instance_config
def _create_mock_task_manager(self):
task_mgr = Mock()
task_mgr.create_task = Mock()
return task_mgr
def _create_mock_telemetry(self):
telemetry = AsyncMock()
telemetry.start_send_task = AsyncMock()
return telemetry
def _create_mock_cmd_mgr(self):
cmd_mgr = AsyncMock()
cmd_mgr.execute = AsyncMock()
return cmd_mgr
def _create_mock_skill_mgr(self):
"""Mock SkillManager that returns no skill index addition by default."""
skill_mgr = Mock()
skill_mgr.skills = {}
skill_mgr.build_skill_aware_prompt_addition = Mock(return_value='')
skill_mgr.get_skill_index = Mock(return_value=[])
return skill_mgr
def _create_mock_pipeline_service(self):
"""Mock PipelineService.get_pipeline returning empty extensions prefs."""
pipeline_service = AsyncMock()
pipeline_service.get_pipeline = AsyncMock(return_value={'extensions_preferences': {}})
return pipeline_service
def capture_message(self, message):
"""Capture an outbound message for test assertions."""
self._outbound_messages.append(message)
def get_outbound_messages(self) -> list:
"""Get all captured outbound messages."""
return self._outbound_messages.copy()
def clear_outbound_messages(self):
"""Clear captured outbound messages."""
self._outbound_messages.clear()
def fake_app(**kwargs) -> FakeApp:
"""Create a FakeApp instance with optional overrides."""
return FakeApp(**kwargs)