Files
LangBot/tests/integration/pipeline/test_full_flow.py
huanghuoguoguo 17bbc8bf10 Feat/test build (#2174)
* fix(ci): update unit-test workflow paths to match current source layout

Replace stale pkg/** filter with src/langbot/** and add uv.lock.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* docs(tests): update README to reflect current test layout

- Fix stale paths: tests/pipeline → tests/unit_tests/pipeline
- Update CI Python versions: 3.11, 3.12, 3.13
- Add test directory structure for box, config, platform, plugin, provider, storage
- Document pytest markers and uv commands
- Mention planned E2E tests

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add shared test factories package

Create tests/factories/ with reusable test factories:
- FakeApp: mock application with all dependencies
- Message chains: text_chain, mention_chain, image_chain
- Query factories: text_query, group_text_query, command_query, etc.

No test changes - maintains backward compatibility.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add fake provider factory

Add tests/factories/provider.py with:
- FakeProvider: deterministic fake LLM provider
- Error simulation: timeout, auth, rate-limit, malformed
- Request capture for assertions
- fake_model: mock model with attached provider

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add fake platform factory

Add tests/factories/platform.py with:
- FakePlatform: simulated platform adapter
- Inbound message construction: friend/group/image
- Mention-bot flag simulation
- Outbound message capture for assertions
- Streaming output support simulation
- Send failure simulation

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add comprehensive message/query factories

Extend tests/factories/message.py with:
- file_query: file attachment query
- unsupported_query: unknown message segment
- voice_query: audio/voice query
- at_all_query: group @All mention
- query_with_session: query with session object
- query_with_config: query with custom pipeline config

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add fake message flow smoke test

Create tests/smoke/test_fake_message_flow.py:
- TestFakeMessageFlow: factory verification tests
- TestMessageFlowIntegration: minimal flow smoke test
- Tests FakeApp, FakeProvider, FakePlatform, query factories
- Verifies LANGBOT_FAKE_PONG marker response
- Captures outbound messages for assertions

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add developer test-quick command

Add scripts/test-quick.sh and Makefile with:
- test-quick: runs ruff check + unit tests + smoke tests
- No real provider keys or platform accounts required
- Suitable for local branch self-test

Update tests/README.md:
- Document test-quick command
- Document test factories package
- Add smoke tests and factories directory structure

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix(test): make test-quick reliable as developer gate

Fixes for D-001验收问题:
1. test-quick.sh: use set -euo pipefail, uv run ruff, no tail pipe
2. Remove unused imports in factories (app.py, platform.py, provider.py)
3. Fix unused variable in smoke test
4. Add noqa: E402 to test_n8nsvapi.py lazy imports
5. Update smoke test docs: "minimal fake flow" not full pipeline

Now test-quick is a reliable gate: lint failures exit 1, test failures propagate.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(unit): add preproc and taskmgr unit tests

U-001: Pipeline Preprocessor tests
- Normal text message processing
- Empty message handling
- Image segment with/without vision model
- Model selection and fallback
- Variable extraction

U-004: Core Task Manager tests (pattern-based)
- Task creation and tracking patterns
- Task cancellation patterns
- Scope-based cancellation
- Task type filtering
- Pruning completed tasks
- Wait all tasks

Taskmgr tests use pattern-based approach to avoid circular import
in source code (taskmgr → app → http_controller → migration → taskmgr).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(unit): add config loader unit tests

U-005: Config Loader tests
- Valid YAML config loading
- Valid JSON config loading
- Invalid YAML/JSON error behavior
- Missing config file creation from template
- Template completion for missing keys
- ConfigManager load/dump operations
- Exists check for both YAML and JSON

All tests use tmp_path fixture, no real project config.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(unit): add chat and command handler pattern tests

U-002: Chat Handler tests (pattern-based)
- Normal message event emission pattern
- prevent_default handling
- User message alteration pattern
- Runner selection pattern
- Streaming/non-streaming response patterns
- Exception handling modes (show-error, show-hint, hide)
- Message history update pattern
- Telemetry payload pattern

U-003: Command Handler tests (pattern-based)
- Command parsing and text extraction
- Event creation pattern
- Privilege/admin check pattern
- Command result handling (text, error, image)
- prevent_default handling
- String truncation helper

Uses pattern-based testing to avoid circular import issues in source code.
Direct imports of handler modules trigger circular import chain.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* style: fix unused imports after ruff auto-fix

Remove unused imports in test files:
- test_config_loader.py: remove unused os
- test_taskmgr.py: remove unused Mock
- test_preproc.py: remove unused unsupported_query, image_chain

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(unit): improve taskmgr tests to test real classes

U-004 improved: Tests now import and test actual classes:
- TaskContext: new(), trace(), to_dict(), placeholder()
- TaskWrapper: task creation, context, exception/result capture, cancel, to_dict
- AsyncTaskManager: create_task, create_user_task, cancel_task, cancel_by_scope
- Task pruning behavior

Uses pre-mocking technique:
- Mock langbot.pkg.core.app before import (breaks circular chain)
- Mock langbot.pkg.core.entities with proper Enum

All 24 tests now test real class behavior, not patterns.
taskmgr.py coverage should improve significantly.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor(test): consolidate FakeApp and add sys.modules isolation utility

- Extract tests/utils/import_isolation.py with isolated_sys_modules context manager
- Extend tests/factories/app.py FakeApp with handler-specific attributes
- Refactor test_chat_handler.py to use centralized FakeApp and cached imports
- Refactor test_command_handler.py with mock_execute_factory fixture
- Refactor test_smoke.py to move import-time sys.modules manipulation into fixture
- Add SQLite migration integration tests (G-002)
- Add HTTP API smoke integration tests (G-005)
- Update CI workflow to call pytest for SQLite migrations (G-004)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add developer quality gate consolidation (G-007)

- Add scripts/test-integration-fast.sh for fast integration tests
- Add scripts/test-coverage.sh with 12% baseline threshold
- Update Makefile with test-integration-fast, test-coverage, test-all-local
- Update CI workflow with integration and coverage jobs
- Add smoke marker to pytest.ini
- Update tests/README.md with quality gate layers documentation
- Add tests/integration/pipeline/ for pipeline stage-chain tests

Quality gate layers:
- Quick: ruff + unit + smoke (~2 min)
- Fast Integration: SQLite/API/Pipeline (~3 min)
- Coverage: 12% threshold gate (~8 min)
- Full Local: all three combined

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): add PostgreSQL migration slow integration tests (G-003)

- Add tests/integration/persistence/test_migrations_postgres.py
- All tests marked with @pytest.mark.slow
- Tests skip when TEST_POSTGRES_URL is not set (no local PostgreSQL)
- Database isolation via clean_tables and clean_alembic_version fixtures
- Update CI workflow to use pytest instead of inline Python script
- Remove TODO(G-003) comment
- Update tests/README.md with PostgreSQL test documentation

Covered scenarios:
- Baseline stamp sets revision
- Upgrade from baseline to head
- Upgrade idempotent
- Get current on unstamped DB returns None

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(test): Phase 1.5 coverage expansion - COV-001 to COV-013

Coverage baseline raised from 13.65% to 26% (+12.35%)
Gate raised from 12% to 18%

Tasks completed:
- COV-001: Command system unit tests (100% coverage)
- COV-002: API service unit tests batch 1 (user/apikey/model/provider)
- COV-003: Provider model manager unit tests
- COV-004: Pipeline remaining stage tests (aggregator/cntfilter/longtext/msgtrun)
- COV-005: Storage and utils coverage pass
- COV-006: Gate ratchet 12%→15%
- COV-007: Gate ratchet 15%→18%
- COV-008: API service batch 2 (bot/pipeline/webhook/space/maintenance/mcp)
- COV-009: Blocked - API controller circular import issue documented
- COV-010: Plugin runtime unit tests (+0.08%)
- COV-011: RAG and vector unit tests (+0.68%)
- COV-012: Core boot and migration unit tests
- COV-013: Provider requester logic unit tests (+0.62%)

Key additions:
- tests/utils/import_isolation.py: sys.modules isolation for circular imports
- Provider requester mock tests: proved HTTP-dependent code can be tested locally
- Vector filter utilities: 100% coverage on pure functions
- API services: fake persistence pattern for unit testing

Blocked issue COV-009 documented in langbot-test-plan/1.5/issues/

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(phase1): add unit tests for telemetry, plugin, rag, persistence

Add initial unit tests for Phase 1 of test coverage improvement:
- telemetry: test initialization, payload sanitization, early returns (14.3% → 62.9%)
- plugin: test _parse_plugin_id static method
- rag: test _to_i18n_name static method
- persistence: test serialize_model with datetime handling

Overall core coverage: 41.9% → 42.2%

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(phase2): add unit tests for core, persistence, plugin, utils

- Add test_handler_helpers.py for plugin handler helpers (7 tests)
- Add test_mgr_methods.py for persistence manager (5 tests)
- Add test_app_config_validation.py for core app config (12 tests)
- Add test_knowledge_service.py for API knowledge service (22 tests)
- Add test_kbmgr.py for RAG knowledge base manager (39 tests)
- Add test_survey_manager.py for survey manager (22 tests)
- Add test_connector_methods.py for plugin connector (24 tests)
- Add test_funcschema.py for utils function schema (9 tests)
- Add test_platform.py for utils platform detection (7 tests)
- Add test_extract_deps.py for plugin deps extraction (7 tests)
- Add test_database_decorator.py for persistence decorator (7 tests)
- Add test_load_config.py for core config loading (19 tests)
- Add COVERAGE_EXCLUSIONS.md documenting external adapter exclusions
- Fix test_chat_session_limit.py path for portability

Coverage: core 28% → 30%, persistence 24% → 24.4%, plugin 27% → 28%
Total: 1082 tests passed, core module coverage 45.5%

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(integration): add API controller integration tests

- Add test_pipelines.py (10 tests) covering pipelines CRUD operations
  - GET/POST/PUT/DELETE on /api/v1/pipelines
  - Extensions endpoint
  - Metadata endpoint
  - Coverage: pipelines controller 27% → 80%

- Add test_providers.py (10 tests) covering provider/model management
  - Provider CRUD with model counts
  - LLM model CRUD
  - Coverage: providers controller 23% → 81%, models 29% → 45%

Tests use Quart TestClient with mocked services for real HTTP behavior
without external dependencies.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(integration): add knowledge, bots, and model endpoints tests

- Add test_knowledge.py (10 tests) covering knowledge base management
  - CRUD operations on /api/v1/knowledge/bases
  - Files management endpoints
  - Retrieve endpoint with validation
  - Coverage: knowledge/base.py 26% → 91%

- Add test_bots.py (9 tests) covering bot management
  - CRUD operations on /api/v1/platform/bots
  - Logs endpoint
  - Send message endpoint with validation
  - Coverage: platform/bots.py 24% → 87%

- Extend test_providers.py (+4 tests) for embedding/rerank models
  - Embedding models CRUD
  - Rerank models CRUD
  - Coverage: provider/models.py 29% → 60%

Total integration tests: 53 (smoke 12 + pipelines 10 + providers 14 + knowledge 10 + bots 9)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(integration): add embed and monitoring endpoint tests

Add integration tests for embed widget and monitoring API endpoints:
- test_embed.py: 15 tests for widget.js, logo, turnstile, messages, reset, feedback
- test_monitoring.py: 15 tests for overview, messages, llm-calls, sessions, errors, export

Coverage improvements:
- embed.py: 17% → 56%
- monitoring.py: 17% → 93%

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(e2e): add minimal startup E2E tests

Add E2E tests for LangBot startup flow:
- tests/e2e/utils/config_factory.py: minimal config generation
- tests/e2e/utils/process_manager.py: LangBot subprocess management
- tests/e2e/conftest.py: E2E fixtures (session-scoped process)
- tests/e2e/test_startup.py: 12 tests for startup verification

Tests verify:
- boot.py + stages execution
- database initialization (SQLite)
- API availability
- migrations applied

Uses embedded databases (SQLite, Chroma) - no external dependencies.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test(quality): fix fake tests and add missing coverage

P0 fixes:
- telemetry: rewrite fake tests with real behavior verification (25 tests)
- config: delete copied-source tests, use proper imports (2 deleted)
- persistence: fix try-except pass to verify specific errors

P1 fixes:
- pipeline: add real FixedWindowAlgo tests instead of mocks (12 tests)
- provider: add SessionManager and ToolManager tests (25 tests)
- storage: add S3StorageProvider tests with moto mock (16 tests)
- plugin: add handler action tests for setting inheritance (15 tests)
- rag: add file storage and ZIP processing tests (21 tests)
- vector: add VDB filter conversion tests (30 tests)

P2 fixes:
- pipeline/msgtrun: strengthen assertions for exact message count
- api: add response structure validation in integration tests

New test files:
- provider/test_session_manager.py
- provider/test_tool_manager.py
- storage/test_s3storage.py
- plugin/test_handler_actions.py
- rag/test_file_storage.py
- vector/test_vdb_filter_conversion.py

Source code bugs documented:
- provider: TokenManager.next_token() ZeroDivisionError
- telemetry: send_tasks class variable shared state
- command: empty command IndexError, unused parameters
- utils: funcschema KeyError
- entity: vector.py independent declarative_base

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* docs(test): update coverage stats and test structure

- Update coverage from 22% to 30%
- Add new test files to structure:
  - provider: session_manager, tool_manager
  - storage: s3storage
  - plugin: handler_actions
  - rag: file_storage
  - vector: vdb_filter_conversion
  - telemetry: rewritten tests
- Update module coverage percentages

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* test: add 105 new unit tests for untested core functionality

Add comprehensive tests for B-class issues (core functionality untested):

Pipeline:
- test_pool.py: QueryPool ID generation, caching, async context (12 tests)
- test_ratelimit.py: Fixed timing-sensitive test tolerance
- test_pipelinemgr.py: Use real Pydantic StageProcessResult instead of Mock

Utils:
- test_version.py: Version comparison functions (20 tests)
- test_logcache.py: Log page management and retrieval (18 tests)
- test_httpclient.py: HTTP session pool management (10 tests)
- test_proxy.py: Proxy configuration from env and config (10 tests)
- test_image.py: URL parsing and base64 extraction (12 tests)
- test_pkgmgr.py: Pip command generation (8 tests)

Discover:
- test_engine.py: I18nString, Metadata, Component manifest (15 tests)

Test count: 1193 → 1298 (+105 tests)

Note: Some B-class issues cannot be tested due to circular import bugs
filed as GitHub issues #2175 (pipeline) and #2176 (persistence).

* test: tighten phase 1 coverage contracts

* test: align ci integration isolation

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-16 12:05:54 +08:00

778 lines
29 KiB
Python

"""
Pipeline full-flow integration tests.
Tests real pipeline stages with fake runner/provider.
Validates message processing through PreProcessor, Processor, and SendResponseBackStage.
Uses RuntimePipeline directly (not PipelineManager) to avoid DB dependency.
Run: uv run pytest tests/integration/pipeline -q --tb=short
"""
from __future__ import annotations
import pytest
import asyncio
from unittest.mock import AsyncMock, Mock
import sys
from tests.factories import FakeApp, text_query, mock_platform_adapter
from tests.factories.provider import FakeProvider
from tests.factories.platform import FakePlatform
pytestmark = pytest.mark.integration
# ============== FIXTURE FOR SYS.MODULES ISOLATION ==============
@pytest.fixture(scope='module')
def mock_circular_import_chain():
"""
Break circular import chain for pipeline modules using isolated_sys_modules.
Chain: pipeline → core.app → provider.runner → http_controller → groups/plugins
We mock minimal modules to allow importing RuntimePipeline, StageInstContainer,
and stage classes without triggering full application initialization.
After mocking, we import the stage modules so decorators register them.
"""
from tests.utils.import_isolation import isolated_sys_modules, MockLifecycleControlScope
# Mock core.entities with LifecycleControlScope enum
mock_core_entities = Mock()
mock_core_entities.LifecycleControlScope = MockLifecycleControlScope
# Mock core.app - Application class is referenced but not instantiated
mock_core_app = Mock()
# Mock provider.runner with preregistered_runners list
mock_runner = Mock()
mock_runner.preregistered_runners = [] # Will be populated in tests
# Mock utils.importutil - prevents auto-import of runners
mock_importutil = Mock()
mock_importutil.import_modules_in_pkg = lambda pkg: None
mock_importutil.import_modules_in_pkgs = lambda pkgs: None
# Modules to clear (force re-import after mocking)
clear = [
'langbot.pkg.pipeline.stage',
'langbot.pkg.pipeline.entities',
'langbot.pkg.pipeline.pipelinemgr',
'langbot.pkg.pipeline.preproc.preproc',
'langbot.pkg.pipeline.process.process',
'langbot.pkg.pipeline.process.handler',
'langbot.pkg.pipeline.process.handlers.chat',
'langbot.pkg.pipeline.process.handlers.command',
'langbot.pkg.pipeline.respback.respback',
'langbot.pkg.provider.runner',
]
with isolated_sys_modules(
mocks={
'langbot.pkg.core.entities': mock_core_entities,
'langbot.pkg.core.app': mock_core_app,
'langbot.pkg.provider.runner': mock_runner,
'langbot.pkg.utils.importutil': mock_importutil,
'langbot.pkg.pipeline.controller': Mock(),
'langbot.pkg.pipeline.pipelinemgr': Mock(),
},
clear=clear,
):
# Import stage modules AFTER clearing so decorators register them
from importlib import import_module
# Import stage base first
import_module('langbot.pkg.pipeline.stage')
# Import entities
import_module('langbot.pkg.pipeline.entities')
# Import specific stages to register them
import_module('langbot.pkg.pipeline.preproc.preproc')
import_module('langbot.pkg.pipeline.process.process')
import_module('langbot.pkg.pipeline.respback.respback')
# Import pipelinemgr for RuntimePipeline
import_module('langbot.pkg.pipeline.pipelinemgr')
yield
# ============== FAKE RUNNER ==============
class FakeRunner:
"""Minimal fake runner class for pipeline integration tests.
Note: preregistered_runners expects a CLASS, not an instance.
The handler calls runner_cls(self.ap, query.pipeline_config) to instantiate.
"""
name = 'local-agent'
def __init__(self, app=None, config=None):
self.app = app
self.config = config or {}
self._provider = FakeProvider()
# Instance-level configuration set via class attribute
self._response_text = "fake response"
self._raise_error = None
@classmethod
def returns(cls, text: str):
"""Create a runner class configured to return specific text."""
# We create a subclass with configured response
class ConfiguredRunner(cls):
name = cls.name
_response_text = text
_raise_error = None
def __init__(self, app=None, config=None):
super().__init__(app, config)
self._response_text = text
return ConfiguredRunner
@classmethod
def raises(cls, error: Exception):
"""Create a runner class configured to raise an error."""
class ConfiguredRunner(cls):
name = cls.name
_response_text = None
_raise_error = error
def __init__(self, app=None, config=None):
super().__init__(app, config)
self._raise_error = error
return ConfiguredRunner
async def run(self, query):
"""Run the fake provider and yield messages."""
from langbot_plugin.api.entities.builtin.provider.message import Message
# Use the configured response/error
if self._raise_error:
raise self._raise_error
# Yield a simple message
yield Message(role='assistant', content=self._response_text)
# ============== PIPELINE APP FIXTURE ==============
@pytest.fixture
def pipeline_app():
"""
Create FakeApp with all dependencies required by pipeline stages.
PreProcessor needs: sess_mgr, model_mgr, tool_mgr, plugin_connector
Processor needs: instance_config, plugin_connector
SendResponseBackStage needs: logger
ChatMessageHandler needs: telemetry, survey
"""
app = FakeApp()
# Session/conversation mocks for PreProcessor
mock_session = Mock()
mock_session.launcher_type = Mock()
mock_session.launcher_type.value = 'person'
mock_session.launcher_id = 12345
mock_session.sender_id = 12345
mock_session.use_prompt_name = 'default'
mock_session.using_conversation = None
# Create a simple class to mimic Prompt behavior
class MockPrompt:
def __init__(self, name, messages):
self.name = name
self.messages = messages
def copy(self):
return MockPrompt(self.name, list(self.messages))
# Create real lists for messages
prompt_messages_list = []
messages_list = []
mock_prompt = MockPrompt('default', prompt_messages_list)
mock_conversation = Mock()
mock_conversation.prompt = mock_prompt
mock_conversation.messages = messages_list
mock_conversation.uuid = 'test-conversation-uuid'
mock_conversation.update_time = None
mock_conversation.create_time = None
app.sess_mgr.get_session = AsyncMock(return_value=mock_session)
app.sess_mgr.get_conversation = AsyncMock(return_value=mock_conversation)
# Model mock for PreProcessor
mock_model = Mock()
mock_model.model_entity = Mock()
mock_model.model_entity.uuid = 'test-model-uuid'
mock_model.model_entity.name = 'test-model'
mock_model.model_entity.abilities = ['func_call', 'vision']
app.model_mgr.get_model_by_uuid = AsyncMock(return_value=mock_model)
# Tool manager mock
app.tool_mgr.get_all_tools = AsyncMock(return_value=[])
# Telemetry mock (required by ChatMessageHandler)
app.telemetry = Mock()
app.telemetry.start_send_task = AsyncMock()
# Survey mock
app.survey = None
return app
@pytest.fixture
def fake_platform_adapter():
"""Create a fake platform adapter for outbound capture."""
platform = FakePlatform(stream_output_supported=False)
adapter = mock_platform_adapter(platform)
return adapter, platform
@pytest.fixture
def set_fake_runner():
"""Factory fixture to set a fake runner CLASS in preregistered_runners."""
def _set_runner(runner_cls):
# preregistered_runners expects a list of runner classes
sys.modules['langbot.pkg.provider.runner'].preregistered_runners = [runner_cls]
return _set_runner
# ============== PIPELINE CONFIGURATION ==============
def create_minimal_pipeline_config():
"""Create minimal pipeline configuration for tests."""
return {
'ai': {
'runner': {'runner': 'local-agent', 'expire-time': None},
'local-agent': {
'model': {'primary': 'test-model-uuid', 'fallbacks': []},
'prompt': 'default',
'knowledge-bases': [],
},
},
'output': {
'force-delay': {'min': 0.0, 'max': 0.0},
'misc': {
'at-sender': False,
'quote-origin': False,
'exception-handling': 'show-hint',
'failure-hint': 'Request failed.',
},
},
'trigger': {
'misc': {'combine-quote-message': False},
},
}
# ============== HELPER TO PROCESS COROUTINE/GENERATOR ==============
async def collect_processor_results(processor, query, stage_name):
"""
Helper to handle the coroutine -> async_generator pattern.
Processor.process() returns a coroutine that yields an async_generator.
This helper handles both cases like RuntimePipeline does.
"""
result = processor.process(query, stage_name)
# Handle coroutine (await it to get async_generator)
if asyncio.iscoroutine(result):
result = await result
# Now iterate over async_generator
results = []
async for item in result:
results.append(item)
return results
# ============== TESTS ==============
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestPipelineStageChainReal:
"""Tests for real pipeline stage chain."""
@pytest.mark.asyncio
async def test_import_pipeline_modules(self):
"""Verify we can import real pipeline modules."""
from langbot.pkg.pipeline import stage, entities
from langbot.pkg.pipeline import pipelinemgr
assert hasattr(stage, 'PipelineStage')
assert hasattr(stage, 'preregistered_stages')
assert hasattr(entities, 'ResultType')
assert hasattr(entities, 'StageProcessResult')
assert hasattr(pipelinemgr, 'RuntimePipeline')
assert hasattr(pipelinemgr, 'StageInstContainer')
@pytest.mark.asyncio
async def test_stage_preregistration(self):
"""Verify stages are preregistered after fixture imports them."""
from langbot.pkg.pipeline import stage
# Check that our target stages are registered
assert 'PreProcessor' in stage.preregistered_stages
assert 'MessageProcessor' in stage.preregistered_stages
assert 'SendResponseBackStage' in stage.preregistered_stages
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestPreProcessorStage:
"""Tests for PreProcessor stage alone."""
@pytest.mark.asyncio
async def test_preproc_continues_on_valid_query(self, pipeline_app, fake_platform_adapter):
"""PreProcessor should return CONTINUE for valid text query."""
from langbot.pkg.pipeline import entities
from langbot.pkg.pipeline.preproc import preproc
adapter, platform = fake_platform_adapter
# Create query with adapter
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
# Mock plugin_connector for PromptPreProcessing event
mock_event_ctx = Mock()
mock_event_ctx.event = Mock()
mock_event_ctx.event.default_prompt = [] # Real list
mock_event_ctx.event.prompt = [] # Real list
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create PreProcessor stage
preproc_stage = preproc.PreProcessor(pipeline_app)
result = await preproc_stage.process(query, 'PreProcessor')
assert result.result_type == entities.ResultType.CONTINUE
assert result.new_query.session is not None
assert result.new_query.user_message is not None
@pytest.mark.asyncio
async def test_preproc_sets_user_message(self, pipeline_app, fake_platform_adapter):
"""PreProcessor should set user_message from message_chain."""
from langbot.pkg.pipeline import entities
from langbot.pkg.pipeline.preproc import preproc
adapter, platform = fake_platform_adapter
query = text_query("test message content")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
# Mock plugin_connector for PromptPreProcessing event
mock_event_ctx = Mock()
mock_event_ctx.event = Mock()
mock_event_ctx.event.default_prompt = []
mock_event_ctx.event.prompt = []
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
preproc_stage = preproc.PreProcessor(pipeline_app)
result = await preproc_stage.process(query, 'PreProcessor')
assert result.result_type == entities.ResultType.CONTINUE
# Check user_message content
assert result.new_query.user_message is not None
assert result.new_query.user_message.role == 'user'
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestProcessorStage:
"""Tests for MessageProcessor stage."""
@pytest.mark.asyncio
async def test_processor_calls_chat_handler(self, pipeline_app, fake_platform_adapter, set_fake_runner):
"""Processor should route to ChatMessageHandler for non-command messages."""
adapter, platform = fake_platform_adapter
# Set fake runner that returns pong
fake_runner = FakeRunner().returns("LANGBOT_FAKE_PONG")
set_fake_runner(fake_runner)
# Create query
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
query.resp_messages = []
# Mock plugin_connector to not prevent default
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=False)
mock_event_ctx.event = Mock()
mock_event_ctx.event.user_message_alter = None
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create Processor stage
from langbot.pkg.pipeline.process import process
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
# Collect results using helper
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) >= 1
# Check that resp_messages was populated
assert len(query.resp_messages) >= 1
@pytest.mark.asyncio
async def test_processor_prevent_default_without_reply_interrupts(self, pipeline_app, fake_platform_adapter):
"""Processor should INTERRUPT when plugin prevents default without reply."""
from langbot.pkg.pipeline import entities
adapter, platform = fake_platform_adapter
# Create query
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
# Mock plugin_connector to prevent default without reply
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=True)
mock_event_ctx.event = Mock()
mock_event_ctx.event.reply_message_chain = None
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create Processor stage
from langbot.pkg.pipeline.process import process
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) == 1
assert results[0].result_type == entities.ResultType.INTERRUPT
@pytest.mark.asyncio
async def test_processor_prevent_default_with_reply_continues(self, pipeline_app, fake_platform_adapter):
"""Processor should CONTINUE when plugin prevents default with reply."""
from langbot.pkg.pipeline import entities
from tests.factories.message import text_chain
adapter, platform = fake_platform_adapter
# Create query
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
query.resp_messages = []
# Create reply chain
reply_chain = text_chain("plugin response")
# Mock plugin_connector to prevent default with reply
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=True)
mock_event_ctx.event = Mock()
mock_event_ctx.event.reply_message_chain = reply_chain
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create Processor stage
from langbot.pkg.pipeline.process import process
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) == 1
assert results[0].result_type == entities.ResultType.CONTINUE
assert len(query.resp_messages) == 1
assert query.resp_messages[0] == reply_chain
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestRunnerExceptionFlow:
"""Tests for runner exception handling."""
@pytest.mark.asyncio
async def test_runner_exception_yields_interrupt(self, pipeline_app, fake_platform_adapter, set_fake_runner):
"""Runner exception should yield INTERRUPT with error notices."""
from langbot.pkg.pipeline import entities
adapter, platform = fake_platform_adapter
# Set fake runner that raises exception
fake_runner = FakeRunner().raises(ValueError("API Error: rate limit exceeded"))
set_fake_runner(fake_runner)
# Create query with exception handling config
config = create_minimal_pipeline_config()
config['output']['misc']['exception-handling'] = 'show-hint'
config['output']['misc']['failure-hint'] = 'Request failed.'
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = config
# Mock plugin_connector to not prevent default
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=False)
mock_event_ctx.event = Mock()
mock_event_ctx.event.user_message_alter = None
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create Processor stage
from langbot.pkg.pipeline.process import process
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) == 1
assert results[0].result_type == entities.ResultType.INTERRUPT
assert results[0].user_notice == 'Request failed.'
assert results[0].error_notice is not None
@pytest.mark.asyncio
async def test_runner_exception_show_error_mode(self, pipeline_app, fake_platform_adapter, set_fake_runner):
"""show-error mode should show actual exception message."""
from langbot.pkg.pipeline import entities
adapter, platform = fake_platform_adapter
# Set fake runner that raises specific exception
fake_runner = FakeRunner().raises(RuntimeError("Custom runtime error"))
set_fake_runner(fake_runner)
# Create query with show-error mode
config = create_minimal_pipeline_config()
config['output']['misc']['exception-handling'] = 'show-error'
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = config
# Mock plugin_connector to not prevent default
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=False)
mock_event_ctx.event = Mock()
mock_event_ctx.event.user_message_alter = None
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create Processor stage
from langbot.pkg.pipeline.process import process
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) == 1
assert results[0].result_type == entities.ResultType.INTERRUPT
assert 'Custom runtime error' in results[0].user_notice
@pytest.mark.asyncio
async def test_runner_exception_hide_mode(self, pipeline_app, fake_platform_adapter, set_fake_runner):
"""hide mode should not show user notice."""
from langbot.pkg.pipeline import entities
adapter, platform = fake_platform_adapter
# Set fake runner that raises exception
fake_runner = FakeRunner().raises(Exception("Hidden error"))
set_fake_runner(fake_runner)
# Create query with hide mode
config = create_minimal_pipeline_config()
config['output']['misc']['exception-handling'] = 'hide'
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = config
# Mock plugin_connector to not prevent default
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=False)
mock_event_ctx.event = Mock()
mock_event_ctx.event.user_message_alter = None
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create Processor stage
from langbot.pkg.pipeline.process import process
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) == 1
assert results[0].result_type == entities.ResultType.INTERRUPT
assert results[0].user_notice is None
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestSendResponseBackStage:
"""Tests for SendResponseBackStage."""
@pytest.mark.asyncio
async def test_send_response_calls_adapter(self, pipeline_app, fake_platform_adapter):
"""SendResponseBackStage should call adapter.reply_message."""
from langbot.pkg.pipeline import entities
from langbot.pkg.pipeline.respback import respback
from tests.factories.message import text_chain
from langbot_plugin.api.entities.builtin.provider.message import Message
adapter, platform = fake_platform_adapter
# Create query with response message
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
# Add response message
query.resp_messages = [Message(role='assistant', content='test response')]
query.resp_message_chain = [text_chain('test response')]
# Create SendResponseBackStage
respback_stage = respback.SendResponseBackStage(pipeline_app)
result = await respback_stage.process(query, 'SendResponseBackStage')
assert result.result_type == entities.ResultType.CONTINUE
# Check that adapter was called
outbound = platform.get_outbound_messages()
assert len(outbound) == 1
assert outbound[0]['type'] == 'reply'
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestStageChainIntegration:
"""Tests for full stage chain (PreProcessor -> Processor -> SendResponseBackStage)."""
@pytest.mark.asyncio
async def test_full_chain_text_message_flow(self, pipeline_app, fake_platform_adapter, set_fake_runner):
"""
Full chain: text message -> PreProcessor -> Processor -> SendResponseBackStage.
Validates:
- PreProcessor sets up session, user_message
- Processor calls runner and populates resp_messages
- SendResponseBackStage calls adapter.reply_message
"""
from langbot.pkg.pipeline import entities
from langbot.pkg.pipeline.preproc import preproc
from langbot.pkg.pipeline.process import process
from langbot.pkg.pipeline.respback import respback
adapter, platform = fake_platform_adapter
# Set fake runner
fake_runner = FakeRunner().returns("LANGBOT_FAKE_PONG")
set_fake_runner(fake_runner)
# Create query
config = create_minimal_pipeline_config()
query = text_query("ping")
query.adapter = adapter
query.pipeline_config = config
query.resp_messages = []
query.resp_message_chain = []
# Mock plugin_connector for PreProcessor and Processor events
mock_event_ctx_preproc = Mock()
mock_event_ctx_preproc.event = Mock()
mock_event_ctx_preproc.event.default_prompt = []
mock_event_ctx_preproc.event.prompt = []
mock_event_ctx_processor = Mock()
mock_event_ctx_processor.is_prevented_default = Mock(return_value=False)
mock_event_ctx_processor.event = Mock()
mock_event_ctx_processor.event.user_message_alter = None
pipeline_app.plugin_connector.emit_event = AsyncMock()
pipeline_app.plugin_connector.emit_event.side_effect = [
mock_event_ctx_preproc, # PreProcessor PromptPreProcessing
mock_event_ctx_processor, # Processor NormalMessageReceived
]
# Create stages
preproc_stage = preproc.PreProcessor(pipeline_app)
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(config)
respback_stage = respback.SendResponseBackStage(pipeline_app)
# Run PreProcessor
result1 = await preproc_stage.process(query, 'PreProcessor')
assert result1.result_type == entities.ResultType.CONTINUE
query = result1.new_query
# Run Processor
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) >= 1
# Build resp_message_chain from resp_messages
from tests.factories.message import text_chain
for resp_msg in query.resp_messages:
if resp_msg.content:
query.resp_message_chain.append(text_chain(resp_msg.content))
# Run SendResponseBackStage
result3 = await respback_stage.process(query, 'SendResponseBackStage')
assert result3.result_type == entities.ResultType.CONTINUE
# Verify adapter was called
outbound = platform.get_outbound_messages()
assert len(outbound) >= 1
@pytest.mark.asyncio
async def test_chain_stops_on_interrupt(self, pipeline_app, fake_platform_adapter):
"""
Chain should stop when a stage returns INTERRUPT.
PreProcessor returns CONTINUE, Processor returns INTERRUPT (prevent_default).
"""
from langbot.pkg.pipeline import entities
from langbot.pkg.pipeline.preproc import preproc
from langbot.pkg.pipeline.process import process
adapter, platform = fake_platform_adapter
# Create query
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
# Mock plugin_connector - PreProcessor continues, Processor interrupts
mock_event_ctx_preproc = Mock()
mock_event_ctx_preproc.event = Mock()
mock_event_ctx_preproc.event.default_prompt = []
mock_event_ctx_preproc.event.prompt = []
mock_event_ctx_processor = Mock()
mock_event_ctx_processor.is_prevented_default = Mock(return_value=True)
mock_event_ctx_processor.event = Mock()
mock_event_ctx_processor.event.reply_message_chain = None
pipeline_app.plugin_connector.emit_event = AsyncMock()
pipeline_app.plugin_connector.emit_event.side_effect = [
mock_event_ctx_preproc, # PreProcessor PromptPreProcessing
mock_event_ctx_processor, # Processor NormalMessageReceived
]
# Create stages
preproc_stage = preproc.PreProcessor(pipeline_app)
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
# Run PreProcessor
result1 = await preproc_stage.process(query, 'PreProcessor')
assert result1.result_type == entities.ResultType.CONTINUE
query = result1.new_query
# Run Processor - should INTERRUPT
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) == 1
assert results[0].result_type == entities.ResultType.INTERRUPT
# Chain stops here - no resp_messages
assert len(query.resp_messages) == 0