mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-02 12:05:54 +00:00
* fix(ci): update unit-test workflow paths to match current source layout Replace stale pkg/** filter with src/langbot/** and add uv.lock. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * docs(tests): update README to reflect current test layout - Fix stale paths: tests/pipeline → tests/unit_tests/pipeline - Update CI Python versions: 3.11, 3.12, 3.13 - Add test directory structure for box, config, platform, plugin, provider, storage - Document pytest markers and uv commands - Mention planned E2E tests Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(test): add shared test factories package Create tests/factories/ with reusable test factories: - FakeApp: mock application with all dependencies - Message chains: text_chain, mention_chain, image_chain - Query factories: text_query, group_text_query, command_query, etc. No test changes - maintains backward compatibility. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(test): add fake provider factory Add tests/factories/provider.py with: - FakeProvider: deterministic fake LLM provider - Error simulation: timeout, auth, rate-limit, malformed - Request capture for assertions - fake_model: mock model with attached provider Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(test): add fake platform factory Add tests/factories/platform.py with: - FakePlatform: simulated platform adapter - Inbound message construction: friend/group/image - Mention-bot flag simulation - Outbound message capture for assertions - Streaming output support simulation - Send failure simulation Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(test): add comprehensive message/query factories Extend tests/factories/message.py with: - file_query: file attachment query - unsupported_query: unknown message segment - voice_query: audio/voice query - at_all_query: group @All mention - query_with_session: query with session object - query_with_config: query with custom pipeline config Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(test): add fake message flow smoke test Create tests/smoke/test_fake_message_flow.py: - TestFakeMessageFlow: factory verification tests - TestMessageFlowIntegration: minimal flow smoke test - Tests FakeApp, FakeProvider, FakePlatform, query factories - Verifies LANGBOT_FAKE_PONG marker response - Captures outbound messages for assertions Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(test): add developer test-quick command Add scripts/test-quick.sh and Makefile with: - test-quick: runs ruff check + unit tests + smoke tests - No real provider keys or platform accounts required - Suitable for local branch self-test Update tests/README.md: - Document test-quick command - Document test factories package - Add smoke tests and factories directory structure Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(test): make test-quick reliable as developer gate Fixes for D-001验收问题: 1. test-quick.sh: use set -euo pipefail, uv run ruff, no tail pipe 2. Remove unused imports in factories (app.py, platform.py, provider.py) 3. Fix unused variable in smoke test 4. Add noqa: E402 to test_n8nsvapi.py lazy imports 5. Update smoke test docs: "minimal fake flow" not full pipeline Now test-quick is a reliable gate: lint failures exit 1, test failures propagate. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(unit): add preproc and taskmgr unit tests U-001: Pipeline Preprocessor tests - Normal text message processing - Empty message handling - Image segment with/without vision model - Model selection and fallback - Variable extraction U-004: Core Task Manager tests (pattern-based) - Task creation and tracking patterns - Task cancellation patterns - Scope-based cancellation - Task type filtering - Pruning completed tasks - Wait all tasks Taskmgr tests use pattern-based approach to avoid circular import in source code (taskmgr → app → http_controller → migration → taskmgr). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(unit): add config loader unit tests U-005: Config Loader tests - Valid YAML config loading - Valid JSON config loading - Invalid YAML/JSON error behavior - Missing config file creation from template - Template completion for missing keys - ConfigManager load/dump operations - Exists check for both YAML and JSON All tests use tmp_path fixture, no real project config. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(unit): add chat and command handler pattern tests U-002: Chat Handler tests (pattern-based) - Normal message event emission pattern - prevent_default handling - User message alteration pattern - Runner selection pattern - Streaming/non-streaming response patterns - Exception handling modes (show-error, show-hint, hide) - Message history update pattern - Telemetry payload pattern U-003: Command Handler tests (pattern-based) - Command parsing and text extraction - Event creation pattern - Privilege/admin check pattern - Command result handling (text, error, image) - prevent_default handling - String truncation helper Uses pattern-based testing to avoid circular import issues in source code. Direct imports of handler modules trigger circular import chain. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * style: fix unused imports after ruff auto-fix Remove unused imports in test files: - test_config_loader.py: remove unused os - test_taskmgr.py: remove unused Mock - test_preproc.py: remove unused unsupported_query, image_chain Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(unit): improve taskmgr tests to test real classes U-004 improved: Tests now import and test actual classes: - TaskContext: new(), trace(), to_dict(), placeholder() - TaskWrapper: task creation, context, exception/result capture, cancel, to_dict - AsyncTaskManager: create_task, create_user_task, cancel_task, cancel_by_scope - Task pruning behavior Uses pre-mocking technique: - Mock langbot.pkg.core.app before import (breaks circular chain) - Mock langbot.pkg.core.entities with proper Enum All 24 tests now test real class behavior, not patterns. taskmgr.py coverage should improve significantly. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * refactor(test): consolidate FakeApp and add sys.modules isolation utility - Extract tests/utils/import_isolation.py with isolated_sys_modules context manager - Extend tests/factories/app.py FakeApp with handler-specific attributes - Refactor test_chat_handler.py to use centralized FakeApp and cached imports - Refactor test_command_handler.py with mock_execute_factory fixture - Refactor test_smoke.py to move import-time sys.modules manipulation into fixture - Add SQLite migration integration tests (G-002) - Add HTTP API smoke integration tests (G-005) - Update CI workflow to call pytest for SQLite migrations (G-004) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(test): add developer quality gate consolidation (G-007) - Add scripts/test-integration-fast.sh for fast integration tests - Add scripts/test-coverage.sh with 12% baseline threshold - Update Makefile with test-integration-fast, test-coverage, test-all-local - Update CI workflow with integration and coverage jobs - Add smoke marker to pytest.ini - Update tests/README.md with quality gate layers documentation - Add tests/integration/pipeline/ for pipeline stage-chain tests Quality gate layers: - Quick: ruff + unit + smoke (~2 min) - Fast Integration: SQLite/API/Pipeline (~3 min) - Coverage: 12% threshold gate (~8 min) - Full Local: all three combined Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(test): add PostgreSQL migration slow integration tests (G-003) - Add tests/integration/persistence/test_migrations_postgres.py - All tests marked with @pytest.mark.slow - Tests skip when TEST_POSTGRES_URL is not set (no local PostgreSQL) - Database isolation via clean_tables and clean_alembic_version fixtures - Update CI workflow to use pytest instead of inline Python script - Remove TODO(G-003) comment - Update tests/README.md with PostgreSQL test documentation Covered scenarios: - Baseline stamp sets revision - Upgrade from baseline to head - Upgrade idempotent - Get current on unstamped DB returns None Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(test): Phase 1.5 coverage expansion - COV-001 to COV-013 Coverage baseline raised from 13.65% to 26% (+12.35%) Gate raised from 12% to 18% Tasks completed: - COV-001: Command system unit tests (100% coverage) - COV-002: API service unit tests batch 1 (user/apikey/model/provider) - COV-003: Provider model manager unit tests - COV-004: Pipeline remaining stage tests (aggregator/cntfilter/longtext/msgtrun) - COV-005: Storage and utils coverage pass - COV-006: Gate ratchet 12%→15% - COV-007: Gate ratchet 15%→18% - COV-008: API service batch 2 (bot/pipeline/webhook/space/maintenance/mcp) - COV-009: Blocked - API controller circular import issue documented - COV-010: Plugin runtime unit tests (+0.08%) - COV-011: RAG and vector unit tests (+0.68%) - COV-012: Core boot and migration unit tests - COV-013: Provider requester logic unit tests (+0.62%) Key additions: - tests/utils/import_isolation.py: sys.modules isolation for circular imports - Provider requester mock tests: proved HTTP-dependent code can be tested locally - Vector filter utilities: 100% coverage on pure functions - API services: fake persistence pattern for unit testing Blocked issue COV-009 documented in langbot-test-plan/1.5/issues/ Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(phase1): add unit tests for telemetry, plugin, rag, persistence Add initial unit tests for Phase 1 of test coverage improvement: - telemetry: test initialization, payload sanitization, early returns (14.3% → 62.9%) - plugin: test _parse_plugin_id static method - rag: test _to_i18n_name static method - persistence: test serialize_model with datetime handling Overall core coverage: 41.9% → 42.2% Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(phase2): add unit tests for core, persistence, plugin, utils - Add test_handler_helpers.py for plugin handler helpers (7 tests) - Add test_mgr_methods.py for persistence manager (5 tests) - Add test_app_config_validation.py for core app config (12 tests) - Add test_knowledge_service.py for API knowledge service (22 tests) - Add test_kbmgr.py for RAG knowledge base manager (39 tests) - Add test_survey_manager.py for survey manager (22 tests) - Add test_connector_methods.py for plugin connector (24 tests) - Add test_funcschema.py for utils function schema (9 tests) - Add test_platform.py for utils platform detection (7 tests) - Add test_extract_deps.py for plugin deps extraction (7 tests) - Add test_database_decorator.py for persistence decorator (7 tests) - Add test_load_config.py for core config loading (19 tests) - Add COVERAGE_EXCLUSIONS.md documenting external adapter exclusions - Fix test_chat_session_limit.py path for portability Coverage: core 28% → 30%, persistence 24% → 24.4%, plugin 27% → 28% Total: 1082 tests passed, core module coverage 45.5% Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(integration): add API controller integration tests - Add test_pipelines.py (10 tests) covering pipelines CRUD operations - GET/POST/PUT/DELETE on /api/v1/pipelines - Extensions endpoint - Metadata endpoint - Coverage: pipelines controller 27% → 80% - Add test_providers.py (10 tests) covering provider/model management - Provider CRUD with model counts - LLM model CRUD - Coverage: providers controller 23% → 81%, models 29% → 45% Tests use Quart TestClient with mocked services for real HTTP behavior without external dependencies. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(integration): add knowledge, bots, and model endpoints tests - Add test_knowledge.py (10 tests) covering knowledge base management - CRUD operations on /api/v1/knowledge/bases - Files management endpoints - Retrieve endpoint with validation - Coverage: knowledge/base.py 26% → 91% - Add test_bots.py (9 tests) covering bot management - CRUD operations on /api/v1/platform/bots - Logs endpoint - Send message endpoint with validation - Coverage: platform/bots.py 24% → 87% - Extend test_providers.py (+4 tests) for embedding/rerank models - Embedding models CRUD - Rerank models CRUD - Coverage: provider/models.py 29% → 60% Total integration tests: 53 (smoke 12 + pipelines 10 + providers 14 + knowledge 10 + bots 9) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(integration): add embed and monitoring endpoint tests Add integration tests for embed widget and monitoring API endpoints: - test_embed.py: 15 tests for widget.js, logo, turnstile, messages, reset, feedback - test_monitoring.py: 15 tests for overview, messages, llm-calls, sessions, errors, export Coverage improvements: - embed.py: 17% → 56% - monitoring.py: 17% → 93% Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(e2e): add minimal startup E2E tests Add E2E tests for LangBot startup flow: - tests/e2e/utils/config_factory.py: minimal config generation - tests/e2e/utils/process_manager.py: LangBot subprocess management - tests/e2e/conftest.py: E2E fixtures (session-scoped process) - tests/e2e/test_startup.py: 12 tests for startup verification Tests verify: - boot.py + stages execution - database initialization (SQLite) - API availability - migrations applied Uses embedded databases (SQLite, Chroma) - no external dependencies. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test(quality): fix fake tests and add missing coverage P0 fixes: - telemetry: rewrite fake tests with real behavior verification (25 tests) - config: delete copied-source tests, use proper imports (2 deleted) - persistence: fix try-except pass to verify specific errors P1 fixes: - pipeline: add real FixedWindowAlgo tests instead of mocks (12 tests) - provider: add SessionManager and ToolManager tests (25 tests) - storage: add S3StorageProvider tests with moto mock (16 tests) - plugin: add handler action tests for setting inheritance (15 tests) - rag: add file storage and ZIP processing tests (21 tests) - vector: add VDB filter conversion tests (30 tests) P2 fixes: - pipeline/msgtrun: strengthen assertions for exact message count - api: add response structure validation in integration tests New test files: - provider/test_session_manager.py - provider/test_tool_manager.py - storage/test_s3storage.py - plugin/test_handler_actions.py - rag/test_file_storage.py - vector/test_vdb_filter_conversion.py Source code bugs documented: - provider: TokenManager.next_token() ZeroDivisionError - telemetry: send_tasks class variable shared state - command: empty command IndexError, unused parameters - utils: funcschema KeyError - entity: vector.py independent declarative_base Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * docs(test): update coverage stats and test structure - Update coverage from 22% to 30% - Add new test files to structure: - provider: session_manager, tool_manager - storage: s3storage - plugin: handler_actions - rag: file_storage - vector: vdb_filter_conversion - telemetry: rewritten tests - Update module coverage percentages Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * test: add 105 new unit tests for untested core functionality Add comprehensive tests for B-class issues (core functionality untested): Pipeline: - test_pool.py: QueryPool ID generation, caching, async context (12 tests) - test_ratelimit.py: Fixed timing-sensitive test tolerance - test_pipelinemgr.py: Use real Pydantic StageProcessResult instead of Mock Utils: - test_version.py: Version comparison functions (20 tests) - test_logcache.py: Log page management and retrieval (18 tests) - test_httpclient.py: HTTP session pool management (10 tests) - test_proxy.py: Proxy configuration from env and config (10 tests) - test_image.py: URL parsing and base64 extraction (12 tests) - test_pkgmgr.py: Pip command generation (8 tests) Discover: - test_engine.py: I18nString, Metadata, Component manifest (15 tests) Test count: 1193 → 1298 (+105 tests) Note: Some B-class issues cannot be tested due to circular import bugs filed as GitHub issues #2175 (pipeline) and #2176 (persistence). * test: tighten phase 1 coverage contracts * test: align ci integration isolation --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
638 lines
19 KiB
Python
638 lines
19 KiB
Python
"""
|
|
Unit tests for MessageAggregator (aggregator) module.
|
|
|
|
Tests cover:
|
|
- Message buffering and merging
|
|
- Timer-based flush behavior
|
|
- MAX_BUFFER_MESSAGES limit
|
|
- Aggregation enabled/disabled
|
|
- Config delay clamping
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
import asyncio
|
|
from unittest.mock import Mock, AsyncMock
|
|
from importlib import import_module
|
|
|
|
from tests.factories import (
|
|
FakeApp,
|
|
text_chain,
|
|
friend_message_event,
|
|
mock_adapter,
|
|
)
|
|
|
|
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
|
|
|
|
|
def get_aggregator_module():
|
|
"""Lazy import to avoid circular import issues."""
|
|
return import_module('langbot.pkg.pipeline.aggregator')
|
|
|
|
|
|
def make_aggregator_app():
|
|
"""Create a FakeApp with necessary mocks for aggregator tests."""
|
|
app = FakeApp()
|
|
# Ensure query_pool has add_query method
|
|
app.query_pool.add_query = AsyncMock()
|
|
# Add pipeline_mgr mock
|
|
app.pipeline_mgr = AsyncMock()
|
|
app.pipeline_mgr.get_pipeline_by_uuid = AsyncMock(return_value=None)
|
|
return app
|
|
|
|
|
|
class TestPendingMessage:
|
|
"""Tests for PendingMessage dataclass."""
|
|
|
|
def test_pending_message_creation(self):
|
|
"""PendingMessage should be created with correct fields."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
chain = text_chain("hello")
|
|
event = friend_message_event(chain)
|
|
adapter = mock_adapter()
|
|
|
|
pending = aggregator.PendingMessage(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=12345,
|
|
sender_id=12345,
|
|
message_event=event,
|
|
message_chain=chain,
|
|
adapter=adapter,
|
|
pipeline_uuid='test-pipeline',
|
|
)
|
|
|
|
assert pending.bot_uuid == 'test-bot'
|
|
assert pending.launcher_type == provider_session.LauncherTypes.PERSON
|
|
assert pending.message_chain == chain
|
|
assert pending.timestamp is not None
|
|
|
|
|
|
class TestSessionBuffer:
|
|
"""Tests for SessionBuffer dataclass."""
|
|
|
|
def test_session_buffer_creation(self):
|
|
"""SessionBuffer should be created with correct fields."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
buffer = aggregator.SessionBuffer(session_id='test-session')
|
|
|
|
assert buffer.session_id == 'test-session'
|
|
assert buffer.messages == []
|
|
assert buffer.timer_task is None
|
|
assert buffer.last_message_time is not None
|
|
|
|
def test_session_buffer_with_messages(self):
|
|
"""SessionBuffer should accept initial messages."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
chain = text_chain("hello")
|
|
event = friend_message_event(chain)
|
|
adapter = mock_adapter()
|
|
|
|
pending = aggregator.PendingMessage(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=12345,
|
|
sender_id=12345,
|
|
message_event=event,
|
|
message_chain=chain,
|
|
adapter=adapter,
|
|
pipeline_uuid=None,
|
|
)
|
|
|
|
buffer = aggregator.SessionBuffer(
|
|
session_id='test-session',
|
|
messages=[pending],
|
|
)
|
|
|
|
assert len(buffer.messages) == 1
|
|
|
|
|
|
class TestMessageAggregatorInit:
|
|
"""Tests for MessageAggregator initialization."""
|
|
|
|
def test_aggregator_init(self):
|
|
"""MessageAggregator should initialize with correct fields."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
assert agg.ap == app
|
|
assert agg.buffers == {}
|
|
assert isinstance(agg.lock, asyncio.Lock)
|
|
|
|
|
|
class TestMessageAggregatorSessionId:
|
|
"""Tests for session ID generation."""
|
|
|
|
def test_session_id_format(self):
|
|
"""Session ID should be correctly formatted."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
session_id = agg._get_session_id(
|
|
bot_uuid='bot-123',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=45678,
|
|
)
|
|
|
|
assert session_id == 'bot-123:person:45678'
|
|
|
|
def test_session_id_different_launchers(self):
|
|
"""Different launcher types should produce different IDs."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
person_id = agg._get_session_id(
|
|
bot_uuid='bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=123,
|
|
)
|
|
|
|
group_id = agg._get_session_id(
|
|
bot_uuid='bot',
|
|
launcher_type=provider_session.LauncherTypes.GROUP,
|
|
launcher_id=123,
|
|
)
|
|
|
|
assert person_id != group_id
|
|
|
|
|
|
class TestMessageAggregatorConfig:
|
|
"""Tests for aggregation config retrieval."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_config_none_pipeline(self):
|
|
"""None pipeline_uuid should return default config."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
enabled, delay = await agg._get_aggregation_config(None)
|
|
|
|
assert enabled == False
|
|
assert delay == 1.5
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_config_pipeline_not_found(self):
|
|
"""Non-existent pipeline should return default config."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
app.pipeline_mgr.get_pipeline_by_uuid = AsyncMock(return_value=None)
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
enabled, delay = await agg._get_aggregation_config('unknown-pipeline')
|
|
|
|
assert enabled == False
|
|
assert delay == 1.5
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_config_enabled(self):
|
|
"""Pipeline with enabled aggregation should return True."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
|
|
mock_pipeline = Mock()
|
|
mock_pipeline.pipeline_entity = Mock()
|
|
mock_pipeline.pipeline_entity.config = {
|
|
'trigger': {
|
|
'message-aggregation': {
|
|
'enabled': True,
|
|
'delay': 2.0,
|
|
}
|
|
}
|
|
}
|
|
app.pipeline_mgr.get_pipeline_by_uuid = AsyncMock(return_value=mock_pipeline)
|
|
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
enabled, delay = await agg._get_aggregation_config('test-pipeline')
|
|
|
|
assert enabled == True
|
|
assert delay == 2.0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_config_delay_clamped_low(self):
|
|
"""Delay below 1.0 should be clamped to 1.0."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
|
|
mock_pipeline = Mock()
|
|
mock_pipeline.pipeline_entity = Mock()
|
|
mock_pipeline.pipeline_entity.config = {
|
|
'trigger': {
|
|
'message-aggregation': {
|
|
'enabled': True,
|
|
'delay': 0.5, # Below minimum
|
|
}
|
|
}
|
|
}
|
|
app.pipeline_mgr.get_pipeline_by_uuid = AsyncMock(return_value=mock_pipeline)
|
|
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
enabled, delay = await agg._get_aggregation_config('test-pipeline')
|
|
|
|
assert delay == 1.0 # Clamped to minimum
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_config_delay_clamped_high(self):
|
|
"""Delay above 10.0 should be clamped to 10.0."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
|
|
mock_pipeline = Mock()
|
|
mock_pipeline.pipeline_entity = Mock()
|
|
mock_pipeline.pipeline_entity.config = {
|
|
'trigger': {
|
|
'message-aggregation': {
|
|
'enabled': True,
|
|
'delay': 15.0, # Above maximum
|
|
}
|
|
}
|
|
}
|
|
app.pipeline_mgr.get_pipeline_by_uuid = AsyncMock(return_value=mock_pipeline)
|
|
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
enabled, delay = await agg._get_aggregation_config('test-pipeline')
|
|
|
|
assert delay == 10.0 # Clamped to maximum
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_config_delay_invalid_type(self):
|
|
"""Invalid delay type should use default."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
|
|
mock_pipeline = Mock()
|
|
mock_pipeline.pipeline_entity = Mock()
|
|
mock_pipeline.pipeline_entity.config = {
|
|
'trigger': {
|
|
'message-aggregation': {
|
|
'enabled': True,
|
|
'delay': 'invalid', # Not a number
|
|
}
|
|
}
|
|
}
|
|
app.pipeline_mgr.get_pipeline_by_uuid = AsyncMock(return_value=mock_pipeline)
|
|
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
enabled, delay = await agg._get_aggregation_config('test-pipeline')
|
|
|
|
assert delay == 1.5 # Default
|
|
|
|
|
|
class TestMessageAggregatorAddMessage:
|
|
"""Tests for add_message behavior."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_disabled_adds_to_query_pool(self):
|
|
"""Disabled aggregation should directly add to query_pool."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
chain = text_chain("hello")
|
|
event = friend_message_event(chain)
|
|
adapter = mock_adapter()
|
|
|
|
await agg.add_message(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=12345,
|
|
sender_id=12345,
|
|
message_event=event,
|
|
message_chain=chain,
|
|
adapter=adapter,
|
|
pipeline_uuid=None, # None -> disabled
|
|
)
|
|
|
|
# Should have called query_pool.add_query
|
|
assert app.query_pool.add_query.called
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_enabled_buffers_message(self):
|
|
"""Enabled aggregation should buffer message."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
|
|
mock_pipeline = Mock()
|
|
mock_pipeline.pipeline_entity = Mock()
|
|
mock_pipeline.pipeline_entity.config = {
|
|
'trigger': {
|
|
'message-aggregation': {
|
|
'enabled': True,
|
|
'delay': 2.0,
|
|
}
|
|
}
|
|
}
|
|
app.pipeline_mgr.get_pipeline_by_uuid = AsyncMock(return_value=mock_pipeline)
|
|
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
chain = text_chain("hello")
|
|
event = friend_message_event(chain)
|
|
adapter = mock_adapter()
|
|
|
|
await agg.add_message(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=12345,
|
|
sender_id=12345,
|
|
message_event=event,
|
|
message_chain=chain,
|
|
adapter=adapter,
|
|
pipeline_uuid='test-pipeline',
|
|
)
|
|
|
|
# Should have buffered the message
|
|
assert len(agg.buffers) == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_max_buffer_flushes_immediately(self):
|
|
"""Reaching MAX_BUFFER_MESSAGES should flush immediately."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
|
|
mock_pipeline = Mock()
|
|
mock_pipeline.pipeline_entity = Mock()
|
|
mock_pipeline.pipeline_entity.config = {
|
|
'trigger': {
|
|
'message-aggregation': {
|
|
'enabled': True,
|
|
'delay': 10.0, # Long delay
|
|
}
|
|
}
|
|
}
|
|
app.pipeline_mgr.get_pipeline_by_uuid = AsyncMock(return_value=mock_pipeline)
|
|
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
chain = text_chain("hello")
|
|
event = friend_message_event(chain)
|
|
adapter = mock_adapter()
|
|
|
|
# Add messages up to MAX_BUFFER_MESSAGES
|
|
for i in range(aggregator.MAX_BUFFER_MESSAGES):
|
|
await agg.add_message(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=12345,
|
|
sender_id=12345,
|
|
message_event=event,
|
|
message_chain=chain,
|
|
adapter=adapter,
|
|
pipeline_uuid='test-pipeline',
|
|
)
|
|
|
|
# Buffer should be flushed (empty or no buffer)
|
|
session_id = agg._get_session_id('test-bot', provider_session.LauncherTypes.PERSON, 12345)
|
|
assert session_id not in agg.buffers or len(agg.buffers[session_id].messages) == 0
|
|
|
|
|
|
class TestMessageAggregatorMerge:
|
|
"""Tests for message merging."""
|
|
|
|
def test_merge_single_message(self):
|
|
"""Single message should return unchanged."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
chain = text_chain("hello")
|
|
event = friend_message_event(chain)
|
|
adapter = mock_adapter()
|
|
|
|
pending = aggregator.PendingMessage(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=12345,
|
|
sender_id=12345,
|
|
message_event=event,
|
|
message_chain=chain,
|
|
adapter=adapter,
|
|
pipeline_uuid=None,
|
|
)
|
|
|
|
merged = agg._merge_messages([pending])
|
|
|
|
assert merged.message_chain == chain
|
|
|
|
def test_merge_multiple_messages(self):
|
|
"""Multiple messages should be merged with newline separator."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
chain1 = text_chain("hello")
|
|
chain2 = text_chain("world")
|
|
event = friend_message_event(chain1)
|
|
adapter = mock_adapter()
|
|
|
|
pending1 = aggregator.PendingMessage(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=12345,
|
|
sender_id=12345,
|
|
message_event=event,
|
|
message_chain=chain1,
|
|
adapter=adapter,
|
|
pipeline_uuid=None,
|
|
)
|
|
|
|
pending2 = aggregator.PendingMessage(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=12345,
|
|
sender_id=12345,
|
|
message_event=event,
|
|
message_chain=chain2,
|
|
adapter=adapter,
|
|
pipeline_uuid=None,
|
|
)
|
|
|
|
merged = agg._merge_messages([pending1, pending2])
|
|
|
|
# Should contain both messages with separator
|
|
merged_str = str(merged.message_chain)
|
|
assert "hello" in merged_str
|
|
assert "world" in merged_str
|
|
|
|
def test_merge_messages_preserves_routed_by_rule_if_any_input_matches(self):
|
|
"""Merged PendingMessage should keep routed_by_rule when any input was rule-routed."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
chain1 = text_chain("first")
|
|
chain2 = text_chain("second")
|
|
event = friend_message_event(chain1)
|
|
adapter = mock_adapter()
|
|
|
|
pending1 = aggregator.PendingMessage(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=12345,
|
|
sender_id=12345,
|
|
message_event=event,
|
|
message_chain=chain1,
|
|
adapter=adapter,
|
|
pipeline_uuid='test-pipeline-uuid',
|
|
routed_by_rule=False,
|
|
)
|
|
|
|
pending2 = aggregator.PendingMessage(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=12345,
|
|
sender_id=12345,
|
|
message_event=event,
|
|
message_chain=chain2,
|
|
adapter=adapter,
|
|
pipeline_uuid='test-pipeline-uuid',
|
|
routed_by_rule=True,
|
|
)
|
|
|
|
merged = agg._merge_messages([pending1, pending2])
|
|
|
|
assert merged.routed_by_rule is True
|
|
assert str(merged.message_chain) == 'first\nsecond'
|
|
|
|
|
|
class TestMessageAggregatorFlush:
|
|
"""Tests for buffer flush behavior."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_flush_empty_buffer(self):
|
|
"""Flushing empty buffer should do nothing."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
await agg._flush_buffer('nonexistent-session')
|
|
|
|
# Should not call query_pool
|
|
assert not app.query_pool.add_query.called
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_flush_single_message(self):
|
|
"""Flushing single message should add directly to query_pool."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
chain = text_chain("hello")
|
|
event = friend_message_event(chain)
|
|
adapter = mock_adapter()
|
|
|
|
pending = aggregator.PendingMessage(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=12345,
|
|
sender_id=12345,
|
|
message_event=event,
|
|
message_chain=chain,
|
|
adapter=adapter,
|
|
pipeline_uuid=None,
|
|
)
|
|
|
|
buffer = aggregator.SessionBuffer(
|
|
session_id='test-session',
|
|
messages=[pending],
|
|
)
|
|
|
|
agg.buffers['test-session'] = buffer
|
|
|
|
await agg._flush_buffer('test-session')
|
|
|
|
assert app.query_pool.add_query.called
|
|
assert 'test-session' not in agg.buffers
|
|
|
|
|
|
class TestMessageAggregatorFlushAll:
|
|
"""Tests for flush_all behavior."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_flush_all_empty(self):
|
|
"""flush_all with no buffers should do nothing."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
await agg.flush_all()
|
|
|
|
# Should not call query_pool
|
|
assert not app.query_pool.add_query.called
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_flush_all_with_buffers(self):
|
|
"""flush_all should flush all pending buffers."""
|
|
aggregator = get_aggregator_module()
|
|
|
|
app = make_aggregator_app()
|
|
agg = aggregator.MessageAggregator(app)
|
|
|
|
chain = text_chain("hello")
|
|
event = friend_message_event(chain)
|
|
adapter = mock_adapter()
|
|
|
|
# Create two buffers
|
|
pending1 = aggregator.PendingMessage(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=12345,
|
|
sender_id=12345,
|
|
message_event=event,
|
|
message_chain=chain,
|
|
adapter=adapter,
|
|
pipeline_uuid=None,
|
|
)
|
|
|
|
pending2 = aggregator.PendingMessage(
|
|
bot_uuid='test-bot',
|
|
launcher_type=provider_session.LauncherTypes.PERSON,
|
|
launcher_id=67890,
|
|
sender_id=67890,
|
|
message_event=event,
|
|
message_chain=chain,
|
|
adapter=adapter,
|
|
pipeline_uuid=None,
|
|
)
|
|
|
|
buffer1 = aggregator.SessionBuffer(session_id='session-1', messages=[pending1])
|
|
buffer2 = aggregator.SessionBuffer(session_id='session-2', messages=[pending2])
|
|
|
|
agg.buffers['session-1'] = buffer1
|
|
agg.buffers['session-2'] = buffer2
|
|
|
|
await agg.flush_all()
|
|
|
|
# Both buffers should be flushed
|
|
assert len(agg.buffers) == 0
|
|
assert app.query_pool.add_query.call_count == 2
|