refactor(test): consolidate FakeApp and add sys.modules isolation utility

- Extract tests/utils/import_isolation.py with isolated_sys_modules context manager
- Extend tests/factories/app.py FakeApp with handler-specific attributes
- Refactor test_chat_handler.py to use centralized FakeApp and cached imports
- Refactor test_command_handler.py with mock_execute_factory fixture
- Refactor test_smoke.py to move import-time sys.modules manipulation into fixture
- Add SQLite migration integration tests (G-002)
- Add HTTP API smoke integration tests (G-005)
- Update CI workflow to call pytest for SQLite migrations (G-004)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
huanghuoguoguo
2026-05-08 22:41:48 +08:00
parent 3780a68dfa
commit 59871c3118
11 changed files with 1540 additions and 658 deletions
+365 -357
View File
@@ -1,428 +1,436 @@
"""
Unit tests for ChatMessageHandler behavior patterns.
Unit tests for ChatMessageHandler - REAL imports.
Tests cover chat processing patterns:
- Event emission for normal messages
- Provider invocation pattern
- Streaming response handling
- Error handling
Uses pattern-based testing to avoid circular import issues.
Tests the actual ChatMessageHandler class from production code.
Uses tests.utils.import_isolation to break circular import chain safely.
"""
from __future__ import annotations
import pytest
from unittest.mock import Mock, AsyncMock
import uuid
from unittest.mock import AsyncMock, Mock
from tests.factories import text_query
from tests.factories import FakeApp
class TestNormalMessageEventPattern:
"""Tests for normal message event emission."""
# ============== FIXTURE USING IMPORT ISOLATION UTILITY ==============
def test_person_event_type(self):
"""Person messages use PersonNormalMessageReceived."""
import langbot_plugin.api.entities.events as events
from langbot_plugin.api.entities.builtin.provider.session import LauncherTypes
@pytest.fixture(scope='module')
def mock_circular_import_chain():
"""
Break circular import chain using isolated_sys_modules.
launcher_type = LauncherTypes.PERSON
Chain: handler → core.app → pipeline.controller → http_controller → groups/plugins → taskmgr
event_class = (
events.PersonNormalMessageReceived
if launcher_type == LauncherTypes.PERSON
else events.GroupNormalMessageReceived
)
Uses tests.utils.import_isolation for safe, reversible sys.modules manipulation.
"""
from tests.utils.import_isolation import (
isolated_sys_modules,
make_pipeline_handler_import_mocks,
get_handler_modules_to_clear,
)
from langbot_plugin.api.entities.builtin.provider.message import Message
assert event_class == events.PersonNormalMessageReceived
mocks = make_pipeline_handler_import_mocks()
def test_group_event_type(self):
"""Group messages use GroupNormalMessageReceived."""
import langbot_plugin.api.entities.events as events
from langbot_plugin.api.entities.builtin.provider.session import LauncherTypes
# Create a default runner that yields a simple response
class DefaultRunner:
name = 'local-agent'
def __init__(self, app, config):
self.app = app
self.config = config
async def run(self, query):
yield Message(role='assistant', content='fake response')
launcher_type = LauncherTypes.GROUP
mocks['langbot.pkg.provider.runner'].preregistered_runners = [DefaultRunner]
event_class = (
events.PersonNormalMessageReceived
if launcher_type == LauncherTypes.PERSON
else events.GroupNormalMessageReceived
)
clear = get_handler_modules_to_clear('chat')
assert event_class == events.GroupNormalMessageReceived
def test_event_fields_pattern(self):
"""Normal message event has expected fields."""
launcher_type = 'person'
launcher_id = '12345'
sender_id = '12345'
text_message = 'hello world'
event_data = {
'launcher_type': launcher_type,
'launcher_id': launcher_id,
'sender_id': sender_id,
'text_message': text_message,
}
assert event_data['text_message'] == 'hello world'
with isolated_sys_modules(mocks=mocks, clear=clear):
yield
class TestPreventDefaultHandling:
"""Tests for prevent_default handling in chat."""
@pytest.fixture
def fake_app():
"""Create FakeApp instance."""
return FakeApp()
@pytest.fixture
def mock_event_ctx():
"""Create mock event context."""
ctx = Mock()
ctx.is_prevented_default = Mock(return_value=False)
ctx.event = Mock()
ctx.event.user_message_alter = None
ctx.event.reply_message_chain = None
return ctx
@pytest.fixture
def set_runner():
"""Factory fixture to set a custom runner for tests."""
def _set_runner(runner_class):
import sys
sys.modules['langbot.pkg.provider.runner'].preregistered_runners = [runner_class]
return _set_runner
# ============== CACHED LAZY IMPORTS ==============
_chat_handler_module = None
_entities_module = None
def get_chat_handler():
"""Import ChatMessageHandler after circular import chain is mocked."""
global _chat_handler_module
if _chat_handler_module is None:
from importlib import import_module
_chat_handler_module = import_module('langbot.pkg.pipeline.process.handlers.chat')
return _chat_handler_module
def get_entities():
"""Import pipeline entities - uses real module."""
global _entities_module
if _entities_module is None:
from importlib import import_module
_entities_module = import_module('langbot.pkg.pipeline.entities')
return _entities_module
# ============== REAL ChatMessageHandler Tests ==============
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestChatMessageHandlerReal:
"""Tests for real ChatMessageHandler class."""
@pytest.mark.asyncio
async def test_prevent_default_interrupts(self):
"""prevent_default without reply interrupts pipeline."""
async def test_real_import_works(self):
"""Verify we can import the real handler class."""
chat = get_chat_handler()
assert hasattr(chat, 'ChatMessageHandler')
handler_cls = chat.ChatMessageHandler
assert handler_cls.__name__ == 'ChatMessageHandler'
# Simulate event context
event_ctx = Mock()
event_ctx.is_prevented_default.return_value = True
event_ctx.event = Mock()
event_ctx.event.reply_message_chain = None
@pytest.mark.asyncio
async def test_handler_creation(self, fake_app):
"""ChatMessageHandler can be instantiated."""
chat = get_chat_handler()
handler = chat.ChatMessageHandler(fake_app)
assert handler.ap is fake_app
@pytest.mark.asyncio
async def test_prevent_default_without_reply_interrupts(self, fake_app, mock_event_ctx):
"""prevent_default without reply chain yields INTERRUPT."""
from tests.factories import text_query
chat = get_chat_handler()
entities = get_entities()
mock_event_ctx.is_prevented_default.return_value = True
mock_event_ctx.event.reply_message_chain = None
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
handler = chat.ChatMessageHandler(fake_app)
query = text_query('hello')
query.resp_messages = []
should_interrupt = False
if event_ctx.is_prevented_default():
if event_ctx.event.reply_message_chain is None:
should_interrupt = True
assert should_interrupt is True
@pytest.mark.asyncio
async def test_prevent_default_with_reply_continues(self):
"""prevent_default with reply continues with that reply."""
from tests.factories.message import text_chain
event_ctx = Mock()
event_ctx.is_prevented_default.return_value = True
event_ctx.event = Mock()
event_ctx.event.reply_message_chain = text_chain('plugin reply')
query = text_query('hello')
query.resp_messages = []
if event_ctx.is_prevented_default():
if event_ctx.event.reply_message_chain is not None:
query.resp_messages.append(event_ctx.event.reply_message_chain)
assert len(query.resp_messages) == 1
class TestUserMessageAlteration:
"""Tests for user_message alteration pattern."""
@pytest.mark.asyncio
async def test_string_alters_message(self):
"""User message can be altered to string."""
import langbot_plugin.api.entities.builtin.provider.message as provider_message
event_ctx = Mock()
event_ctx.is_prevented_default.return_value = False
event_ctx.event = Mock()
event_ctx.event.user_message_alter = 'altered text'
query = text_query('original')
query.user_message = provider_message.Message(role='user', content=[])
# Pattern from handler
if event_ctx.event.user_message_alter is not None:
if isinstance(event_ctx.event.user_message_alter, str):
query.user_message.content = [
provider_message.ContentElement.from_text(event_ctx.event.user_message_alter)
]
assert query.user_message.content[0].text == 'altered text'
@pytest.mark.asyncio
async def test_list_alters_message(self):
"""User message can be altered to list."""
import langbot_plugin.api.entities.builtin.provider.message as provider_message
altered_list = [
provider_message.ContentElement.from_text('part1'),
provider_message.ContentElement.from_text('part2'),
]
event_ctx = Mock()
event_ctx.is_prevented_default.return_value = False
event_ctx.event = Mock()
event_ctx.event.user_message_alter = altered_list
query = text_query('original')
query.user_message = provider_message.Message(role='user', content=[])
if isinstance(event_ctx.event.user_message_alter, list):
query.user_message.content = event_ctx.event.user_message_alter
assert len(query.user_message.content) == 2
class TestRunnerSelection:
"""Tests for runner selection pattern."""
def test_runner_by_name(self):
"""Runner is selected by name from config."""
runner_name = 'local-agent'
# Simulate preregistered runners lookup - Mock with name attribute
r1 = Mock()
r1.name = 'local-agent'
r2 = Mock()
r2.name = 'dify'
r3 = Mock()
r3.name = 'n8n'
preregistered_runners = [r1, r2, r3]
runner = None
for r in preregistered_runners:
if r.name == runner_name:
runner = r
break
assert runner is not None
assert runner.name == 'local-agent'
def test_unknown_runner_raises(self):
"""Unknown runner name raises error."""
runner_name = 'unknown-runner'
preregistered_runners = [
Mock(name='local-agent'),
Mock(name='dify'),
]
runner = None
for r in preregistered_runners:
if r.name == runner_name:
runner = r
break
if runner is None:
error_raised = True
assert error_raised is True
class TestStreamingResponse:
"""Tests for streaming response pattern."""
@pytest.mark.asyncio
async def test_streaming_chunks_pattern(self):
"""Streaming produces multiple chunks."""
chunks = ['Hello', ' World', '!']
results = []
async for result in handler.handle(query):
results.append(result)
# Simulate streaming generator
async def stream_gen():
for chunk in chunks:
results.append(chunk)
await stream_gen()
assert len(results) == 3
assert ''.join(results) == 'Hello World!'
assert len(results) == 1
assert results[0].result_type == entities.ResultType.INTERRUPT
@pytest.mark.asyncio
async def test_streaming_resp_message_id(self):
"""Streaming uses uuid for resp_message_id."""
resp_message_id = str(uuid.uuid4())
async def test_prevent_default_with_reply_continues(self, fake_app, mock_event_ctx):
"""prevent_default with reply yields CONTINUE and updates resp_messages."""
from tests.factories import text_query, text_chain
assert len(resp_message_id) == 36 # UUID format
chat = get_chat_handler()
entities = get_entities()
@pytest.mark.asyncio
async def test_streaming_pop_previous(self):
"""Streaming pops previous response before adding new."""
query = text_query('test')
query.resp_messages = [Mock()] # Previous chunk
query.resp_message_chain = [Mock()]
reply_chain = text_chain('plugin reply')
mock_event_ctx.is_prevented_default.return_value = True
mock_event_ctx.event.reply_message_chain = reply_chain
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Pattern from handler: pop before adding new chunk
if query.resp_messages:
query.resp_messages.pop()
if query.resp_message_chain:
query.resp_message_chain.pop()
query.resp_messages.append(Mock()) # New chunk
assert len(query.resp_messages) == 1 # Only new chunk
class TestNonStreamingResponse:
"""Tests for non-streaming response pattern."""
@pytest.mark.asyncio
async def test_single_response_pattern(self):
"""Non-streaming produces single response."""
query = text_query('test')
handler = chat.ChatMessageHandler(fake_app)
query = text_query('hello')
query.resp_messages = []
# Simulate non-streaming runner
async def run():
yield Mock(readable_str=lambda: 'response text')
async for result in run():
query.resp_messages.append(result)
results = []
async for result in handler.handle(query):
results.append(result)
assert len(results) == 1
assert results[0].result_type == entities.ResultType.CONTINUE
assert len(query.resp_messages) == 1
class TestExceptionHandling:
"""Tests for exception handling pattern."""
assert query.resp_messages[0] == reply_chain
@pytest.mark.asyncio
async def test_exception_interrupts(self):
"""Exception produces INTERRUPT result."""
async def test_user_message_alter_string(self, fake_app, mock_event_ctx, set_runner):
"""user_message_alter as string updates query.user_message."""
from tests.factories import text_query
from langbot_plugin.api.entities.builtin.provider.message import Message
text_query('test')
pipeline_config = {
'output': {
'misc': {
'exception-handling': 'show-hint',
'failure-hint': 'Request failed.',
}
}
}
chat = get_chat_handler()
# Simulate exception
exception = ValueError('provider error')
mock_event_ctx.is_prevented_default.return_value = False
mock_event_ctx.event.user_message_alter = 'altered text'
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
exception_handling = pipeline_config['output']['misc'].get('exception-handling', 'show-hint')
query = text_query('original')
query.adapter = Mock()
query.adapter.is_stream_output_supported = AsyncMock(return_value=False)
query.user_message = Message(role='user', content=[])
if exception_handling == 'show-error':
user_notice = f'{exception}'
elif exception_handling == 'show-hint':
user_notice = pipeline_config['output']['misc'].get('failure-hint', 'Request failed.')
else: # hide
user_notice = None
class QuickRunner:
name = 'local-agent'
def __init__(self, app, config):
self.app = app
self.config = config
async def run(self, query):
yield Message(role='assistant', content='ok')
assert user_notice == 'Request failed.'
set_runner(QuickRunner)
handler = chat.ChatMessageHandler(fake_app)
results = []
async for result in handler.handle(query):
results.append(result)
assert query.user_message.content is not None
@pytest.mark.asyncio
async def test_exception_show_error(self):
"""show-error mode shows actual error."""
text_query('test')
pipeline_config = {
'output': {
'misc': {
'exception-handling': 'show-error',
}
}
}
async def test_adapter_without_stream_method_defaults_non_stream(self, fake_app, mock_event_ctx, set_runner):
"""Adapter without is_stream_output_supported defaults to non-stream."""
from tests.factories import text_query
from langbot_plugin.api.entities.builtin.provider.message import Message, ContentElement
exception = ValueError('API timeout')
chat = get_chat_handler()
exception_handling = pipeline_config['output']['misc'].get('exception-handling', 'show-hint')
mock_event_ctx.is_prevented_default.return_value = False
mock_event_ctx.event.user_message_alter = None
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
if exception_handling == 'show-error':
user_notice = f'{exception}'
else:
user_notice = 'Request failed.'
assert user_notice == 'API timeout'
@pytest.mark.asyncio
async def test_exception_hide(self):
"""hide mode shows no user notice."""
text_query('test')
pipeline_config = {
'output': {
'misc': {
'exception-handling': 'hide',
}
}
}
ValueError('hidden error')
exception_handling = pipeline_config['output']['misc'].get('exception-handling', 'show-hint')
if exception_handling == 'hide':
user_notice = None
else:
user_notice = 'Error'
assert user_notice is None
class TestMessageHistoryUpdate:
"""Tests for conversation message history."""
@pytest.mark.asyncio
async def test_messages_appended_to_conversation(self):
"""User message and response appended to conversation."""
query = text_query('test')
query.session = Mock()
query.session.using_conversation = Mock()
query.session.using_conversation.messages = []
query.adapter = Mock(spec=[])
query.user_message = Message(role='user', content=[ContentElement.from_text('test')])
query.user_message = Mock()
query.resp_messages = [Mock(), Mock()]
class SingleRunner:
name = 'local-agent'
def __init__(self, app, config):
self.app = app
self.config = config
async def run(self, query):
yield Message(role='assistant', content='response')
# Pattern from handler after successful response
query.session.using_conversation.messages.append(query.user_message)
query.session.using_conversation.messages.extend(query.resp_messages)
set_runner(SingleRunner)
assert len(query.session.using_conversation.messages) == 3
handler = chat.ChatMessageHandler(fake_app)
results = []
async for result in handler.handle(query):
results.append(result)
assert len(results) >= 1
class TestStreamOutputCheck:
"""Tests for stream output support check."""
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestChatHandlerStreaming:
"""Tests for streaming behavior."""
@pytest.mark.asyncio
async def test_adapter_stream_check(self):
"""Adapter is checked for stream support."""
adapter = AsyncMock()
adapter.is_stream_output_supported = AsyncMock(return_value=True)
async def test_streaming_chunks_collected(self, fake_app, mock_event_ctx, set_runner):
"""Streaming produces multiple results."""
from tests.factories import text_query
from langbot_plugin.api.entities.builtin.provider.message import Message, ContentElement, MessageChunk
is_stream = await adapter.is_stream_output_supported()
chat = get_chat_handler()
assert is_stream is True
mock_event_ctx.is_prevented_default.return_value = False
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
query = text_query('stream test')
query.adapter = Mock()
query.adapter.is_stream_output_supported = AsyncMock(return_value=True)
query.adapter.create_message_card = AsyncMock()
query.user_message = Message(role='user', content=[ContentElement.from_text('test')])
class StreamRunner:
name = 'local-agent'
def __init__(self, app, config):
self.app = app
self.config = config
async def run(self, query):
yield MessageChunk(role='assistant', content='Hello', is_final=False)
yield MessageChunk(role='assistant', content=' World', is_final=True)
set_runner(StreamRunner)
handler = chat.ChatMessageHandler(fake_app)
results = []
async for result in handler.handle(query):
results.append(result)
assert len(results) >= 1
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestChatHandlerExceptions:
"""Tests for exception handling."""
@pytest.mark.asyncio
async def test_adapter_no_stream_method(self):
"""Adapter without method defaults to False."""
adapter = Mock(spec=[]) # Empty spec, no methods
# No is_stream_output_supported method
async def test_runner_exception_yields_interrupt(self, fake_app, mock_event_ctx, set_runner):
"""Runner exception yields INTERRUPT with error notices."""
from tests.factories import text_query
from langbot_plugin.api.entities.builtin.provider.message import Message
is_stream = False
try:
if hasattr(adapter, 'is_stream_output_supported'):
is_stream = await adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
chat = get_chat_handler()
entities = get_entities()
assert is_stream is False
mock_event_ctx.is_prevented_default.return_value = False
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
query = text_query('fail test')
query.adapter = Mock()
query.adapter.is_stream_output_supported = AsyncMock(return_value=False)
query.user_message = Message(role='user', content=[])
class TestTelemetryPattern:
"""Tests for telemetry reporting pattern."""
def test_telemetry_payload_fields(self):
"""Telemetry payload has expected fields."""
query_id = 123
adapter_name = 'TestAdapter'
runner_name = 'local-agent'
duration_ms = 150
payload = {
'query_id': query_id,
'adapter': adapter_name,
'runner': runner_name,
'duration_ms': duration_ms,
query.pipeline_config = {
'output': {'misc': {'exception-handling': 'show-hint', 'failure-hint': 'Request failed.'}},
'ai': {'runner': {'runner': 'local-agent'}, 'local-agent': {'prompt': 'default', 'model': {'primary': 'test'}}},
}
assert payload['query_id'] == 123
assert payload['duration_ms'] == 150
class FailingRunner:
name = 'local-agent'
def __init__(self, app, config):
self.app = app
self.config = config
async def run(self, query):
raise ValueError('API error')
yield
def test_telemetry_error_included(self):
"""Telemetry includes error info on failure."""
error_info = 'Traceback...'
set_runner(FailingRunner)
payload = {
'error': error_info,
handler = chat.ChatMessageHandler(fake_app)
results = []
async for result in handler.handle(query):
results.append(result)
assert len(results) == 1
assert results[0].result_type == entities.ResultType.INTERRUPT
assert results[0].user_notice == 'Request failed.'
assert results[0].error_notice is not None
@pytest.mark.asyncio
async def test_exception_show_error_mode(self, fake_app, mock_event_ctx, set_runner):
"""show-error mode shows actual exception."""
from tests.factories import text_query
from langbot_plugin.api.entities.builtin.provider.message import Message
chat = get_chat_handler()
mock_event_ctx.is_prevented_default.return_value = False
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
query = text_query('error test')
query.adapter = Mock()
query.adapter.is_stream_output_supported = AsyncMock(return_value=False)
query.user_message = Message(role='user', content=[])
query.pipeline_config = {
'output': {'misc': {'exception-handling': 'show-error'}},
'ai': {'runner': {'runner': 'local-agent'}, 'local-agent': {'prompt': 'default', 'model': {'primary': 'test'}}},
}
assert payload['error'] == 'Traceback...'
class ErrorRunner:
name = 'local-agent'
def __init__(self, app, config):
self.app = app
self.config = config
async def run(self, query):
raise ValueError('Custom error')
yield
set_runner(ErrorRunner)
handler = chat.ChatMessageHandler(fake_app)
results = []
async for result in handler.handle(query):
results.append(result)
assert results[0].user_notice == 'Custom error'
@pytest.mark.asyncio
async def test_exception_hide_mode(self, fake_app, mock_event_ctx, set_runner):
"""hide mode shows no user notice."""
from tests.factories import text_query
from langbot_plugin.api.entities.builtin.provider.message import Message
chat = get_chat_handler()
mock_event_ctx.is_prevented_default.return_value = False
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
query = text_query('hide test')
query.adapter = Mock()
query.adapter.is_stream_output_supported = AsyncMock(return_value=False)
query.user_message = Message(role='user', content=[])
query.pipeline_config = {
'output': {'misc': {'exception-handling': 'hide'}},
'ai': {'runner': {'runner': 'local-agent'}, 'local-agent': {'prompt': 'default', 'model': {'primary': 'test'}}},
}
class HideErrorRunner:
name = 'local-agent'
def __init__(self, app, config):
self.app = app
self.config = config
async def run(self, query):
raise RuntimeError('hidden')
yield
set_runner(HideErrorRunner)
handler = chat.ChatMessageHandler(fake_app)
results = []
async for result in handler.handle(query):
results.append(result)
assert results[0].user_notice is None
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestChatHandlerHelper:
"""Tests for helper methods."""
def test_cut_str_short(self, fake_app):
"""cut_str returns short string unchanged."""
chat = get_chat_handler()
handler = chat.ChatMessageHandler(fake_app)
result = handler.cut_str('short text')
assert result == 'short text'
def test_cut_str_long(self, fake_app):
"""cut_str truncates long string."""
chat = get_chat_handler()
handler = chat.ChatMessageHandler(fake_app)
result = handler.cut_str('this is a very long string that exceeds twenty characters')
assert '...' in result
assert len(result) <= 23
def test_cut_str_multiline(self, fake_app):
"""cut_str truncates multiline string."""
chat = get_chat_handler()
handler = chat.ChatMessageHandler(fake_app)
result = handler.cut_str('first line\nsecond line')
assert '...' in result
+340 -252
View File
@@ -1,308 +1,396 @@
"""
Unit tests for CommandHandler behavior patterns.
Unit tests for CommandHandler - REAL imports.
Tests cover command processing patterns:
- Command parsing and routing
- Event emission pattern
- Command manager interaction
- Privilege handling
Uses pattern-based testing to avoid circular import issues in source code.
Tests the actual CommandHandler class from production code.
Uses tests.utils.import_isolation to break circular import chain safely.
"""
from __future__ import annotations
import pytest
from unittest.mock import Mock
from unittest.mock import AsyncMock, Mock
from tests.factories import command_query
from tests.factories import FakeApp, command_query
class TestCommandParsingPattern:
"""Tests for command parsing logic."""
# ============== FIXTURE USING IMPORT ISOLATION UTILITY ==============
def test_command_text_extraction(self):
@pytest.fixture(scope='module')
def mock_circular_import_chain():
"""
Break circular import chain using isolated_sys_modules.
Chain: handler → core.app → pipeline.controller → http_controller → groups/plugins → taskmgr
Uses tests.utils.import_isolation for safe, reversible sys.modules manipulation.
"""
from tests.utils.import_isolation import (
isolated_sys_modules,
make_pipeline_handler_import_mocks,
get_handler_modules_to_clear,
)
mocks = make_pipeline_handler_import_mocks()
clear = get_handler_modules_to_clear('command')
with isolated_sys_modules(mocks=mocks, clear=clear):
yield
@pytest.fixture
def fake_app():
"""Create FakeApp instance."""
return FakeApp()
@pytest.fixture
def mock_event_ctx():
"""Create mock event context."""
ctx = Mock()
ctx.is_prevented_default = Mock(return_value=False)
ctx.event = Mock()
ctx.event.reply_message_chain = None
return ctx
@pytest.fixture
def mock_execute_factory():
"""Factory fixture to create mock cmd_mgr.execute generators."""
def _create_execute(
text: str | None = 'ok',
error: str | None = None,
image_url: str | None = None,
image_base64: str | None = None,
file_url: str | None = None,
):
async def mock_execute(command_text, full_command_text, query, session):
ret = Mock()
ret.text = text
ret.error = error
ret.image_url = image_url
ret.image_base64 = image_base64
ret.file_url = file_url
yield ret
return mock_execute
return _create_execute
# ============== CACHED LAZY IMPORTS ==============
_command_handler_module = None
_entities_module = None
def get_command_handler():
"""Import CommandHandler after circular import chain is mocked."""
global _command_handler_module
if _command_handler_module is None:
from importlib import import_module
_command_handler_module = import_module('langbot.pkg.pipeline.process.handlers.command')
return _command_handler_module
def get_entities():
"""Import pipeline entities - uses real module."""
global _entities_module
if _entities_module is None:
from importlib import import_module
_entities_module = import_module('langbot.pkg.pipeline.entities')
return _entities_module
# ============== REAL CommandHandler Tests ==============
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestCommandHandlerReal:
"""Tests for real CommandHandler class."""
@pytest.mark.asyncio
async def test_real_import_works(self):
"""Verify we can import the real handler class."""
command = get_command_handler()
assert hasattr(command, 'CommandHandler')
handler_cls = command.CommandHandler
assert handler_cls.__name__ == 'CommandHandler'
@pytest.mark.asyncio
async def test_handler_creation(self, fake_app):
"""CommandHandler can be instantiated."""
command = get_command_handler()
handler = command.CommandHandler(fake_app)
assert handler.ap is fake_app
@pytest.mark.asyncio
async def test_command_parsing_extracts_command_name(self, fake_app, mock_event_ctx):
"""Command text is extracted after prefix."""
# Simulate the parsing pattern from command handler
full_command_text = "/help arg1 arg2"
command = get_command_handler()
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
# Handler strips first character (prefix)
command_text = full_command_text.strip()[1:]
parts = command_text.split(' ')
executed_commands = []
async def track_execute(command_text, full_command_text, query, session):
executed_commands.append(command_text)
ret = Mock()
ret.text = 'ok'
ret.error = None
ret.image_url = None
ret.image_base64 = None
ret.file_url = None
yield ret
assert parts[0] == 'help'
assert parts[1:] == ['arg1', 'arg2']
fake_app.cmd_mgr.execute = track_execute
def test_empty_command_parts(self):
"""Empty command has no parts."""
full_command_text = "/"
handler = command.CommandHandler(fake_app)
query = command_query('help arg1 arg2')
command_text = full_command_text.strip()[1:]
parts = command_text.split(' ')
results = []
async for result in handler.handle(query):
results.append(result)
assert parts == ['']
assert executed_commands[0] == 'help arg1 arg2'
def test_single_command_no_args(self):
"""Single command has no arguments."""
full_command_text = "/status"
command_text = full_command_text.strip()[1:]
parts = command_text.split(' ')
assert parts == ['status']
class TestCommandEventCreation:
"""Tests for command event creation pattern."""
def test_event_type_by_launcher_type(self):
"""Event type differs for person/group."""
import langbot_plugin.api.entities.events as events
# Person command
person_event_class = events.PersonCommandSent
# Group command
group_event_class = events.GroupCommandSent
assert person_event_class is not None
assert group_event_class is not None
def test_event_fields_pattern(self):
"""Command event should have expected fields."""
@pytest.mark.asyncio
async def test_admin_privilege_check(self, fake_app, mock_event_ctx, mock_execute_factory):
"""Admin users get privilege level 2."""
from langbot_plugin.api.entities.builtin.provider.session import LauncherTypes
launcher_type = LauncherTypes.PERSON.value
launcher_id = '12345'
sender_id = '12345'
command = 'help'
params = ['arg1', 'arg2']
is_admin = False
command = get_command_handler()
# Simulate event creation pattern
event_data = {
'launcher_type': launcher_type,
'launcher_id': launcher_id,
'sender_id': sender_id,
'command': command,
'params': params,
'is_admin': is_admin,
}
fake_app.instance_config.data = {'admins': ['person_12345']}
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
fake_app.cmd_mgr.execute = mock_execute_factory()
assert event_data['command'] == 'help'
assert event_data['params'] == ['arg1', 'arg2']
handler = command.CommandHandler(fake_app)
query = command_query('status')
query.launcher_type = LauncherTypes.PERSON
query.launcher_id = 12345
results = []
async for result in handler.handle(query):
results.append(result)
class TestPrivilegeCheckPattern:
"""Tests for privilege/admin check."""
def test_admin_check_by_session_id(self):
"""Admin is checked by session_id format."""
admins = ['person_12345', 'group_99999']
launcher_type = 'person'
launcher_id = '12345'
session_id = f'{launcher_type}_{launcher_id}'
is_admin = session_id in admins
assert is_admin is True
def test_non_admin_check(self):
"""Non-admin user has privilege 1."""
admins = ['person_12345']
launcher_type = 'person'
launcher_id = '67890'
session_id = f'{launcher_type}_{launcher_id}'
is_admin = session_id in admins
assert is_admin is False
def test_privilege_levels(self):
"""Privilege level 1 for normal, 2 for admin."""
normal_privilege = 1
admin_privilege = 2
admins = ['person_12345']
# Normal user
session_id = 'person_67890'
privilege = 2 if session_id in admins else 1
assert privilege == normal_privilege
# Admin user
session_id = 'person_12345'
privilege = 2 if session_id in admins else 1
assert privilege == admin_privilege
class TestCommandResultHandling:
"""Tests for command result handling patterns."""
call_args = fake_app.plugin_connector.emit_event.call_args
event = call_args[0][0]
assert event.is_admin is True
@pytest.mark.asyncio
async def test_text_result_pattern(self):
"""Text result is converted to message."""
import langbot_plugin.api.entities.builtin.provider.message as provider_message
async def test_non_admin_privilege_check(self, fake_app, mock_event_ctx, mock_execute_factory):
"""Non-admin users get privilege level 1."""
from langbot_plugin.api.entities.builtin.provider.session import LauncherTypes
# Simulate command return
ret = Mock()
ret.text = 'Command output'
ret.error = None
ret.image_url = None
ret.image_base64 = None
ret.file_url = None
command = get_command_handler()
# Pattern from handler: build content list
content = []
if ret.text is not None:
content.append(provider_message.ContentElement.from_text(ret.text))
fake_app.instance_config.data = {'admins': ['person_12345']}
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
fake_app.cmd_mgr.execute = mock_execute_factory()
assert len(content) == 1
assert content[0].type == 'text'
assert content[0].text == 'Command output'
handler = command.CommandHandler(fake_app)
query = command_query('status')
query.launcher_type = LauncherTypes.PERSON
query.launcher_id = 67890
results = []
async for result in handler.handle(query):
results.append(result)
call_args = fake_app.plugin_connector.emit_event.call_args
event = call_args[0][0]
assert event.is_admin is False
@pytest.mark.asyncio
async def test_error_result_pattern(self):
async def test_prevent_default_with_reply_continues(self, fake_app, mock_event_ctx):
"""prevent_default with reply yields CONTINUE."""
from tests.factories.message import text_chain
command = get_command_handler()
entities = get_entities()
reply_chain = text_chain('plugin reply')
mock_event_ctx.is_prevented_default.return_value = True
mock_event_ctx.event.reply_message_chain = reply_chain
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
handler = command.CommandHandler(fake_app)
query = command_query('test')
query.resp_messages = []
results = []
async for result in handler.handle(query):
results.append(result)
assert len(results) == 1
assert results[0].result_type == entities.ResultType.CONTINUE
assert len(query.resp_messages) == 1
assert query.resp_messages[0] == reply_chain
@pytest.mark.asyncio
async def test_prevent_default_without_reply_interrupts(self, fake_app, mock_event_ctx):
"""prevent_default without reply yields INTERRUPT."""
command = get_command_handler()
entities = get_entities()
mock_event_ctx.is_prevented_default.return_value = True
mock_event_ctx.event.reply_message_chain = None
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
handler = command.CommandHandler(fake_app)
query = command_query('test')
results = []
async for result in handler.handle(query):
results.append(result)
assert len(results) == 1
assert results[0].result_type == entities.ResultType.INTERRUPT
@pytest.mark.asyncio
async def test_event_type_person_command(self, fake_app, mock_event_ctx, mock_execute_factory):
"""Person launcher creates PersonCommandSent event."""
from langbot_plugin.api.entities.builtin.provider.session import LauncherTypes
from langbot_plugin.api.entities import events
command = get_command_handler()
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
fake_app.cmd_mgr.execute = mock_execute_factory()
handler = command.CommandHandler(fake_app)
query = command_query('help')
query.launcher_type = LauncherTypes.PERSON
results = []
async for result in handler.handle(query):
results.append(result)
call_args = fake_app.plugin_connector.emit_event.call_args
event = call_args[0][0]
assert isinstance(event, events.PersonCommandSent)
@pytest.mark.asyncio
async def test_event_type_group_command(self, fake_app, mock_event_ctx, mock_execute_factory):
"""Group launcher creates GroupCommandSent event."""
from langbot_plugin.api.entities.builtin.provider.session import LauncherTypes
from langbot_plugin.api.entities import events
command = get_command_handler()
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
fake_app.cmd_mgr.execute = mock_execute_factory()
handler = command.CommandHandler(fake_app)
query = command_query('help')
query.launcher_type = LauncherTypes.GROUP
results = []
async for result in handler.handle(query):
results.append(result)
call_args = fake_app.plugin_connector.emit_event.call_args
event = call_args[0][0]
assert isinstance(event, events.GroupCommandSent)
@pytest.mark.asyncio
async def test_command_result_text(self, fake_app, mock_event_ctx, mock_execute_factory):
"""Text result is added to resp_messages."""
command = get_command_handler()
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
fake_app.cmd_mgr.execute = mock_execute_factory(text='Command output')
handler = command.CommandHandler(fake_app)
query = command_query('echo')
query.resp_messages = []
results = []
async for result in handler.handle(query):
results.append(result)
assert len(query.resp_messages) == 1
msg = query.resp_messages[0]
assert msg.role == 'command'
assert len(msg.content) == 1
assert msg.content[0].type == 'text'
assert msg.content[0].text == 'Command output'
@pytest.mark.asyncio
async def test_command_result_error(self, fake_app, mock_event_ctx, mock_execute_factory):
"""Error result creates error message."""
import langbot_plugin.api.entities.builtin.provider.message as provider_message
command = get_command_handler()
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
fake_app.cmd_mgr.execute = mock_execute_factory(text=None, error='Command failed')
ret = Mock()
ret.text = None
ret.error = 'Command failed'
handler = command.CommandHandler(fake_app)
query = command_query('fail')
query.resp_messages = []
# Error handling pattern
if ret.error is not None:
msg = provider_message.Message(
role='command',
content=str(ret.error),
)
results = []
async for result in handler.handle(query):
results.append(result)
assert len(query.resp_messages) == 1
msg = query.resp_messages[0]
assert msg.role == 'command'
assert msg.content == 'Command failed'
@pytest.mark.asyncio
async def test_image_result_pattern(self):
"""Image result is added to content."""
import langbot_plugin.api.entities.builtin.provider.message as provider_message
async def test_command_result_image_url(self, fake_app, mock_event_ctx, mock_execute_factory):
"""Image URL result is added to content."""
command = get_command_handler()
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
fake_app.cmd_mgr.execute = mock_execute_factory(
text='Here is the image:',
image_url='https://example.com/image.png'
)
ret = Mock()
ret.text = 'Here is the image:'
ret.error = None
ret.image_url = 'https://example.com/image.png'
ret.image_base64 = None
ret.file_url = None
content = []
if ret.text is not None:
content.append(provider_message.ContentElement.from_text(ret.text))
if ret.image_url is not None:
content.append(provider_message.ContentElement.from_image_url(ret.image_url))
assert len(content) == 2
assert content[0].type == 'text'
assert content[1].type == 'image_url'
class TestPreventDefaultHandling:
"""Tests for prevent_default handling."""
@pytest.mark.asyncio
async def test_prevent_default_with_reply(self):
"""prevent_default with reply continues pipeline."""
from tests.factories.message import text_chain
# Simulate event context
event_ctx = Mock()
event_ctx.is_prevented_default.return_value = True
event_ctx.event = Mock()
event_ctx.event.reply_message_chain = text_chain('plugin reply')
query = command_query('test')
handler = command.CommandHandler(fake_app)
query = command_query('image')
query.resp_messages = []
# Pattern from handler
if event_ctx.is_prevented_default():
if event_ctx.event.reply_message_chain is not None:
query.resp_messages.append(event_ctx.event.reply_message_chain)
# yield CONTINUE
else:
# yield INTERRUPT
pass
results = []
async for result in handler.handle(query):
results.append(result)
assert len(query.resp_messages) == 1
msg = query.resp_messages[0]
assert len(msg.content) == 2
assert msg.content[0].type == 'text'
assert msg.content[1].type == 'image_url'
@pytest.mark.asyncio
async def test_prevent_default_without_reply(self):
"""prevent_default without reply interrupts."""
event_ctx = Mock()
event_ctx.is_prevented_default.return_value = True
event_ctx.event = Mock()
event_ctx.event.reply_message_chain = None
async def test_command_result_empty_interrupts(self, fake_app, mock_event_ctx, mock_execute_factory):
"""Empty result yields INTERRUPT."""
command = get_command_handler()
entities = get_entities()
fake_app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
fake_app.cmd_mgr.execute = mock_execute_factory(text=None)
query = command_query('test')
query.resp_messages = []
handler = command.CommandHandler(fake_app)
query = command_query('empty')
should_interrupt = False
if event_ctx.is_prevented_default():
if event_ctx.event.reply_message_chain is None:
should_interrupt = True
results = []
async for result in handler.handle(query):
results.append(result)
assert should_interrupt is True
assert results[0].result_type == entities.ResultType.INTERRUPT
class TestStringTruncationHelper:
"""Tests for cut_str helper method."""
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestCommandHandlerHelper:
"""Tests for helper methods."""
def test_short_string_no_change(self):
"""Short string is not truncated."""
# Pattern from handler.cut_str
def cut_str(s: str) -> str:
s0 = s.split('\n')[0]
if len(s0) > 20 or '\n' in s:
s0 = s0[:20] + '...'
return s0
result = cut_str('short text')
def test_cut_str_short(self, fake_app):
"""cut_str returns short string unchanged."""
command = get_command_handler()
handler = command.CommandHandler(fake_app)
result = handler.cut_str('short text')
assert result == 'short text'
def test_long_string_truncated(self):
"""Long string is truncated."""
def cut_str(s: str) -> str:
s0 = s.split('\n')[0]
if len(s0) > 20 or '\n' in s:
s0 = s0[:20] + '...'
return s0
result = cut_str('this is a very long string that exceeds twenty characters')
def test_cut_str_long(self, fake_app):
"""cut_str truncates long string."""
command = get_command_handler()
handler = command.CommandHandler(fake_app)
result = handler.cut_str('this is a very long string that exceeds twenty characters')
assert '...' in result
assert len(result) <= 23
def test_multiline_truncated(self):
"""Multiline string is truncated."""
def cut_str(s: str) -> str:
s0 = s.split('\n')[0]
if len(s0) > 20 or '\n' in s:
s0 = s0[:20] + '...'
return s0
result = cut_str('first line\nsecond line\nthird')
assert '...' in result
class TestCommandPrefixConfiguration:
"""Tests for command prefix configuration."""
def test_default_prefixes(self):
"""Default prefixes are slash and exclamation."""
default_prefixes = ['/', '!']
assert '/' in default_prefixes
assert '!' in default_prefixes
def test_custom_prefix(self):
"""Custom prefix can be configured."""
custom_prefix = '#'
full_text = f'{custom_prefix}help'
# Would be checked against config['command']['prefix']
is_command = full_text.startswith(custom_prefix)
assert is_command is True
def test_cut_str_multiline(self, fake_app):
"""cut_str truncates multiline string."""
command = get_command_handler()
handler = command.CommandHandler(fake_app)
result = handler.cut_str('first line\nsecond line')
assert '...' in result