fix(monitoring): mark handled pipeline errors in traces

This commit is contained in:
huanghuoguoguo
2026-06-17 14:34:57 +08:00
parent d92b664136
commit 3146c58905
3 changed files with 70 additions and 1 deletions
@@ -9,6 +9,8 @@ from ....core import app
from ....entity.persistence import monitoring as persistence_monitoring
# TODO: Move shared trace/time helpers into a small monitoring utility module
# when trace propagation expands beyond the current query/retrieval path.
def _utc_now() -> datetime.datetime:
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
+10 -1
View File
@@ -270,6 +270,9 @@ class RuntimePipeline:
f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}'
)
await self._check_output(query, result)
if result.error_notice:
span_status = 'error'
span_error = result.error_notice
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
@@ -290,6 +293,9 @@ class RuntimePipeline:
f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
)
await self._check_output(query, sub_result)
if sub_result.error_notice:
span_status = 'error'
span_error = sub_result.error_notice
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(
@@ -426,7 +432,10 @@ class RuntimePipeline:
await self._execute_from_stage(0, query)
# Record query success only if no error occurred during processing
if not query.variables.get('_monitoring_has_error', False):
has_monitoring_error = query.variables.get('_monitoring_has_error', False)
if has_monitoring_error:
trace_status = 'error'
else:
try:
await monitoring_helper.MonitoringHelper.record_query_success(
ap=self.ap,