From 3146c589055d0857bc8f69a0047f7c7b775beb81 Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <1051233107@qq.com> Date: Wed, 17 Jun 2026 14:34:57 +0800 Subject: [PATCH] fix(monitoring): mark handled pipeline errors in traces --- .../pkg/api/http/service/monitoring.py | 2 + src/langbot/pkg/pipeline/pipelinemgr.py | 11 +++- tests/unit_tests/pipeline/test_pipelinemgr.py | 58 +++++++++++++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/src/langbot/pkg/api/http/service/monitoring.py b/src/langbot/pkg/api/http/service/monitoring.py index 8b2c4762..474502cf 100644 --- a/src/langbot/pkg/api/http/service/monitoring.py +++ b/src/langbot/pkg/api/http/service/monitoring.py @@ -9,6 +9,8 @@ from ....core import app from ....entity.persistence import monitoring as persistence_monitoring +# TODO: Move shared trace/time helpers into a small monitoring utility module +# when trace propagation expands beyond the current query/retrieval path. def _utc_now() -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py index 62df2669..caf28aca 100644 --- a/src/langbot/pkg/pipeline/pipelinemgr.py +++ b/src/langbot/pkg/pipeline/pipelinemgr.py @@ -270,6 +270,9 @@ class RuntimePipeline: f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}' ) await self._check_output(query, result) + if result.error_notice: + span_status = 'error' + span_error = result.error_notice if result.result_type == pipeline_entities.ResultType.INTERRUPT: self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}') @@ -290,6 +293,9 @@ class RuntimePipeline: f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}' ) await self._check_output(query, sub_result) + if sub_result.error_notice: + span_status = 'error' + span_error = sub_result.error_notice if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT: self.ap.logger.debug( @@ -426,7 +432,10 @@ class RuntimePipeline: await self._execute_from_stage(0, query) # Record query success only if no error occurred during processing - if not query.variables.get('_monitoring_has_error', False): + has_monitoring_error = query.variables.get('_monitoring_has_error', False) + if has_monitoring_error: + trace_status = 'error' + else: try: await monitoring_helper.MonitoringHelper.record_query_success( ap=self.ap, diff --git a/tests/unit_tests/pipeline/test_pipelinemgr.py b/tests/unit_tests/pipeline/test_pipelinemgr.py index f2e6780d..0cbf8ab9 100644 --- a/tests/unit_tests/pipeline/test_pipelinemgr.py +++ b/tests/unit_tests/pipeline/test_pipelinemgr.py @@ -162,3 +162,61 @@ async def test_runtime_pipeline_execute(mock_app, sample_query): # Verify stage was called mock_stage.process.assert_called_once() + + +@pytest.mark.asyncio +async def test_runtime_pipeline_marks_trace_error_when_stage_returns_error_notice(mock_app, sample_query): + """Trace status follows handled stage errors, not only raised exceptions.""" + pipelinemgr = get_pipelinemgr_module() + stage = get_stage_module() + persistence_pipeline = get_persistence_pipeline_module() + entities = get_entities_module() + + error_result = entities.StageProcessResult( + result_type=entities.ResultType.INTERRUPT, + new_query=sample_query, + user_notice='', + console_notice='', + debug_notice='traceback', + error_notice='model request failed', + ) + + mock_stage = Mock(spec=stage.PipelineStage) + mock_stage.process = AsyncMock(return_value=error_result) + stage_container = pipelinemgr.StageInstContainer(inst_name='FailingStage', inst=mock_stage) + + pipeline_entity = Mock(spec=persistence_pipeline.LegacyPipeline) + pipeline_entity.uuid = 'test-pipeline-uuid' + pipeline_entity.name = 'Test Pipeline' + pipeline_entity.config = sample_query.pipeline_config + pipeline_entity.extensions_preferences = {'plugins': []} + + mock_app.bot_service = AsyncMock() + mock_app.bot_service.get_bot = AsyncMock(return_value={'name': 'Test Bot'}) + mock_app.monitoring_service = AsyncMock() + mock_app.monitoring_service.record_message = AsyncMock(return_value='message-1') + mock_app.monitoring_service.update_session_activity = AsyncMock(return_value=True) + mock_app.monitoring_service.start_trace = AsyncMock(return_value='trace-1') + mock_app.monitoring_service.record_span = AsyncMock() + mock_app.monitoring_service.finish_trace = AsyncMock() + mock_app.monitoring_service.update_message_status = AsyncMock() + mock_app.monitoring_service.record_error = AsyncMock() + + event_ctx = Mock() + event_ctx.is_prevented_default = Mock(return_value=False) + mock_app.plugin_connector.emit_event = AsyncMock(return_value=event_ctx) + mock_app.query_pool.cached_queries[sample_query.query_id] = sample_query + + runtime_pipeline = pipelinemgr.RuntimePipeline(mock_app, pipeline_entity, [stage_container]) + + await runtime_pipeline.run(sample_query) + + mock_app.monitoring_service.finish_trace.assert_awaited_once() + assert mock_app.monitoring_service.finish_trace.await_args.kwargs['status'] == 'error' + + span_calls = mock_app.monitoring_service.record_span.await_args_list + stage_span_call = next(call for call in span_calls if call.kwargs['name'] == 'FailingStage') + root_span_call = next(call for call in span_calls if call.kwargs['kind'] == 'pipeline.query') + assert stage_span_call.kwargs['status'] == 'error' + assert stage_span_call.kwargs['error_message'] == 'model request failed' + assert root_span_call.kwargs['status'] == 'error'