Files
LangBot/tests/integration/pipeline/test_full_flow.py
2026-06-10 22:36:23 +08:00

765 lines
28 KiB
Python

"""
Pipeline full-flow integration tests.
Tests real pipeline stages with fake runner/provider.
Validates message processing through PreProcessor, Processor, and SendResponseBackStage.
Uses RuntimePipeline directly (not PipelineManager) to avoid DB dependency.
Run: uv run pytest tests/integration/pipeline -q --tb=short
"""
from __future__ import annotations
import pytest
import asyncio
from unittest.mock import AsyncMock, Mock
from tests.factories import FakeApp, text_query, mock_platform_adapter
from tests.factories.platform import FakePlatform
from tests.factories.message import text_chain
pytestmark = pytest.mark.integration
DEFAULT_RUNNER_ID = 'plugin:langbot/local-agent/default'
# ============== FIXTURE FOR SYS.MODULES ISOLATION ==============
@pytest.fixture(scope='module')
def mock_circular_import_chain():
"""
Break circular import chain for pipeline modules using isolated_sys_modules.
Chain: pipeline → core.app → http_controller → groups/plugins
We mock minimal modules to allow importing RuntimePipeline, StageInstContainer,
and stage classes without triggering full application initialization.
After mocking, we import the stage modules so decorators register them.
"""
from tests.utils.import_isolation import isolated_sys_modules, MockLifecycleControlScope
# Mock core.entities with LifecycleControlScope enum
mock_core_entities = Mock()
mock_core_entities.LifecycleControlScope = MockLifecycleControlScope
# Mock core.app - Application class is referenced but not instantiated
mock_core_app = Mock()
# Mock utils.importutil - prevents auto-import of runners
mock_importutil = Mock()
mock_importutil.import_modules_in_pkg = lambda pkg: None
mock_importutil.import_modules_in_pkgs = lambda pkgs: None
# Modules to clear (force re-import after mocking)
clear = [
'langbot.pkg.pipeline.stage',
'langbot.pkg.pipeline.entities',
'langbot.pkg.pipeline.pipelinemgr',
'langbot.pkg.pipeline.preproc.preproc',
'langbot.pkg.pipeline.process.process',
'langbot.pkg.pipeline.process.handler',
'langbot.pkg.pipeline.process.handlers.chat',
'langbot.pkg.pipeline.process.handlers.command',
'langbot.pkg.pipeline.respback.respback',
]
with isolated_sys_modules(
mocks={
'langbot.pkg.core.entities': mock_core_entities,
'langbot.pkg.core.app': mock_core_app,
'langbot.pkg.utils.importutil': mock_importutil,
'langbot.pkg.pipeline.controller': Mock(),
'langbot.pkg.pipeline.pipelinemgr': Mock(),
},
clear=clear,
):
# Import stage modules AFTER clearing so decorators register them
from importlib import import_module
# Import stage base first
import_module('langbot.pkg.pipeline.stage')
# Import entities
import_module('langbot.pkg.pipeline.entities')
# Import specific stages to register them
import_module('langbot.pkg.pipeline.preproc.preproc')
import_module('langbot.pkg.pipeline.process.process')
import_module('langbot.pkg.pipeline.respback.respback')
# Import pipelinemgr for RuntimePipeline
import_module('langbot.pkg.pipeline.pipelinemgr')
yield
# ============== FAKE RUNNER ==============
class FakeRunner:
"""Minimal fake runner behavior for the orchestrator-backed pipeline tests."""
name = 'local-agent'
def __init__(self, response_text: str = "fake response", error: Exception | None = None):
self._response_text = response_text
self._raise_error = error
@classmethod
def returns(cls, text: str) -> "FakeRunner":
"""Create a fake runner configured to return specific text."""
return cls(response_text=text)
@classmethod
def raises(cls, error: Exception) -> "FakeRunner":
"""Create a runner class configured to raise an error."""
return cls(error=error)
async def run(self, query):
"""Run the fake provider and yield messages."""
from langbot_plugin.api.entities.builtin.provider.message import Message
# Use the configured response/error
if self._raise_error:
raise self._raise_error
# Yield a simple message
yield Message(role='assistant', content=self._response_text)
class FakeAgentRunOrchestrator:
"""Adapter that exposes FakeRunner through the current AgentRunOrchestrator surface."""
def __init__(self, runner: FakeRunner | None = None):
self.runner = runner or FakeRunner()
self.queries = []
async def run_from_query(self, query):
self.queries.append(query)
async for result in self.runner.run(query):
yield result
def resolve_runner_id_for_telemetry(self, query):
return DEFAULT_RUNNER_ID
# ============== PIPELINE APP FIXTURE ==============
@pytest.fixture
def pipeline_app():
"""
Create FakeApp with all dependencies required by pipeline stages.
PreProcessor needs: sess_mgr, model_mgr, tool_mgr, plugin_connector
Processor needs: instance_config, plugin_connector
SendResponseBackStage needs: logger
ChatMessageHandler needs: telemetry, survey
"""
app = FakeApp()
# Session/conversation mocks for PreProcessor
mock_session = Mock()
mock_session.launcher_type = Mock()
mock_session.launcher_type.value = 'person'
mock_session.launcher_id = 12345
mock_session.sender_id = 12345
mock_session.use_prompt_name = 'default'
mock_session.using_conversation = None
# Create a simple class to mimic Prompt behavior
class MockPrompt:
def __init__(self, name, messages):
self.name = name
self.messages = messages
def copy(self):
return MockPrompt(self.name, list(self.messages))
# Create real lists for messages
prompt_messages_list = []
messages_list = []
mock_prompt = MockPrompt('default', prompt_messages_list)
mock_conversation = Mock()
mock_conversation.prompt = mock_prompt
mock_conversation.messages = messages_list
mock_conversation.uuid = 'test-conversation-uuid'
mock_conversation.update_time = None
mock_conversation.create_time = None
app.sess_mgr.get_session = AsyncMock(return_value=mock_session)
app.sess_mgr.get_conversation = AsyncMock(return_value=mock_conversation)
# Model mock for PreProcessor
mock_model = Mock()
mock_model.model_entity = Mock()
mock_model.model_entity.uuid = 'test-model-uuid'
mock_model.model_entity.name = 'test-model'
mock_model.model_entity.abilities = ['func_call', 'vision']
app.model_mgr.get_model_by_uuid = AsyncMock(return_value=mock_model)
# Tool manager mock
app.tool_mgr.get_all_tools = AsyncMock(return_value=[])
# Telemetry mock (required by ChatMessageHandler)
app.telemetry = Mock()
app.telemetry.start_send_task = AsyncMock()
# Survey mock
app.survey = None
app.agent_run_orchestrator = FakeAgentRunOrchestrator()
return app
@pytest.fixture
def fake_platform_adapter():
"""Create a fake platform adapter for outbound capture."""
platform = FakePlatform(stream_output_supported=False)
adapter = mock_platform_adapter(platform)
return adapter, platform
@pytest.fixture
def set_fake_runner(pipeline_app):
"""Factory fixture to set fake runner behavior on the orchestrator surface."""
def _set_runner(runner: FakeRunner):
pipeline_app.agent_run_orchestrator.runner = runner
return _set_runner
# ============== PIPELINE CONFIGURATION ==============
def create_minimal_pipeline_config():
"""Create minimal pipeline configuration for tests."""
return {
'ai': {
'runner': {'id': DEFAULT_RUNNER_ID, 'expire-time': None},
'runner_config': {
DEFAULT_RUNNER_ID: {
'model': {'primary': 'test-model-uuid', 'fallbacks': []},
'prompt': [{'role': 'system', 'content': 'default'}],
'knowledge-bases': [],
},
},
},
'output': {
'force-delay': {'min': 0.0, 'max': 0.0},
'misc': {
'at-sender': False,
'quote-origin': False,
'exception-handling': 'show-hint',
'failure-hint': 'Request failed.',
},
},
'trigger': {
'misc': {'combine-quote-message': False},
},
}
# ============== HELPER TO PROCESS COROUTINE/GENERATOR ==============
async def collect_processor_results(processor, query, stage_name):
"""
Helper to handle the coroutine -> async_generator pattern.
Processor.process() returns a coroutine that yields an async_generator.
This helper handles both cases like RuntimePipeline does.
"""
result = processor.process(query, stage_name)
# Handle coroutine (await it to get async_generator)
if asyncio.iscoroutine(result):
result = await result
# Now iterate over async_generator
results = []
async for item in result:
results.append(item)
return results
# ============== TESTS ==============
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestPipelineStageChainReal:
"""Tests for real pipeline stage chain."""
@pytest.mark.asyncio
async def test_import_pipeline_modules(self):
"""Verify we can import real pipeline modules."""
from langbot.pkg.pipeline import stage, entities
from langbot.pkg.pipeline import pipelinemgr
assert hasattr(stage, 'PipelineStage')
assert hasattr(stage, 'preregistered_stages')
assert hasattr(entities, 'ResultType')
assert hasattr(entities, 'StageProcessResult')
assert hasattr(pipelinemgr, 'RuntimePipeline')
assert hasattr(pipelinemgr, 'StageInstContainer')
@pytest.mark.asyncio
async def test_stage_preregistration(self):
"""Verify stages are preregistered after fixture imports them."""
from langbot.pkg.pipeline import stage
# Check that our target stages are registered
assert 'PreProcessor' in stage.preregistered_stages
assert 'MessageProcessor' in stage.preregistered_stages
assert 'SendResponseBackStage' in stage.preregistered_stages
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestPreProcessorStage:
"""Tests for PreProcessor stage alone."""
@pytest.mark.asyncio
async def test_preproc_continues_on_valid_query(self, pipeline_app, fake_platform_adapter):
"""PreProcessor should return CONTINUE for valid text query."""
from langbot.pkg.pipeline import entities
from langbot.pkg.pipeline.preproc import preproc
adapter, platform = fake_platform_adapter
# Create query with adapter
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
# Mock plugin_connector for PromptPreProcessing event
mock_event_ctx = Mock()
mock_event_ctx.event = Mock()
mock_event_ctx.event.default_prompt = [] # Real list
mock_event_ctx.event.prompt = [] # Real list
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create PreProcessor stage
preproc_stage = preproc.PreProcessor(pipeline_app)
result = await preproc_stage.process(query, 'PreProcessor')
assert result.result_type.name == entities.ResultType.CONTINUE.name
assert result.new_query.session is not None
assert result.new_query.user_message is not None
@pytest.mark.asyncio
async def test_preproc_sets_user_message(self, pipeline_app, fake_platform_adapter):
"""PreProcessor should set user_message from message_chain."""
from langbot.pkg.pipeline import entities
from langbot.pkg.pipeline.preproc import preproc
adapter, platform = fake_platform_adapter
query = text_query("test message content")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
# Mock plugin_connector for PromptPreProcessing event
mock_event_ctx = Mock()
mock_event_ctx.event = Mock()
mock_event_ctx.event.default_prompt = []
mock_event_ctx.event.prompt = []
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
preproc_stage = preproc.PreProcessor(pipeline_app)
result = await preproc_stage.process(query, 'PreProcessor')
assert result.result_type.name == entities.ResultType.CONTINUE.name
# Check user_message content
assert result.new_query.user_message is not None
assert result.new_query.user_message.role == 'user'
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestProcessorStage:
"""Tests for MessageProcessor stage."""
@pytest.mark.asyncio
async def test_processor_calls_chat_handler(self, pipeline_app, fake_platform_adapter, set_fake_runner):
"""Processor should route to ChatMessageHandler for non-command messages."""
adapter, platform = fake_platform_adapter
# Set fake runner that returns pong
fake_runner = FakeRunner.returns("LANGBOT_FAKE_PONG")
set_fake_runner(fake_runner)
# Create query
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
query.resp_messages = []
# Mock plugin_connector to not prevent default
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=False)
mock_event_ctx.event = Mock()
mock_event_ctx.event.user_message_alter = None
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create Processor stage
from langbot.pkg.pipeline.process import process
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
# Collect results using helper
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) >= 1
# Check that resp_messages was populated
assert len(query.resp_messages) >= 1
@pytest.mark.asyncio
async def test_processor_prevent_default_without_reply_interrupts(self, pipeline_app, fake_platform_adapter):
"""Processor should INTERRUPT when plugin prevents default without reply."""
from langbot.pkg.pipeline import entities
adapter, platform = fake_platform_adapter
# Create query
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
# Mock plugin_connector to prevent default without reply
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=True)
mock_event_ctx.event = Mock()
mock_event_ctx.event.reply_message_chain = None
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create Processor stage
from langbot.pkg.pipeline.process import process
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) == 1
assert results[0].result_type.name == entities.ResultType.INTERRUPT.name
@pytest.mark.asyncio
async def test_processor_prevent_default_with_reply_continues(self, pipeline_app, fake_platform_adapter):
"""Processor should CONTINUE when plugin prevents default with reply."""
from langbot.pkg.pipeline import entities
from tests.factories.message import text_chain
adapter, platform = fake_platform_adapter
# Create query
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
query.resp_messages = []
# Create reply chain
reply_chain = text_chain("plugin response")
# Mock plugin_connector to prevent default with reply
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=True)
mock_event_ctx.event = Mock()
mock_event_ctx.event.reply_message_chain = reply_chain
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create Processor stage
from langbot.pkg.pipeline.process import process
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) == 1
assert results[0].result_type.name == entities.ResultType.CONTINUE.name
assert len(query.resp_messages) == 1
assert query.resp_messages[0] == reply_chain
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestRunnerExceptionFlow:
"""Tests for runner exception handling."""
@pytest.mark.asyncio
async def test_runner_exception_yields_interrupt(self, pipeline_app, fake_platform_adapter, set_fake_runner):
"""Runner exception should yield INTERRUPT with error notices."""
from langbot.pkg.pipeline import entities
adapter, platform = fake_platform_adapter
# Set fake runner that raises exception
fake_runner = FakeRunner.raises(ValueError("API Error: rate limit exceeded"))
set_fake_runner(fake_runner)
# Create query with exception handling config
config = create_minimal_pipeline_config()
config['output']['misc']['exception-handling'] = 'show-hint'
config['output']['misc']['failure-hint'] = 'Request failed.'
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = config
# Mock plugin_connector to not prevent default
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=False)
mock_event_ctx.event = Mock()
mock_event_ctx.event.user_message_alter = None
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create Processor stage
from langbot.pkg.pipeline.process import process
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) == 1
assert results[0].result_type.name == entities.ResultType.INTERRUPT.name
assert results[0].user_notice == 'Request failed.'
assert results[0].error_notice is not None
@pytest.mark.asyncio
async def test_runner_exception_show_error_mode(self, pipeline_app, fake_platform_adapter, set_fake_runner):
"""show-error mode should show actual exception message."""
from langbot.pkg.pipeline import entities
adapter, platform = fake_platform_adapter
# Set fake runner that raises specific exception
fake_runner = FakeRunner.raises(RuntimeError("Custom runtime error"))
set_fake_runner(fake_runner)
# Create query with show-error mode
config = create_minimal_pipeline_config()
config['output']['misc']['exception-handling'] = 'show-error'
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = config
# Mock plugin_connector to not prevent default
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=False)
mock_event_ctx.event = Mock()
mock_event_ctx.event.user_message_alter = None
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create Processor stage
from langbot.pkg.pipeline.process import process
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) == 1
assert results[0].result_type.name == entities.ResultType.INTERRUPT.name
assert 'Custom runtime error' in results[0].user_notice
@pytest.mark.asyncio
async def test_runner_exception_hide_mode(self, pipeline_app, fake_platform_adapter, set_fake_runner):
"""hide mode should not show user notice."""
from langbot.pkg.pipeline import entities
adapter, platform = fake_platform_adapter
# Set fake runner that raises exception
fake_runner = FakeRunner.raises(Exception("Hidden error"))
set_fake_runner(fake_runner)
# Create query with hide mode
config = create_minimal_pipeline_config()
config['output']['misc']['exception-handling'] = 'hide'
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = config
# Mock plugin_connector to not prevent default
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=False)
mock_event_ctx.event = Mock()
mock_event_ctx.event.user_message_alter = None
pipeline_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Create Processor stage
from langbot.pkg.pipeline.process import process
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) == 1
assert results[0].result_type.name == entities.ResultType.INTERRUPT.name
assert results[0].user_notice is None
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestSendResponseBackStage:
"""Tests for SendResponseBackStage."""
@pytest.mark.asyncio
async def test_send_response_calls_adapter(self, pipeline_app, fake_platform_adapter):
"""SendResponseBackStage should call adapter.reply_message."""
from langbot.pkg.pipeline import entities
from langbot.pkg.pipeline.respback import respback
from tests.factories.message import text_chain
from langbot_plugin.api.entities.builtin.provider.message import Message
adapter, platform = fake_platform_adapter
# Create query with response message
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
# Add response message
query.resp_messages = [Message(role='assistant', content='test response')]
query.resp_message_chain = [text_chain('test response')]
# Create SendResponseBackStage
respback_stage = respback.SendResponseBackStage(pipeline_app)
result = await respback_stage.process(query, 'SendResponseBackStage')
assert result.result_type.name == entities.ResultType.CONTINUE.name
# Check that adapter was called
outbound = platform.get_outbound_messages()
assert len(outbound) == 1
assert outbound[0]['type'] == 'reply'
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestStageChainIntegration:
"""Tests for full stage chain (PreProcessor -> Processor -> SendResponseBackStage)."""
@pytest.mark.asyncio
async def test_full_chain_text_message_flow(self, pipeline_app, fake_platform_adapter, set_fake_runner):
"""
Full chain: text message -> PreProcessor -> Processor -> SendResponseBackStage.
Validates:
- PreProcessor sets up session, user_message
- Processor calls runner and populates resp_messages
- SendResponseBackStage calls adapter.reply_message
"""
from langbot.pkg.pipeline import entities
from langbot.pkg.pipeline.preproc import preproc
from langbot.pkg.pipeline.process import process
from langbot.pkg.pipeline.respback import respback
adapter, platform = fake_platform_adapter
# Set fake runner
fake_runner = FakeRunner.returns("LANGBOT_FAKE_PONG")
set_fake_runner(fake_runner)
# Create query
config = create_minimal_pipeline_config()
query = text_query("ping")
query.adapter = adapter
query.pipeline_config = config
query.resp_messages = []
query.resp_message_chain = []
# Mock plugin_connector for PreProcessor and Processor events
mock_event_ctx_preproc = Mock()
mock_event_ctx_preproc.event = Mock()
mock_event_ctx_preproc.event.default_prompt = []
mock_event_ctx_preproc.event.prompt = []
mock_event_ctx_processor = Mock()
mock_event_ctx_processor.is_prevented_default = Mock(return_value=False)
mock_event_ctx_processor.event = Mock()
mock_event_ctx_processor.event.user_message_alter = None
pipeline_app.plugin_connector.emit_event = AsyncMock()
pipeline_app.plugin_connector.emit_event.side_effect = [
mock_event_ctx_preproc, # PreProcessor PromptPreProcessing
mock_event_ctx_processor, # Processor NormalMessageReceived
]
# Create stages
preproc_stage = preproc.PreProcessor(pipeline_app)
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(config)
respback_stage = respback.SendResponseBackStage(pipeline_app)
# Run PreProcessor
result1 = await preproc_stage.process(query, 'PreProcessor')
assert result1.result_type.name == entities.ResultType.CONTINUE.name
query = result1.new_query
# Run Processor
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) >= 1
# Build resp_message_chain from resp_messages
for resp_msg in query.resp_messages:
if resp_msg.content:
query.resp_message_chain.append(text_chain(resp_msg.content))
# Run SendResponseBackStage
result3 = await respback_stage.process(query, 'SendResponseBackStage')
assert result3.result_type.name == entities.ResultType.CONTINUE.name
# Verify adapter was called
outbound = platform.get_outbound_messages()
assert len(outbound) >= 1
@pytest.mark.asyncio
async def test_chain_stops_on_interrupt(self, pipeline_app, fake_platform_adapter):
"""
Chain should stop when a stage returns INTERRUPT.
PreProcessor returns CONTINUE, Processor returns INTERRUPT (prevent_default).
"""
from langbot.pkg.pipeline import entities
from langbot.pkg.pipeline.preproc import preproc
from langbot.pkg.pipeline.process import process
adapter, platform = fake_platform_adapter
# Create query
query = text_query("hello")
query.adapter = adapter
query.pipeline_config = create_minimal_pipeline_config()
# Mock plugin_connector - PreProcessor continues, Processor interrupts
mock_event_ctx_preproc = Mock()
mock_event_ctx_preproc.event = Mock()
mock_event_ctx_preproc.event.default_prompt = []
mock_event_ctx_preproc.event.prompt = []
mock_event_ctx_processor = Mock()
mock_event_ctx_processor.is_prevented_default = Mock(return_value=True)
mock_event_ctx_processor.event = Mock()
mock_event_ctx_processor.event.reply_message_chain = None
pipeline_app.plugin_connector.emit_event = AsyncMock()
pipeline_app.plugin_connector.emit_event.side_effect = [
mock_event_ctx_preproc, # PreProcessor PromptPreProcessing
mock_event_ctx_processor, # Processor NormalMessageReceived
]
# Create stages
preproc_stage = preproc.PreProcessor(pipeline_app)
processor_stage = process.Processor(pipeline_app)
await processor_stage.initialize(query.pipeline_config)
# Run PreProcessor
result1 = await preproc_stage.process(query, 'PreProcessor')
assert result1.result_type.name == entities.ResultType.CONTINUE.name
query = result1.new_query
# Run Processor - should INTERRUPT
results = await collect_processor_results(processor_stage, query, 'MessageProcessor')
assert len(results) == 1
assert results[0].result_type.name == entities.ResultType.INTERRUPT.name
# Chain stops here - no resp_messages
assert len(query.resp_messages) == 0