feat(plugin): report deferred response delivery failures

This commit is contained in:
BiFangKNT
2026-06-26 10:40:00 +08:00
parent 5b2826fa49
commit ff71c1cede
11 changed files with 770 additions and 14 deletions
@@ -662,6 +662,100 @@ class TestSendResponseBackStage:
assert len(outbound) == 1
assert outbound[0]['type'] == 'reply'
@pytest.mark.asyncio
async def test_send_response_failure_notifies_plugin_diagnostic(self, pipeline_app):
"""Plugin-provided deferred replies should report delivery failures."""
from langbot.pkg.pipeline import plugin_diagnostics
from langbot.pkg.pipeline.respback import respback
from tests.factories.message import text_chain
from langbot_plugin.api.entities.builtin.provider.message import Message
query = text_query('hello')
query.adapter.reply_message.side_effect = RuntimeError('send failed')
query.pipeline_config = create_minimal_pipeline_config()
query.current_stage_name = 'SendResponseBackStage'
query.resp_messages = [Message(role='assistant', content='test response')]
query.resp_message_chain = [text_chain('test response')]
plugin_diagnostics.record_plugin_response_source(
query,
0,
[
{
'kind': 'reply_message_chain',
'plugin': {'author': 'tester', 'name': 'demo'},
}
],
[{'manifest': {'metadata': {'author': 'observer', 'name': 'not-reply-source'}}}],
'NormalMessageResponded',
)
pipeline_app.plugin_connector.notify_plugin_diagnostic = AsyncMock()
respback_stage = respback.SendResponseBackStage(pipeline_app)
with pytest.raises(RuntimeError, match='send failed'):
await respback_stage.process(query, 'SendResponseBackStage')
pipeline_app.plugin_connector.notify_plugin_diagnostic.assert_awaited_once()
payload = pipeline_app.plugin_connector.notify_plugin_diagnostic.await_args.args[0]
assert payload['code'] == 'response_delivery_failed'
assert payload['plugin'] == {'author': 'tester', 'name': 'demo'}
assert payload['query']['event_name'] == 'NormalMessageResponded'
assert payload['delivery']['error_type'] == 'RuntimeError'
assert 'attribution_warning' not in payload['details']
@pytest.mark.asyncio
async def test_send_response_failure_warns_for_old_runtime_attribution(self, pipeline_app):
"""Older plugin runtimes without response_sources should get approximate diagnostics."""
from langbot.pkg.pipeline import plugin_diagnostics
from langbot.pkg.pipeline.respback import respback
from tests.factories.message import text_chain
from langbot_plugin.api.entities.builtin.provider.message import Message
query = text_query('hello')
query.adapter.reply_message.side_effect = RuntimeError('send failed')
query.pipeline_config = create_minimal_pipeline_config()
query.resp_messages = [Message(role='assistant', content='test response')]
query.resp_message_chain = [text_chain('test response')]
plugin_diagnostics.record_plugin_response_source(
query,
0,
None,
[{'manifest': {'metadata': {'author': 'tester', 'name': 'demo'}}}],
'NormalMessageResponded',
)
pipeline_app.plugin_connector.notify_plugin_diagnostic = AsyncMock()
respback_stage = respback.SendResponseBackStage(pipeline_app)
with pytest.raises(RuntimeError, match='send failed'):
await respback_stage.process(query, 'SendResponseBackStage')
payload = pipeline_app.plugin_connector.notify_plugin_diagnostic.await_args.args[0]
assert payload['plugin'] == {'author': 'tester', 'name': 'demo'}
assert 'attribution_warning' in payload['details']
@pytest.mark.asyncio
async def test_send_response_failure_ignores_query_variable_spoofing(self, pipeline_app):
"""Plugin-controlled query variables must not mask delivery failures."""
from langbot.pkg.pipeline.respback import respback
from tests.factories.message import text_chain
from langbot_plugin.api.entities.builtin.provider.message import Message
query = text_query('hello')
query.adapter.reply_message.side_effect = RuntimeError('send failed')
query.pipeline_config = create_minimal_pipeline_config()
query.resp_messages = [Message(role='assistant', content='test response')]
query.resp_message_chain = [text_chain('test response')]
query.variables['_plugin_response_sources'] = {0: ['malformed']}
pipeline_app.plugin_connector.notify_plugin_diagnostic = AsyncMock()
respback_stage = respback.SendResponseBackStage(pipeline_app)
with pytest.raises(RuntimeError, match='send failed'):
await respback_stage.process(query, 'SendResponseBackStage')
pipeline_app.plugin_connector.notify_plugin_diagnostic.assert_not_called()
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestStageChainIntegration:
+142
View File
@@ -36,6 +36,11 @@ def get_entities_module():
return import_module('langbot.pkg.pipeline.entities')
def get_plugin_diagnostics_module():
"""Lazy import for plugin diagnostic attribution helpers."""
return import_module('langbot.pkg.pipeline.plugin_diagnostics')
def make_wrapper_config():
"""Create a pipeline config for wrapper tests."""
return {
@@ -106,6 +111,45 @@ class TestResponseWrapperMessageChain:
assert results[0].result_type == entities.ResultType.CONTINUE
assert len(results[0].new_query.resp_message_chain) == 1
@pytest.mark.asyncio
async def test_message_chain_direct_append_consumes_pending_plugin_source(self):
"""MessageChain replies from earlier plugin events keep attribution."""
wrapper = get_wrapper_module()
app = FakeApp()
stage = wrapper.ResponseWrapper(app)
await stage.initialize(make_wrapper_config())
reply_chain = platform_message.MessageChain([platform_message.Plain(text='response')])
query = text_query('hello')
query.pipeline_config = make_wrapper_config()
query.resp_messages = [reply_chain]
query.resp_message_chain = []
plugin_diagnostics = get_plugin_diagnostics_module()
plugin_diagnostics.record_pending_plugin_response_source(
query,
reply_chain,
[
{
'kind': 'reply_message_chain',
'plugin': {'author': 'tester', 'name': 'demo'},
}
],
[{'manifest': {'metadata': {'author': 'observer', 'name': 'not-reply-source'}}}],
'PersonNormalMessageReceived',
)
results = []
async for result in stage.process(query, 'ResponseWrapper'):
results.append(result)
sources = plugin_diagnostics._get_response_sources(results[0].new_query, 0)
assert sources[0].plugin == {'author': 'tester', 'name': 'demo'}
assert sources[0].event_name == 'PersonNormalMessageReceived'
assert sources[0].is_approximate is False
assert '_plugin_response_sources' not in query.variables
assert '_plugin_pending_response_sources' not in query.variables
class TestResponseWrapperCommand:
"""Tests for command response wrapping."""
@@ -421,6 +465,104 @@ class TestResponseWrapperCustomReply:
chain = results[0].new_query.resp_message_chain[0]
assert 'Custom reply' in str(chain)
@pytest.mark.asyncio
async def test_custom_reply_records_plugin_source(self):
"""Plugin reply_message_chain should keep emitted plugin attribution."""
wrapper = get_wrapper_module()
app = FakeApp()
app.sess_mgr.get_session = AsyncMock(return_value=make_session())
custom_chain = platform_message.MessageChain([platform_message.Plain(text='Custom reply')])
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=False)
mock_event_ctx.event = Mock()
mock_event_ctx.event.reply_message_chain = custom_chain
mock_event_ctx._emitted_plugins = [
{
'manifest': {'metadata': {'author': 'observer', 'name': 'not-reply-source'}},
'plugin_config': {'token': 'secret-token'},
},
]
mock_event_ctx._response_sources = [
{
'kind': 'reply_message_chain',
'plugin': {'author': 'tester', 'name': 'demo'},
}
]
app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
stage = wrapper.ResponseWrapper(app)
pipeline_config = make_wrapper_config()
await stage.initialize(pipeline_config)
query = text_query('hello')
query.pipeline_config = pipeline_config
query.resp_message_chain = []
assistant_resp = Mock()
assistant_resp.role = 'assistant'
assistant_resp.content = 'Default reply'
assistant_resp.tool_calls = None
assistant_resp.get_content_platform_message_chain = Mock(
return_value=platform_message.MessageChain([platform_message.Plain(text='Default reply')])
)
query.resp_messages = [assistant_resp]
results = []
async for result in stage.process(query, 'ResponseWrapper'):
results.append(result)
plugin_diagnostics = get_plugin_diagnostics_module()
sources = plugin_diagnostics._get_response_sources(results[0].new_query, 0)
assert sources[0].plugin == {'author': 'tester', 'name': 'demo'}
assert sources[0].event_name == 'NormalMessageResponded'
assert sources[0].is_approximate is False
assert 'secret-token' not in str(sources)
assert '_plugin_response_sources' not in query.variables
@pytest.mark.asyncio
async def test_custom_reply_falls_back_to_emitted_plugins_for_old_runtime(self):
"""Older plugin runtimes without response_sources keep approximate attribution."""
wrapper = get_wrapper_module()
app = FakeApp()
app.sess_mgr.get_session = AsyncMock(return_value=make_session())
custom_chain = platform_message.MessageChain([platform_message.Plain(text='Custom reply')])
mock_event_ctx = Mock()
mock_event_ctx.is_prevented_default = Mock(return_value=False)
mock_event_ctx.event = Mock()
mock_event_ctx.event.reply_message_chain = custom_chain
mock_event_ctx._emitted_plugins = [
{'manifest': {'metadata': {'author': 'tester', 'name': 'demo'}}},
]
app.plugin_connector.emit_event = AsyncMock(return_value=mock_event_ctx)
stage = wrapper.ResponseWrapper(app)
pipeline_config = make_wrapper_config()
await stage.initialize(pipeline_config)
query = text_query('hello')
query.pipeline_config = pipeline_config
query.resp_message_chain = []
assistant_resp = Mock()
assistant_resp.role = 'assistant'
assistant_resp.content = 'Default reply'
assistant_resp.tool_calls = None
assistant_resp.get_content_platform_message_chain = Mock(
return_value=platform_message.MessageChain([platform_message.Plain(text='Default reply')])
)
query.resp_messages = [assistant_resp]
results = []
async for result in stage.process(query, 'ResponseWrapper'):
results.append(result)
plugin_diagnostics = get_plugin_diagnostics_module()
sources = plugin_diagnostics._get_response_sources(results[0].new_query, 0)
assert sources[0].plugin == {'author': 'tester', 'name': 'demo'}
assert sources[0].is_approximate is True
class TestResponseWrapperVariables:
"""Tests for bound plugins variable."""
@@ -13,6 +13,8 @@ import pytest
from unittest.mock import Mock, AsyncMock
from importlib import import_module
from tests.factories import text_query
def get_connector_module():
"""Lazy import to avoid circular import issues."""
@@ -132,6 +134,130 @@ class TestListPlugins:
assert result[0]['debug'] is True
class TestPluginDiagnostics:
@pytest.mark.asyncio
async def test_emit_event_preserves_response_sources(self):
connector = create_mock_connector()
query = text_query('hello')
event = query.message_event
object.__setattr__(event, 'query', query)
connector_module = get_connector_module()
original_from_event = connector_module.context.EventContext.from_event
original_model_validate = connector_module.context.EventContext.model_validate
response_sources = [
{
'kind': 'reply_message_chain',
'plugin': {'author': 'tester', 'name': 'demo'},
}
]
async def emit_event_response(event_context, include_plugins=None):
return {
'event_context': event_context,
'emitted_plugins': [],
'response_sources': response_sources,
}
connector.handler = AsyncMock()
connector.handler.emit_event = AsyncMock(side_effect=emit_event_response)
fake_event_ctx = Mock()
event_dump = event.model_dump()
event_dump['event_name'] = 'FriendMessage'
fake_event_ctx.model_dump.return_value = {
'query_id': query.query_id,
'eid': 0,
'event_name': 'FriendMessage',
'event': event_dump,
'is_prevent_default': False,
'is_prevent_postorder': False,
}
connector_module.context.EventContext.from_event = Mock(return_value=fake_event_ctx)
parsed_event_ctx = Mock()
connector_module.context.EventContext.model_validate = Mock(return_value=parsed_event_ctx)
try:
event_ctx = await connector.emit_event(event)
finally:
connector_module.context.EventContext.from_event = original_from_event
connector_module.context.EventContext.model_validate = original_model_validate
assert event_ctx is parsed_event_ctx
assert event_ctx._response_sources == response_sources
@pytest.mark.asyncio
async def test_emit_event_leaves_response_sources_absent_for_old_runtime(self):
connector = create_mock_connector()
query = text_query('hello')
event = query.message_event
object.__setattr__(event, 'query', query)
connector_module = get_connector_module()
original_from_event = connector_module.context.EventContext.from_event
original_model_validate = connector_module.context.EventContext.model_validate
async def emit_event_response(event_context, include_plugins=None):
return {
'event_context': event_context,
'emitted_plugins': [
{'manifest': {'metadata': {'author': 'tester', 'name': 'demo'}}},
],
}
connector.handler = AsyncMock()
connector.handler.emit_event = AsyncMock(side_effect=emit_event_response)
fake_event_ctx = Mock()
event_dump = event.model_dump()
event_dump['event_name'] = 'FriendMessage'
fake_event_ctx.model_dump.return_value = {
'query_id': query.query_id,
'eid': 0,
'event_name': 'FriendMessage',
'event': event_dump,
'is_prevent_default': False,
'is_prevent_postorder': False,
}
connector_module.context.EventContext.from_event = Mock(return_value=fake_event_ctx)
parsed_event_ctx = Mock()
connector_module.context.EventContext.model_validate = Mock(return_value=parsed_event_ctx)
try:
event_ctx = await connector.emit_event(event)
finally:
connector_module.context.EventContext.from_event = original_from_event
connector_module.context.EventContext.model_validate = original_model_validate
assert '_response_sources' not in vars(event_ctx)
assert event_ctx._emitted_plugins == [
{'manifest': {'metadata': {'author': 'tester', 'name': 'demo'}}},
]
@pytest.mark.asyncio
async def test_notify_plugin_diagnostic_skips_when_disabled(self):
connector_module = get_connector_module()
async def mock_disconnect(conn):
pass
mock_app = create_mock_app()
mock_app.instance_config.data = {'plugin': {'enable': False}}
connector = connector_module.PluginRuntimeConnector(mock_app, mock_disconnect)
connector.handler = AsyncMock()
await connector.notify_plugin_diagnostic({'code': 'response_delivery_failed'})
connector.handler.notify_plugin_diagnostic.assert_not_called()
@pytest.mark.asyncio
async def test_notify_plugin_diagnostic_is_best_effort(self):
connector = create_mock_connector()
connector.handler = AsyncMock()
connector.handler.notify_plugin_diagnostic = AsyncMock(side_effect=RuntimeError('action not found'))
await connector.notify_plugin_diagnostic({'code': 'response_delivery_failed'})
connector.handler.notify_plugin_diagnostic.assert_awaited_once()
connector.ap.logger.debug.assert_called_once()
class TestListKnowledgeEngines:
"""Tests for list_knowledge_engines method."""
+30
View File
@@ -159,6 +159,36 @@ class TestHandlerRagErrorResponse:
assert 'KeyError' in response.message
class TestHandlerPluginDiagnostic:
@pytest.mark.asyncio
async def test_notify_plugin_diagnostic_falls_back_to_raw_protocol_action(self):
"""Diagnostic forwarding works before the SDK enum exists."""
app = SimpleNamespace()
app.logger = SimpleNamespace(debug=MagicMock())
runtime_handler = make_handler(app)
runtime_handler.call_action = AsyncMock(return_value={})
payload = {'code': 'response_delivery_failed'}
await runtime_handler.notify_plugin_diagnostic(payload)
action = runtime_handler.call_action.await_args.args[0]
assert action.value == 'plugin_diagnostic'
assert runtime_handler.call_action.await_args.args[1] is payload
assert runtime_handler.call_action.await_args.kwargs['timeout'] == 5
def test_langbot_to_runtime_action_uses_enum_when_available(self):
"""The compatibility helper should prefer SDK enums once available."""
from langbot.pkg.plugin import handler as plugin_handler
sentinel = object()
original = plugin_handler.LangBotToRuntimeAction
plugin_handler.LangBotToRuntimeAction = SimpleNamespace(PLUGIN_DIAGNOSTIC=sentinel)
try:
assert plugin_handler._langbot_to_runtime_action('PLUGIN_DIAGNOSTIC', 'plugin_diagnostic') is sentinel
finally:
plugin_handler.LangBotToRuntimeAction = original
class TestConstantsSemanticVersion:
"""Tests for version constant access."""