Compare commits

...

4 Commits

Author SHA1 Message Date
huanghuoguoguo 16d47d3e61 fix(monitoring): measure host rag duration 2026-06-17 20:35:14 +08:00
huanghuoguoguo 3146c58905 fix(monitoring): mark handled pipeline errors in traces 2026-06-17 14:34:57 +08:00
huanghuoguoguo d92b664136 test(monitoring): cover trace observability 2026-06-17 10:46:41 +08:00
huanghuoguoguo 8789c42eeb feat(monitoring): add host RAG trace observability 2026-06-17 00:13:57 +08:00
25 changed files with 1894 additions and 37 deletions
@@ -313,18 +313,30 @@ class MonitoringRouterGroup(group.RouterGroup):
offset=0, offset=0,
) )
# Get traces
traces, traces_total = await self.ap.monitoring_service.get_traces(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=0,
)
return self.success( return self.success(
data={ data={
'overview': overview, 'overview': overview,
'messages': messages, 'messages': messages,
'llmCalls': llm_calls, 'llmCalls': llm_calls,
'embeddingCalls': embedding_calls, 'embeddingCalls': embedding_calls,
'traces': traces,
'sessions': sessions, 'sessions': sessions,
'errors': errors, 'errors': errors,
'totalCount': { 'totalCount': {
'messages': messages_total, 'messages': messages_total,
'llmCalls': llm_calls_total, 'llmCalls': llm_calls_total,
'embeddingCalls': embedding_calls_total, 'embeddingCalls': embedding_calls_total,
'traces': traces_total,
'sessions': sessions_total, 'sessions': sessions_total,
'errors': errors_total, 'errors': errors_total,
}, },
@@ -350,6 +362,49 @@ class MonitoringRouterGroup(group.RouterGroup):
return self.success(data=details) return self.success(data=details)
@self.route('/traces', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_traces() -> str:
    """Get end-to-end trace records.

    Query parameters:
        botId, pipelineId, sessionId, status: repeatable filters.
        startTime, endTime: passed through ``parse_iso_datetime``.
        limit: page size, defaults to 100.
        offset: number of rows to skip, defaults to 0.

    Responds with HTTP 400 when ``limit`` or ``offset`` is not a
    non-negative integer, instead of letting ``int()`` raise an unhandled
    ``ValueError`` or forwarding a negative LIMIT/OFFSET to the database.
    """
    args = quart.request.args

    # Paging values come straight from the query string, so validate them first.
    try:
        limit = int(args.get('limit', 100))
        offset = int(args.get('offset', 0))
    except (TypeError, ValueError):
        return self.http_status(400, -1, 'limit and offset must be integers')
    if limit < 0 or offset < 0:
        return self.http_status(400, -1, 'limit and offset must be non-negative')

    bot_ids = args.getlist('botId')
    pipeline_ids = args.getlist('pipelineId')
    session_ids = args.getlist('sessionId')
    statuses = args.getlist('status')

    start_time = parse_iso_datetime(args.get('startTime'))
    end_time = parse_iso_datetime(args.get('endTime'))

    # Empty filter lists are sent as None so the service applies no condition.
    traces, total = await self.ap.monitoring_service.get_traces(
        bot_ids=bot_ids if bot_ids else None,
        pipeline_ids=pipeline_ids if pipeline_ids else None,
        session_ids=session_ids if session_ids else None,
        statuses=statuses if statuses else None,
        start_time=start_time,
        end_time=end_time,
        limit=limit,
        offset=offset,
    )

    return self.success(
        data={
            'traces': traces,
            'total': total,
            'limit': limit,
            'offset': offset,
        }
    )
@self.route('/traces/<trace_id>', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_trace_details(trace_id: str) -> str:
    """Return a single trace and its spans, or HTTP 404 when the service reports it as not found."""
    trace_details = await self.ap.monitoring_service.get_trace_details(trace_id)

    if trace_details.get('found'):
        return self.success(data=trace_details)

    return self.http_status(404, -1, f'Trace {trace_id} not found')
@self.route('/export', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) @self.route('/export', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def export_data() -> tuple[str, int]: async def export_data() -> tuple[str, int]:
"""Export monitoring data as CSV""" """Export monitoring data as CSV"""
@@ -350,8 +350,24 @@ class PluginsRouterGroup(group.RouterGroup):
if not endpoint.startswith('/') or '..' in endpoint: if not endpoint.startswith('/') or '..' in endpoint:
return self.http_status(400, -1, 'invalid endpoint') return self.http_status(400, -1, 'invalid endpoint')
caller = {
'plugin_author': author,
'plugin_name': plugin_name,
'page_id': page_id,
'origin': _get_request_origin(),
}
headers = {
key: value
for key, value in {
'user-agent': quart.request.headers.get('User-Agent'),
'x-request-id': quart.request.headers.get('X-Request-ID'),
'x-forwarded-for': quart.request.headers.get('X-Forwarded-For'),
}.items()
if value
}
result = await self.ap.plugin_connector.handle_page_api( result = await self.ap.plugin_connector.handle_page_api(
author, plugin_name, page_id, endpoint, method.upper(), body author, plugin_name, page_id, endpoint, method.upper(), body, caller, headers
) )
if result.get('error'): if result.get('error'):
return self.http_status(400, -1, result['error']) return self.http_status(400, -1, result['error'])
@@ -3,11 +3,55 @@ from __future__ import annotations
import uuid import uuid
import datetime import datetime
import sqlalchemy import sqlalchemy
import json
from ....core import app from ....core import app
from ....entity.persistence import monitoring as persistence_monitoring from ....entity.persistence import monitoring as persistence_monitoring
# TODO: Move shared trace/time helpers into a small monitoring utility module
# when trace propagation expands beyond the current query/retrieval path.
def _utc_now() -> datetime.datetime:
    """Return the current UTC time as a naive datetime (tzinfo stripped)."""
    aware_now = datetime.datetime.now(datetime.timezone.utc)
    return aware_now.replace(tzinfo=None)
def _json_dumps(value: dict | list | None) -> str | None:
    """Serialize *value* to a JSON string for storage.

    ``None`` passes through unchanged. Values that ``json`` cannot encode
    natively are stringified via ``default=str``. If encoding still fails,
    a JSON object holding ``str(value)`` under ``serialization_error`` is
    returned instead of raising.
    """
    if value is not None:
        try:
            encoded = json.dumps(value, ensure_ascii=False, default=str)
        except Exception:
            # Best-effort fallback: keep a textual rendering rather than lose the record.
            fallback = {'serialization_error': str(value)}
            encoded = json.dumps(fallback, ensure_ascii=False)
        return encoded
    return None
def _json_loads(value: str | None) -> dict | list | None:
    """Parse a stored JSON string; return ``None`` for empty input or text that fails to parse."""
    parsed = None
    if value:
        try:
            parsed = json.loads(value)
        except Exception:
            # Unparseable stored data is treated the same as missing data.
            parsed = None
    return parsed
def new_trace_id() -> str:
    """Generate a trace identifier: ``trace-`` followed by 16 hex characters of a random UUID."""
    random_hex = uuid.uuid4().hex
    return 'trace-' + random_hex[:16]
def new_span_id() -> str:
    """Generate a span identifier: ``span-`` followed by 16 hex characters of a random UUID."""
    suffix = uuid.uuid4().hex[:16]
    return 'span-{}'.format(suffix)
def normalize_trace_status(status: str | None) -> str:
    """Normalize an operation status to the monitoring UI vocabulary.

    The result is always one of ``'running'``, ``'success'`` or ``'error'``.
    Matching ignores letter case and surrounding whitespace, so values such
    as ``'FAILED'`` or ``' Error '`` are no longer reported as success.

    Args:
        status: Raw status text, or ``None``.

    Returns:
        ``'error'`` for failed/failure/exception/error, ``'running'`` for
        running, and ``'success'`` for everything else (including ``None``
        and unrecognized values).
    """
    if not isinstance(status, str):
        return 'success'

    key = status.strip().lower()
    if key in ('failed', 'failure', 'exception', 'error'):
        return 'error'
    if key == 'running':
        return 'running'
    # 'completed', 'ok', 'success' and any unknown value map to success.
    return 'success'
class MonitoringService: class MonitoringService:
"""Monitoring service""" """Monitoring service"""
@@ -74,6 +118,18 @@ class MonitoringService:
persistence_monitoring.MonitoringFeedback.timestamp, persistence_monitoring.MonitoringFeedback.timestamp,
persistence_monitoring.MonitoringFeedback.id, persistence_monitoring.MonitoringFeedback.id,
), ),
(
'monitoring_traces',
persistence_monitoring.MonitoringTrace,
persistence_monitoring.MonitoringTrace.started_at,
persistence_monitoring.MonitoringTrace.trace_id,
),
(
'monitoring_spans',
persistence_monitoring.MonitoringSpan,
persistence_monitoring.MonitoringSpan.started_at,
persistence_monitoring.MonitoringSpan.span_id,
),
] ]
deleted_counts: dict[str, int] = {} deleted_counts: dict[str, int] = {}
@@ -133,6 +189,116 @@ class MonitoringService:
# ========== Recording Methods ========== # ========== Recording Methods ==========
async def start_trace(
    self,
    trace_id: str | None = None,
    name: str = 'LangBot query',
    bot_id: str | None = None,
    bot_name: str | None = None,
    pipeline_id: str | None = None,
    pipeline_name: str | None = None,
    session_id: str | None = None,
    message_id: str | None = None,
    query_id: str | int | None = None,
    attributes: dict | None = None,
) -> str:
    """Insert a new trace header row in the ``running`` state.

    A trace id is generated when none is supplied. ``query_id`` is stored
    as text and ``attributes`` as a JSON string. Only an INSERT is issued
    here; there is no update path for an already existing ``trace_id``.

    Returns:
        The trace id that was written.
    """
    if not trace_id:
        trace_id = new_trace_id()

    query_id_text = None if query_id is None else str(query_id)

    header_row = {
        'trace_id': trace_id,
        'started_at': _utc_now(),
        'ended_at': None,
        'duration': None,
        'status': 'running',
        'name': name,
        'bot_id': bot_id,
        'bot_name': bot_name,
        'pipeline_id': pipeline_id,
        'pipeline_name': pipeline_name,
        'session_id': session_id,
        'message_id': message_id,
        'query_id': query_id_text,
        'attributes': _json_dumps(attributes),
    }

    insert_stmt = sqlalchemy.insert(persistence_monitoring.MonitoringTrace).values(header_row)
    await self.ap.persistence_mgr.execute_async(insert_stmt)
    return trace_id
async def finish_trace(
    self,
    trace_id: str,
    status: str = 'success',
    duration: int | None = None,
    message_id: str | None = None,
    attributes: dict | None = None,
) -> None:
    """Close a trace: stamp ``ended_at``, store the normalized status and any optional fields.

    ``duration``, ``message_id`` and ``attributes`` are written only when
    they are not ``None``, so existing column values are kept otherwise.
    """
    changes: dict = {
        'ended_at': _utc_now(),
        'status': normalize_trace_status(status),
    }

    # Copy over only the optional scalar fields the caller actually supplied.
    for column, supplied in (('duration', duration), ('message_id', message_id)):
        if supplied is not None:
            changes[column] = supplied

    if attributes is not None:
        changes['attributes'] = _json_dumps(attributes)

    trace_model = persistence_monitoring.MonitoringTrace
    update_stmt = sqlalchemy.update(trace_model).where(trace_model.trace_id == trace_id).values(changes)
    await self.ap.persistence_mgr.execute_async(update_stmt)
async def record_span(
    self,
    trace_id: str,
    name: str,
    kind: str,
    status: str = 'success',
    span_id: str | None = None,
    parent_span_id: str | None = None,
    started_at: datetime.datetime | None = None,
    ended_at: datetime.datetime | None = None,
    duration: int | None = None,
    message_id: str | None = None,
    session_id: str | None = None,
    bot_id: str | None = None,
    pipeline_id: str | None = None,
    attributes: dict | None = None,
    error_message: str | None = None,
) -> str:
    """Record a single completed span.

    Timing rules:
        * ``started_at`` and ``ended_at`` default to the current UTC time.
        * An explicit ``duration`` (milliseconds) is rounded to an int and
          stored as given.
        * Otherwise the duration is derived from the resolved timestamps,
          so a row never carries both timestamps with a NULL duration.
          A derived duration is clamped at zero.

    Args:
        trace_id: Trace the span belongs to.
        name: Span name.
        kind: Span kind label.
        status: Raw status; normalized with ``normalize_trace_status``.
        span_id: Identifier to use; generated when omitted.
        parent_span_id: Identifier of the parent span, if any.
        attributes: Extra data, stored as a JSON string.
        error_message: Error text to store with the span.

    Returns:
        The span id that was written.
    """
    started_at = started_at or _utc_now()
    # Resolve the end timestamp once so the stored duration agrees with it.
    ended_at = ended_at or _utc_now()

    if duration is None:
        elapsed_ms = int((ended_at - started_at).total_seconds() * 1000)
        duration = max(0, elapsed_ms)
    else:
        duration = int(round(float(duration)))

    span_data = {
        'span_id': span_id or new_span_id(),
        'trace_id': trace_id,
        'parent_span_id': parent_span_id,
        'name': name,
        'kind': kind,
        'status': normalize_trace_status(status),
        'started_at': started_at,
        'ended_at': ended_at,
        'duration': duration,
        'message_id': message_id,
        'session_id': session_id,
        'bot_id': bot_id,
        'pipeline_id': pipeline_id,
        'attributes': _json_dumps(attributes),
        'error_message': error_message,
    }

    await self.ap.persistence_mgr.execute_async(
        sqlalchemy.insert(persistence_monitoring.MonitoringSpan).values(span_data)
    )
    return span_data['span_id']
async def record_message( async def record_message(
self, self,
bot_id: str, bot_id: str,
@@ -1076,6 +1242,19 @@ class MonitoringService:
for row in error_rows for row in error_rows
] ]
trace_query = (
sqlalchemy.select(persistence_monitoring.MonitoringTrace)
.where(persistence_monitoring.MonitoringTrace.message_id == message_id)
.order_by(persistence_monitoring.MonitoringTrace.started_at.desc())
.limit(1)
)
trace_result = await self.ap.persistence_mgr.execute_async(trace_query)
trace_row = trace_result.first()
trace = None
if trace_row:
trace_model = trace_row[0] if isinstance(trace_row, tuple) else trace_row
trace = self._serialize_trace(trace_model)
return { return {
'message_id': message_id, 'message_id': message_id,
'found': True, 'found': True,
@@ -1090,6 +1269,84 @@ class MonitoringService:
'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0, 'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0,
}, },
'errors': errors, 'errors': errors,
'trace': trace,
}
def _serialize_trace(self, trace: persistence_monitoring.MonitoringTrace) -> dict:
    """Convert a trace row to a dict, decoding the JSON ``attributes`` column (``{}`` when empty or invalid)."""
    payload = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringTrace, trace)
    decoded = _json_loads(payload.get('attributes'))
    payload['attributes'] = decoded if decoded else {}
    return payload
def _serialize_span(self, span: persistence_monitoring.MonitoringSpan) -> dict:
    """Convert a span row to a dict, decoding the JSON ``attributes`` column (``{}`` when empty or invalid)."""
    payload = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringSpan, span)
    decoded = _json_loads(payload.get('attributes'))
    payload['attributes'] = decoded if decoded else {}
    return payload
async def get_traces(
    self,
    bot_ids: list[str] | None = None,
    pipeline_ids: list[str] | None = None,
    session_ids: list[str] | None = None,
    statuses: list[str] | None = None,
    start_time: datetime.datetime | None = None,
    end_time: datetime.datetime | None = None,
    limit: int = 100,
    offset: int = 0,
) -> tuple[list[dict], int]:
    """Get trace headers with filters.

    Every supplied filter is combined with AND; empty or ``None`` filters
    are ignored. The time range is applied to ``started_at`` (inclusive).

    Results are ordered by ``started_at`` descending with ``trace_id`` as a
    tie-breaker, so rows sharing a timestamp keep a deterministic order and
    offset-based paging does not repeat or skip them.

    Returns:
        ``(traces, total)``: the serialized page of traces and the total
        number of rows matching the filters.
    """
    trace_model = persistence_monitoring.MonitoringTrace

    conditions = []
    if bot_ids:
        conditions.append(trace_model.bot_id.in_(bot_ids))
    if pipeline_ids:
        conditions.append(trace_model.pipeline_id.in_(pipeline_ids))
    if session_ids:
        conditions.append(trace_model.session_id.in_(session_ids))
    if statuses:
        conditions.append(trace_model.status.in_(statuses))
    if start_time:
        conditions.append(trace_model.started_at >= start_time)
    if end_time:
        conditions.append(trace_model.started_at <= end_time)

    count_query = sqlalchemy.select(sqlalchemy.func.count(trace_model.trace_id))
    query = sqlalchemy.select(trace_model)
    if conditions:
        clause = sqlalchemy.and_(*conditions)
        count_query = count_query.where(clause)
        query = query.where(clause)

    total_result = await self.ap.persistence_mgr.execute_async(count_query)
    total = total_result.scalar() or 0

    # started_at alone is not unique; trace_id (the primary key) makes the order total.
    query = (
        query.order_by(trace_model.started_at.desc(), trace_model.trace_id.desc())
        .limit(limit)
        .offset(offset)
    )
    result = await self.ap.persistence_mgr.execute_async(query)
    traces = [self._serialize_trace(row[0] if isinstance(row, tuple) else row) for row in result.all()]
    return traces, total
async def get_trace_details(self, trace_id: str) -> dict:
    """Load one trace header together with its spans, ordered by span start time ascending.

    Returns a dict with ``found: False`` when no trace has the given id;
    otherwise ``found: True`` plus the serialized ``trace`` and ``spans``.
    """
    trace_model = persistence_monitoring.MonitoringTrace
    span_model = persistence_monitoring.MonitoringSpan

    header_stmt = sqlalchemy.select(trace_model).where(trace_model.trace_id == trace_id)
    header_result = await self.ap.persistence_mgr.execute_async(header_stmt)
    header_row = header_result.first()
    if not header_row:
        return {'trace_id': trace_id, 'found': False}

    header = header_row[0] if isinstance(header_row, tuple) else header_row

    span_stmt = sqlalchemy.select(span_model).where(span_model.trace_id == trace_id)
    span_stmt = span_stmt.order_by(span_model.started_at.asc())
    span_result = await self.ap.persistence_mgr.execute_async(span_stmt)

    serialized_spans = []
    for row in span_result.all():
        span = row[0] if isinstance(row, tuple) else row
        serialized_spans.append(self._serialize_span(span))

    return {
        'trace_id': trace_id,
        'found': True,
        'trace': self._serialize_trace(header),
        'spans': serialized_spans,
    }
} }
# ========== Export Methods ========== # ========== Export Methods ==========
@@ -3,6 +3,49 @@ import sqlalchemy
from .base import Base from .base import Base
class MonitoringTrace(Base):
    """Header row of an end-to-end monitoring trace."""

    __tablename__ = 'monitoring_traces'

    # Identity
    trace_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)

    # Timing and outcome
    started_at = sqlalchemy.Column(sqlalchemy.DateTime, index=True, nullable=False)
    ended_at = sqlalchemy.Column(sqlalchemy.DateTime, index=True, nullable=True)
    duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)  # milliseconds
    status = sqlalchemy.Column(sqlalchemy.String(50), index=True, nullable=False)  # running, success, error
    name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)

    # Where the trace came from
    bot_id = sqlalchemy.Column(sqlalchemy.String(255), index=True, nullable=True)
    bot_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
    pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), index=True, nullable=True)
    pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
    session_id = sqlalchemy.Column(sqlalchemy.String(255), index=True, nullable=True)
    message_id = sqlalchemy.Column(sqlalchemy.String(255), index=True, nullable=True)
    query_id = sqlalchemy.Column(sqlalchemy.String(255), index=True, nullable=True)

    # Free-form extra data, held as text
    attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
class MonitoringSpan(Base):
    """One span inside a trace (pipeline, RAG, model, plugin or tool operation)."""

    __tablename__ = 'monitoring_spans'

    # Identity and position within the trace
    span_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
    trace_id = sqlalchemy.Column(sqlalchemy.String(255), index=True, nullable=False)
    parent_span_id = sqlalchemy.Column(sqlalchemy.String(255), index=True, nullable=True)

    # What ran and how it ended
    name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
    kind = sqlalchemy.Column(sqlalchemy.String(80), index=True, nullable=False)
    status = sqlalchemy.Column(sqlalchemy.String(50), index=True, nullable=False)

    # Timing
    started_at = sqlalchemy.Column(sqlalchemy.DateTime, index=True, nullable=False)
    ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True)
    duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)  # milliseconds

    # Correlation ids
    message_id = sqlalchemy.Column(sqlalchemy.String(255), index=True, nullable=True)
    session_id = sqlalchemy.Column(sqlalchemy.String(255), index=True, nullable=True)
    bot_id = sqlalchemy.Column(sqlalchemy.String(255), index=True, nullable=True)
    pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), index=True, nullable=True)

    # Free-form extra data and failure text
    attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
    error_message = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
class MonitoringMessage(Base): class MonitoringMessage(Base):
"""Monitoring message records""" """Monitoring message records"""
@@ -0,0 +1,88 @@
"""add monitoring traces and spans
Revision ID: 0006_monitoring_traces
Revises: 0005_add_llm_context_length
Create Date: 2026-06-16
"""
import sqlalchemy as sa
from alembic import op
# Alembic revision identifiers; they mirror the "Revision ID" / "Revises"
# lines of the module docstring.
revision = '0006_monitoring_traces'
down_revision = '0005_add_llm_context_length'
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create the ``monitoring_traces`` and ``monitoring_spans`` tables and their indexes.

    A table that already exists is left untouched, together with its indexes.
    """

    def id_column(column_name: str, nullable: bool = True) -> sa.Column:
        # 255-character string column, used for ids and names.
        return sa.Column(column_name, sa.String(length=255), nullable=nullable)

    def add_indexes(table_name: str, column_names: tuple) -> None:
        # One single-column index per name, named ix_<table>_<column>.
        for column_name in column_names:
            op.create_index(f'ix_{table_name}_{column_name}', table_name, [column_name])

    existing_tables = set(sa.inspect(op.get_bind()).get_table_names())

    if 'monitoring_traces' not in existing_tables:
        op.create_table(
            'monitoring_traces',
            id_column('trace_id', nullable=False),
            sa.Column('started_at', sa.DateTime(), nullable=False),
            sa.Column('ended_at', sa.DateTime(), nullable=True),
            sa.Column('duration', sa.Integer(), nullable=True),
            sa.Column('status', sa.String(length=50), nullable=False),
            id_column('name', nullable=False),
            id_column('bot_id'),
            id_column('bot_name'),
            id_column('pipeline_id'),
            id_column('pipeline_name'),
            id_column('session_id'),
            id_column('message_id'),
            id_column('query_id'),
            sa.Column('attributes', sa.Text(), nullable=True),
            sa.PrimaryKeyConstraint('trace_id'),
        )
        add_indexes(
            'monitoring_traces',
            (
                'started_at',
                'ended_at',
                'status',
                'bot_id',
                'pipeline_id',
                'session_id',
                'message_id',
                'query_id',
            ),
        )

    if 'monitoring_spans' not in existing_tables:
        op.create_table(
            'monitoring_spans',
            id_column('span_id', nullable=False),
            id_column('trace_id', nullable=False),
            id_column('parent_span_id'),
            id_column('name', nullable=False),
            sa.Column('kind', sa.String(length=80), nullable=False),
            sa.Column('status', sa.String(length=50), nullable=False),
            sa.Column('started_at', sa.DateTime(), nullable=False),
            sa.Column('ended_at', sa.DateTime(), nullable=True),
            sa.Column('duration', sa.Integer(), nullable=True),
            id_column('message_id'),
            id_column('session_id'),
            id_column('bot_id'),
            id_column('pipeline_id'),
            sa.Column('attributes', sa.Text(), nullable=True),
            sa.Column('error_message', sa.Text(), nullable=True),
            sa.PrimaryKeyConstraint('span_id'),
        )
        add_indexes(
            'monitoring_spans',
            (
                'trace_id',
                'parent_span_id',
                'kind',
                'status',
                'started_at',
                'message_id',
                'session_id',
                'bot_id',
                'pipeline_id',
            ),
        )
def downgrade() -> None:
    """Drop the span and trace tables, spans first, skipping any table that is not present."""
    existing_tables = set(sa.inspect(op.get_bind()).get_table_names())

    for table_name in ('monitoring_spans', 'monitoring_traces'):
        if table_name in existing_tables:
            op.drop_table(table_name)
+156 -27
View File
@@ -2,6 +2,9 @@ from __future__ import annotations
import typing import typing
import traceback import traceback
import time
import uuid
import datetime
import sqlalchemy import sqlalchemy
@@ -79,6 +82,19 @@ class RuntimePipeline:
enable_all_plugins: bool enable_all_plugins: bool
"""是否启用所有插件""" """是否启用所有插件"""
@staticmethod
def _new_span_id() -> str:
return f'span-{uuid.uuid4().hex[:16]}'
@staticmethod
def _utc_now() -> datetime.datetime:
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
@staticmethod
def _query_session_id(query: pipeline_query.Query) -> str:
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
return f'{launcher_type}_{query.launcher_id}'
enable_all_mcp_servers: bool enable_all_mcp_servers: bool
"""是否启用所有MCP服务器""" """是否启用所有MCP服务器"""
@@ -234,44 +250,102 @@ class RuntimePipeline:
stage_container = self.stage_containers[i] stage_container = self.stage_containers[i]
query.current_stage_name = stage_container.inst_name # 标记到 Query 对象里 query.current_stage_name = stage_container.inst_name # 标记到 Query 对象里
span_started_at = self._utc_now()
span_started = time.perf_counter()
span_status = 'success'
span_error = None
span_result_type = None
result = stage_container.inst.process(query, stage_container.inst_name) try:
result = stage_container.inst.process(query, stage_container.inst_name)
if isinstance(result, typing.Coroutine): if isinstance(result, typing.Coroutine):
result = await result result = await result
if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果 if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果
self.ap.logger.debug( span_result_type = str(
f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}' result.result_type.value if hasattr(result.result_type, 'value') else result.result_type
)
await self._check_output(query, result)
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
break
elif result.result_type == pipeline_entities.ResultType.CONTINUE:
query = result.new_query
elif isinstance(result, typing.AsyncGenerator): # 生成器
self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen')
async for sub_result in result:
self.ap.logger.debug(
f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
) )
await self._check_output(query, sub_result) self.ap.logger.debug(
f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}'
)
await self._check_output(query, result)
if result.error_notice:
span_status = 'error'
span_error = result.error_notice
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT: if result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}') self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
break break
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE: elif result.result_type == pipeline_entities.ResultType.CONTINUE:
query = sub_result.new_query query = result.new_query
await self._execute_from_stage(i + 1, query) elif isinstance(result, typing.AsyncGenerator): # 生成器
break span_result_type = 'generator'
self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen')
async for sub_result in result:
span_result_type = str(
sub_result.result_type.value
if hasattr(sub_result.result_type, 'value')
else sub_result.result_type
)
self.ap.logger.debug(
f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
)
await self._check_output(query, sub_result)
if sub_result.error_notice:
span_status = 'error'
span_error = sub_result.error_notice
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(
f'Stage {stage_container.inst_name} interrupted query {query.query_id}'
)
break
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
query = sub_result.new_query
await self._execute_from_stage(i + 1, query)
break
except Exception as e:
span_status = 'error'
span_error = str(e)
raise
finally:
trace_id = (query.variables or {}).get('_monitoring_trace_id')
root_span_id = (query.variables or {}).get('_monitoring_root_span_id')
if trace_id:
try:
await self.ap.monitoring_service.record_span(
trace_id=trace_id,
parent_span_id=root_span_id,
name=stage_container.inst_name,
kind='pipeline.stage',
status=span_status,
started_at=span_started_at,
duration=int((time.perf_counter() - span_started) * 1000),
message_id=(query.variables or {}).get('_monitoring_message_id'),
session_id=self._query_session_id(query),
bot_id=query.bot_uuid,
pipeline_id=self.pipeline_entity.uuid,
attributes={
'stage_class': stage_container.inst.__class__.__name__,
'result_type': span_result_type,
'query_id': query.query_id,
},
error_message=span_error,
)
except Exception as monitor_err:
self.ap.logger.error(f'Failed to record stage span: {monitor_err}')
i += 1 i += 1
async def process_query(self, query: pipeline_query.Query): async def process_query(self, query: pipeline_query.Query):
"""处理请求""" """处理请求"""
trace_started_at = self._utc_now()
trace_started = time.perf_counter()
root_span_id = self._new_span_id()
trace_id = None
trace_status = 'success'
# Get monitoring metadata # Get monitoring metadata
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown') bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown') pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
@@ -303,6 +377,28 @@ class RuntimePipeline:
except Exception as e: except Exception as e:
self.ap.logger.error(f'Failed to record query start: {e}') self.ap.logger.error(f'Failed to record query start: {e}')
try:
trace_id = await self.ap.monitoring_service.start_trace(
name='LangBot query',
bot_id=query.bot_uuid or 'unknown',
bot_name=bot_name,
pipeline_id=self.pipeline_entity.uuid,
pipeline_name=pipeline_name,
session_id=self._query_session_id(query),
message_id=message_id or None,
query_id=query.query_id,
attributes={
'launcher_type': query.launcher_type.value
if hasattr(query.launcher_type, 'value')
else str(query.launcher_type),
'runner_name': runner_name,
},
)
query.variables['_monitoring_trace_id'] = trace_id
query.variables['_monitoring_root_span_id'] = root_span_id
except Exception as e:
self.ap.logger.error(f'Failed to start query trace: {e}')
try: try:
# Get bound plugins for this pipeline # Get bound plugins for this pipeline
bound_plugins = query.variables.get('_pipeline_bound_plugins', None) bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
@@ -336,7 +432,10 @@ class RuntimePipeline:
await self._execute_from_stage(0, query) await self._execute_from_stage(0, query)
# Record query success only if no error occurred during processing # Record query success only if no error occurred during processing
if not query.variables.get('_monitoring_has_error', False): has_monitoring_error = query.variables.get('_monitoring_has_error', False)
if has_monitoring_error:
trace_status = 'error'
else:
try: try:
await monitoring_helper.MonitoringHelper.record_query_success( await monitoring_helper.MonitoringHelper.record_query_success(
ap=self.ap, ap=self.ap,
@@ -361,6 +460,7 @@ class RuntimePipeline:
self.ap.logger.error(f'Failed to record query response: {e}') self.ap.logger.error(f'Failed to record query response: {e}')
except Exception as e: except Exception as e:
trace_status = 'error'
inst_name = query.current_stage_name if query.current_stage_name else 'unknown' inst_name = query.current_stage_name if query.current_stage_name else 'unknown'
self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}') self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}')
self.ap.logger.error(f'Traceback: {traceback.format_exc()}') self.ap.logger.error(f'Traceback: {traceback.format_exc()}')
@@ -383,6 +483,35 @@ class RuntimePipeline:
self.ap.logger.error(f'Failed to record query error: {me}') self.ap.logger.error(f'Failed to record query error: {me}')
finally: finally:
if trace_id:
try:
duration_ms = int((time.perf_counter() - trace_started) * 1000)
await self.ap.monitoring_service.record_span(
trace_id=trace_id,
span_id=root_span_id,
name='LangBot query',
kind='pipeline.query',
status=trace_status,
started_at=trace_started_at,
duration=duration_ms,
message_id=message_id or None,
session_id=self._query_session_id(query),
bot_id=query.bot_uuid,
pipeline_id=self.pipeline_entity.uuid,
attributes={
'query_id': query.query_id,
'pipeline_name': pipeline_name,
'runner_name': runner_name,
},
)
await self.ap.monitoring_service.finish_trace(
trace_id=trace_id,
status=trace_status,
duration=duration_ms,
message_id=message_id or None,
)
except Exception as monitor_err:
self.ap.logger.error(f'Failed to finish query trace: {monitor_err}')
self.ap.logger.debug(f'Query {query.query_id} processed') self.ap.logger.debug(f'Query {query.query_id} processed')
del self.ap.query_pool.cached_queries[query.query_id] del self.ap.query_pool.cached_queries[query.query_id]
+12 -1
View File
@@ -711,8 +711,19 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
endpoint: str, endpoint: str,
method: str, method: str,
body: Any = None, body: Any = None,
caller: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
return await self.handler.handle_page_api(plugin_author, plugin_name, page_id, endpoint, method, body) return await self.handler.handle_page_api(
plugin_author,
plugin_name,
page_id,
endpoint,
method,
body,
caller,
headers or {},
)
async def get_debug_info(self) -> dict[str, Any]: async def get_debug_info(self) -> dict[str, Any]:
"""Get debug information including debug key and WS URL""" """Get debug information including debug key and WS URL"""
+19
View File
@@ -755,6 +755,21 @@ class RuntimeConnectionHandler(handler.Handler):
'session_name': session_name, 'session_name': session_name,
'bot_uuid': query.bot_uuid or '', 'bot_uuid': query.bot_uuid or '',
'sender_id': str(query.sender_id), 'sender_id': str(query.sender_id),
'_trace_context': {
'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None,
'parent_span_id': query.variables.get('_monitoring_root_span_id')
if query.variables
else None,
'message_id': query.variables.get('_monitoring_message_id') if query.variables else None,
'query_id': query.query_id,
'session_id': session_name,
'bot_id': query.bot_uuid or '',
'pipeline_id': query.pipeline_uuid or '',
'knowledge_base_id': kb_id,
'attributes': {
'source': 'plugin-api',
},
},
}, },
) )
results = [entry.model_dump(mode='json') for entry in entries] results = [entry.model_dump(mode='json') for entry in entries]
@@ -1011,6 +1026,8 @@ class RuntimeConnectionHandler(handler.Handler):
endpoint: str, endpoint: str,
method: str, method: str,
body: Any = None, body: Any = None,
caller: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Forward a page API call to the plugin via runtime.""" """Forward a page API call to the plugin via runtime."""
result = await self.call_action( result = await self.call_action(
@@ -1022,6 +1039,8 @@ class RuntimeConnectionHandler(handler.Handler):
'endpoint': endpoint, 'endpoint': endpoint,
'method': method, 'method': method,
'body': body, 'body': body,
'caller': caller,
'headers': headers or {},
}, },
timeout=30, timeout=30,
) )
@@ -3,6 +3,7 @@ from __future__ import annotations
import abc import abc
import typing import typing
import time import time
import datetime
from ...core import app from ...core import app
from ...entity.persistence import model as persistence_model from ...entity.persistence import model as persistence_model
@@ -16,6 +17,15 @@ LLM_USAGE_QUERY_VARIABLE = '_llm_usage'
STREAM_USAGE_QUERY_VARIABLE = '_stream_usage' STREAM_USAGE_QUERY_VARIABLE = '_stream_usage'
def _utc_now() -> datetime.datetime:
    """Return the current UTC time as a naive ``datetime`` (tzinfo removed)."""
    aware_now = datetime.datetime.now(tz=datetime.timezone.utc)
    return aware_now.replace(tzinfo=None)
def _query_session_id(query: pipeline_query.Query) -> str:
    """Build the ``<launcher_type>_<launcher_id>`` session identifier for a query."""
    launcher = query.launcher_type
    # Objects exposing ``.value`` (e.g. enum members) contribute that value;
    # anything else is stringified as-is.
    if hasattr(launcher, 'value'):
        type_label = launcher.value
    else:
        type_label = str(launcher)
    return f'{type_label}_{query.launcher_id}'
def _store_llm_usage(query: pipeline_query.Query | None, usage_info: dict | None) -> None: def _store_llm_usage(query: pipeline_query.Query | None, usage_info: dict | None) -> None:
"""Store the latest provider usage on the query for upstream action handlers.""" """Store the latest provider usage on the query for upstream action handlers."""
if query is None or not usage_info: if query is None or not usage_info:
@@ -59,6 +69,7 @@ class RuntimeProvider:
"""Bridge method for invoking LLM with monitoring""" """Bridge method for invoking LLM with monitoring"""
# Start timing for monitoring # Start timing for monitoring
start_time = time.time() start_time = time.time()
span_started_at = _utc_now()
input_tokens = 0 input_tokens = 0
output_tokens = 0 output_tokens = 0
status = 'success' status = 'success'
@@ -125,6 +136,30 @@ class RuntimeProvider:
error_message=error_message, error_message=error_message,
message_id=message_id, message_id=message_id,
) )
trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None
parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None
if trace_id:
await self.requester.ap.monitoring_service.record_span(
trace_id=trace_id,
parent_span_id=parent_span_id,
name=f'LLM {model.model_entity.name}',
kind='model.llm',
status=status,
started_at=span_started_at,
duration=duration_ms,
message_id=message_id,
session_id=_query_session_id(query),
bot_id=query.bot_uuid,
pipeline_id=query.pipeline_uuid,
attributes={
'model_name': model.model_entity.name,
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'total_tokens': input_tokens + output_tokens,
'stream': False,
},
error_message=error_message,
)
except Exception as monitor_err: except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM call: {monitor_err}') self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM call: {monitor_err}')
@@ -140,6 +175,7 @@ class RuntimeProvider:
"""Bridge method for invoking LLM stream with monitoring""" """Bridge method for invoking LLM stream with monitoring"""
# Start timing for monitoring # Start timing for monitoring
start_time = time.time() start_time = time.time()
span_started_at = _utc_now()
status = 'success' status = 'success'
error_message = None error_message = None
input_tokens = 0 input_tokens = 0
@@ -204,6 +240,30 @@ class RuntimeProvider:
error_message=error_message, error_message=error_message,
message_id=message_id, message_id=message_id,
) )
trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None
parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None
if trace_id:
await self.requester.ap.monitoring_service.record_span(
trace_id=trace_id,
parent_span_id=parent_span_id,
name=f'LLM stream {model.model_entity.name}',
kind='model.llm',
status=status,
started_at=span_started_at,
duration=duration_ms,
message_id=message_id,
session_id=_query_session_id(query),
bot_id=query.bot_uuid,
pipeline_id=query.pipeline_uuid,
attributes={
'model_name': model.model_entity.name,
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'total_tokens': input_tokens + output_tokens,
'stream': True,
},
error_message=error_message,
)
except Exception as monitor_err: except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM stream call: {monitor_err}') self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM stream call: {monitor_err}')
@@ -268,6 +268,21 @@ class LocalAgentRunner(runner.RequestRunner):
'bot_uuid': query.bot_uuid or '', 'bot_uuid': query.bot_uuid or '',
'sender_id': str(query.sender_id), 'sender_id': str(query.sender_id),
'session_name': f'{query.session.launcher_type.value}_{query.session.launcher_id}', 'session_name': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
'_trace_context': {
'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None,
'parent_span_id': query.variables.get('_monitoring_root_span_id')
if query.variables
else None,
'message_id': query.variables.get('_monitoring_message_id') if query.variables else None,
'query_id': query.query_id,
'session_id': f'{query.launcher_type.value}_{query.launcher_id}',
'bot_id': query.bot_uuid or '',
'pipeline_id': query.pipeline_uuid or '',
'knowledge_base_id': kb_uuid,
'attributes': {
'source': 'local-agent',
},
},
}, },
) )
+123 -4
View File
@@ -1,10 +1,12 @@
from __future__ import annotations from __future__ import annotations
import mimetypes import mimetypes
import os.path import os.path
import time
import traceback import traceback
import uuid import uuid
import zipfile import zipfile
import io import io
import datetime
from typing import Any from typing import Any
from langbot.pkg.core import app from langbot.pkg.core import app
import sqlalchemy import sqlalchemy
@@ -25,6 +27,10 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
super().__init__(ap) super().__init__(ap)
self.knowledge_base_entity = knowledge_base_entity self.knowledge_base_entity = knowledge_base_entity
@staticmethod
def _utc_now() -> datetime.datetime:
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
async def initialize(self): async def initialize(self):
pass pass
@@ -334,6 +340,25 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
# are passed directly to vector_search by some plugins (e.g. LangRAG) # are passed directly to vector_search by some plugins (e.g. LangRAG)
# and would cause empty results when the metadata field doesn't exist. # and would cause empty results when the metadata field doesn't exist.
filters = settings.pop('filters', {}) filters = settings.pop('filters', {})
trace_context = settings.pop('_trace_context', None)
host_span_started_at = self._utc_now()
host_span_started = time.perf_counter()
host_span_id = None
if trace_context and trace_context.get('trace_id'):
host_parent_span_id = trace_context.get('parent_span_id')
host_span_id = trace_context.get('rag_span_id') or f'span-{uuid.uuid4().hex[:16]}'
trace_context = {
'trace_id': trace_context.get('trace_id'),
'parent_span_id': host_span_id,
'host_parent_span_id': host_parent_span_id,
'message_id': trace_context.get('message_id'),
'query_id': trace_context.get('query_id'),
'session_id': trace_context.get('session_id'),
'bot_id': trace_context.get('bot_id'),
'pipeline_id': trace_context.get('pipeline_id'),
'knowledge_base_id': kb.uuid,
'attributes': trace_context.get('attributes') or {},
}
retrieval_context = { retrieval_context = {
'query': query, 'query': query,
@@ -343,13 +368,107 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
'creation_settings': kb.creation_settings or {}, 'creation_settings': kb.creation_settings or {},
'filters': filters, 'filters': filters,
} }
if trace_context:
retrieval_context['trace_context'] = trace_context
result = await self.ap.plugin_connector.call_rag_retrieve( try:
plugin_id, result = await self.ap.plugin_connector.call_rag_retrieve(
retrieval_context, plugin_id,
) retrieval_context,
)
except Exception as e:
if trace_context:
await self._record_rag_trace_result(
trace_context=trace_context,
host_span_id=host_span_id,
started_at=host_span_started_at,
duration=int((time.perf_counter() - host_span_started) * 1000),
plugin_id=plugin_id,
result={
'results': [],
'metadata': {
'status': 'error',
'error_message': str(e),
},
},
)
raise
if trace_context:
await self._record_rag_trace_result(
trace_context=trace_context,
host_span_id=host_span_id,
started_at=host_span_started_at,
duration=int((time.perf_counter() - host_span_started) * 1000),
plugin_id=plugin_id,
result=result,
)
return result return result
async def _record_rag_trace_result(
    self,
    trace_context: dict[str, Any],
    host_span_id: str | None,
    started_at: datetime.datetime,
    duration: int,
    plugin_id: str,
    result: dict[str, Any],
) -> None:
    """Persist the host RAG span and any plugin-provided child spans.

    Recording is best-effort: failures are logged and never propagate to the
    retrieval caller. Plugin spans are recorded independently of each other,
    so one rejected span does not prevent the remaining ones from being stored.

    Args:
        trace_context: Trace identifiers for this retrieval. Keys read here:
            ``trace_id``, ``parent_span_id``, ``host_parent_span_id``,
            ``message_id``, ``session_id``, ``bot_id`` and ``pipeline_id``.
        host_span_id: Span id used for the host-side retrieval span.
        started_at: Start timestamp of the host span.
        duration: Host-measured retrieval duration, forwarded unchanged.
        plugin_id: Identifier of the plugin that served the retrieval.
        result: Plugin response. ``metadata`` may carry ``status``,
            ``error_message`` and a ``trace_spans`` list of child span dicts.
    """
    trace_id = trace_context.get('trace_id')
    if not trace_id:
        return

    # Normalise the plugin response defensively; plugins may return
    # non-dict payloads or null fields.
    payload = result if isinstance(result, dict) else {}
    metadata = payload.get('metadata')
    if not isinstance(metadata, dict):
        metadata = {}
    plugin_spans = metadata.get('trace_spans')
    if not isinstance(plugin_spans, list):
        plugin_spans = []
    returned = payload.get('results')
    returned_count = len(returned) if isinstance(returned, (list, tuple)) else 0

    record_span = self.ap.monitoring_service.record_span
    shared = {
        'trace_id': trace_id,
        'message_id': trace_context.get('message_id'),
        'session_id': trace_context.get('session_id'),
        'bot_id': trace_context.get('bot_id'),
        'pipeline_id': trace_context.get('pipeline_id'),
    }

    try:
        await record_span(
            span_id=host_span_id,
            parent_span_id=trace_context.get('host_parent_span_id'),
            name=f'Knowledge retrieval {self.knowledge_base_entity.name}',
            kind='rag.retrieval',
            # A null status from the plugin is treated like a missing one.
            status=metadata.get('status') or 'success',
            started_at=started_at,
            duration=duration,
            attributes={
                'knowledge_base_id': self.knowledge_base_entity.uuid,
                'knowledge_base_name': self.knowledge_base_entity.name,
                'plugin_id': plugin_id,
                'returned_count': returned_count,
                'total_found': payload.get('total_found'),
            },
            error_message=metadata.get('error_message'),
            **shared,
        )
    except Exception as e:
        self.ap.logger.error(f'Failed to record RAG trace spans: {e}')
        # Without the host span the children would have no parent to attach to.
        return

    fallback_parent = host_span_id or trace_context.get('parent_span_id')
    for span in plugin_spans:
        if not isinstance(span, dict):
            continue
        span_attributes = span.get('attributes')
        try:
            await record_span(
                span_id=span.get('span_id'),
                parent_span_id=span.get('parent_span_id') or fallback_parent,
                name=span.get('name') or 'RAG plugin stage',
                kind=span.get('kind') or 'rag.stage',
                status=span.get('status') or 'success',
                # No per-span start time is read from the plugin payload,
                # so child spans share the host span's start timestamp.
                started_at=started_at,
                duration=span.get('duration_ms'),
                attributes=span_attributes if isinstance(span_attributes, dict) else {},
                error_message=span.get('error_message'),
                **shared,
            )
        except Exception as e:
            self.ap.logger.error(f'Failed to record RAG plugin span: {e}')
async def _delete_document(self, document_id: str) -> bool: async def _delete_document(self, document_id: str) -> bool:
"""Call plugin to delete document.""" """Call plugin to delete document."""
kb = self.knowledge_base_entity kb = self.knowledge_base_entity
+65
View File
@@ -8,6 +8,7 @@ Run: uv run pytest tests/integration/api/test_monitoring.py -q
from __future__ import annotations from __future__ import annotations
import datetime
import pytest import pytest
from unittest.mock import MagicMock, AsyncMock, Mock from unittest.mock import MagicMock, AsyncMock, Mock
@@ -82,6 +83,15 @@ def fake_monitoring_app():
app.monitoring_service.get_messages = AsyncMock(return_value=([{'id': 'msg-1', 'content': 'test'}], 100)) app.monitoring_service.get_messages = AsyncMock(return_value=([{'id': 'msg-1', 'content': 'test'}], 100))
app.monitoring_service.get_llm_calls = AsyncMock(return_value=([{'id': 'llm-1'}], 50)) app.monitoring_service.get_llm_calls = AsyncMock(return_value=([{'id': 'llm-1'}], 50))
app.monitoring_service.get_embedding_calls = AsyncMock(return_value=([{'id': 'emb-1'}], 10)) app.monitoring_service.get_embedding_calls = AsyncMock(return_value=([{'id': 'emb-1'}], 10))
app.monitoring_service.get_traces = AsyncMock(return_value=([{'trace_id': 'trace-1'}], 1))
app.monitoring_service.get_trace_details = AsyncMock(
side_effect=lambda trace_id: {
'found': trace_id == 'trace-1',
'trace_id': trace_id,
'trace': {'trace_id': trace_id} if trace_id == 'trace-1' else None,
'spans': [] if trace_id == 'trace-1' else None,
}
)
app.monitoring_service.get_sessions = AsyncMock(return_value=([{'session_id': 'sess-1'}], 20)) app.monitoring_service.get_sessions = AsyncMock(return_value=([{'session_id': 'sess-1'}], 20))
app.monitoring_service.get_errors = AsyncMock(return_value=([{'id': 'err-1'}], 2)) app.monitoring_service.get_errors = AsyncMock(return_value=([{'id': 'err-1'}], 2))
app.monitoring_service.get_session_analysis = AsyncMock( app.monitoring_service.get_session_analysis = AsyncMock(
@@ -222,6 +232,7 @@ class TestMonitoringAllDataEndpoint:
assert response.status_code == 200 assert response.status_code == 200
data = await response.get_json() data = await response.get_json()
assert 'overview' in data['data'] assert 'overview' in data['data']
assert 'traces' in data['data']
@pytest.mark.usefixtures('mock_circular_import_chain') @pytest.mark.usefixtures('mock_circular_import_chain')
@@ -246,6 +257,60 @@ class TestMonitoringDetailsEndpoints:
assert response.status_code == 200 assert response.status_code == 200
@pytest.mark.asyncio
async def test_get_trace_details(self, quart_test_client):
    """GET /api/v1/monitoring/traces/{id}."""
    auth_headers = {'Authorization': 'Bearer test_token'}
    response = await quart_test_client.get('/api/v1/monitoring/traces/trace-1', headers=auth_headers)

    assert response.status_code == 200
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestMonitoringTraceEndpoints:
    """Tests for trace list and detail endpoints."""

    @pytest.mark.asyncio
    async def test_get_traces_forwards_filters(self, quart_test_client, fake_monitoring_app):
        """GET /api/v1/monitoring/traces forwards filters to service."""
        url = (
            '/api/v1/monitoring/traces'
            '?botId=bot-1'
            '&pipelineId=pipeline-1'
            '&sessionId=session-1'
            '&status=success'
            '&startTime=2026-01-01T00:00:00Z'
            '&endTime=2026-01-02T00:00:00Z'
            '&limit=25'
            '&offset=5'
        )
        response = await quart_test_client.get(url, headers={'Authorization': 'Bearer test_token'})

        assert response.status_code == 200
        payload = (await response.get_json())['data']
        assert payload['traces'] == [{'trace_id': 'trace-1'}]
        assert payload['total'] == 1

        # Every query-string filter must reach the service call.
        expected_filters = {
            'bot_ids': ['bot-1'],
            'pipeline_ids': ['pipeline-1'],
            'session_ids': ['session-1'],
            'statuses': ['success'],
            'start_time': datetime.datetime(2026, 1, 1, 0, 0),
            'end_time': datetime.datetime(2026, 1, 2, 0, 0),
            'limit': 25,
            'offset': 5,
        }
        fake_monitoring_app.monitoring_service.get_traces.assert_awaited_with(**expected_filters)

    @pytest.mark.asyncio
    async def test_get_trace_details_not_found(self, quart_test_client):
        """GET /api/v1/monitoring/traces/{id} returns 404 when missing."""
        auth_headers = {'Authorization': 'Bearer test_token'}
        response = await quart_test_client.get('/api/v1/monitoring/traces/trace-missing', headers=auth_headers)

        assert response.status_code == 404
@pytest.mark.usefixtures('mock_circular_import_chain') @pytest.mark.usefixtures('mock_circular_import_chain')
class TestMonitoringFeedbackEndpoints: class TestMonitoringFeedbackEndpoints:
@@ -104,7 +104,7 @@ class TestSQLiteMigrationUpgrade:
rev = await get_alembic_current(sqlite_engine) rev = await get_alembic_current(sqlite_engine)
assert rev is not None, 'Expected a revision after upgrade' assert rev is not None, 'Expected a revision after upgrade'
# Head should be the latest migration # Head should be the latest migration
assert rev.startswith('0005'), f'Expected head to be 0005_*, got {rev}' assert rev.startswith('0006'), f'Expected head to be 0006_*, got {rev}'
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_upgrade_idempotent(self, sqlite_engine): async def test_upgrade_idempotent(self, sqlite_engine):
@@ -144,8 +144,8 @@ class TestPostgreSQLMigrationUpgrade:
# Verify revision # Verify revision
rev = await get_alembic_current(postgres_engine) rev = await get_alembic_current(postgres_engine)
assert rev is not None, 'Expected a revision after upgrade' assert rev is not None, 'Expected a revision after upgrade'
# Head should be the latest migration (0005 for current state) # Head should be the latest migration.
assert rev.startswith('0005'), f'Expected head to be 0005_*, got {rev}' assert rev.startswith('0006'), f'Expected head to be 0006_*, got {rev}'
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_postgres_upgrade_idempotent(self, postgres_engine, clean_tables, clean_alembic_version): async def test_postgres_upgrade_idempotent(self, postgres_engine, clean_tables, clean_alembic_version):
@@ -0,0 +1,207 @@
"""Unit tests for MonitoringService trace observability."""
from __future__ import annotations
import datetime
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
import sqlalchemy
from sqlalchemy.ext.asyncio import create_async_engine
from langbot.pkg.api.http.service.monitoring import MonitoringService
from langbot.pkg.entity.persistence.base import Base
from langbot.pkg.entity.persistence import monitoring as persistence_monitoring
pytestmark = pytest.mark.asyncio
class _SQLitePersistence:
    """Minimal persistence-manager stand-in wrapping an async SQLAlchemy engine."""

    def __init__(self, engine):
        self._engine = engine

    def get_db_engine(self):
        """Return the engine handed to the constructor."""
        return self._engine

    async def execute_async(self, *args, **kwargs):
        """Execute one statement on a fresh connection, commit, and return the result."""
        async with self._engine.connect() as connection:
            outcome = await connection.execute(*args, **kwargs)
            await connection.commit()
            return outcome

    def serialize_model(self, model, data, masked_columns=None):
        """Dump ``data`` to a dict keyed by column name, ISO-formatting datetimes.

        Columns listed in ``masked_columns`` are left out of the result.
        """
        hidden = masked_columns or []
        serialized = {}
        for column in model.__table__.columns:
            if column.name in hidden:
                continue
            value = getattr(data, column.name)
            if isinstance(value, datetime.datetime):
                value = value.isoformat()
            serialized[column.name] = value
        return serialized
@pytest.fixture
async def monitoring_service(tmp_path):
    """Yield a ``MonitoringService`` backed by a throwaway SQLite database file."""
    db_engine = create_async_engine(f'sqlite+aiosqlite:///{tmp_path / "monitoring.db"}')

    # Create every table registered on the declarative base before use.
    async with db_engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)

    fake_app = SimpleNamespace(
        persistence_mgr=_SQLitePersistence(db_engine),
        instance_config=SimpleNamespace(data={'database': {'use': 'sqlite'}}),
        logger=Mock(),
    )

    yield MonitoringService(fake_app)

    await db_engine.dispose()
async def test_trace_lifecycle_records_spans_and_returns_details(monitoring_service):
    """Start a trace, attach two spans, finish it, then read it back via list and detail calls."""
    svc = monitoring_service
    t0 = datetime.datetime(2026, 1, 1, 12, 0, 0)
    t_end = t0 + datetime.timedelta(milliseconds=125)

    trace_id = await svc.start_trace(
        trace_id='trace-test',
        name='Pipeline query',
        bot_id='bot-1',
        bot_name='Bot',
        pipeline_id='pipeline-1',
        pipeline_name='Default',
        session_id='session-1',
        message_id='message-1',
        query_id=42,
        attributes={'source': 'unit-test'},
    )
    assert trace_id == 'trace-test'

    # Root span: explicit start/end, no duration given.
    root_span_id = await svc.record_span(
        trace_id=trace_id,
        span_id='span-root',
        name='Pipeline',
        kind='pipeline',
        status='completed',
        started_at=t0,
        ended_at=t_end,
        message_id='message-1',
        session_id='session-1',
        bot_id='bot-1',
        pipeline_id='pipeline-1',
        attributes={'stage_count': 2},
    )

    # Child span: fractional duration and a failure status.
    await svc.record_span(
        trace_id=trace_id,
        span_id='span-rag',
        parent_span_id=root_span_id,
        name='RAG retrieval',
        kind='rag.retrieval',
        status='failed',
        started_at=t0 + datetime.timedelta(seconds=1),
        duration=12.7,
        attributes={'top_k': 5},
        error_message='vector store timeout',
    )

    await svc.finish_trace(
        trace_id,
        status='completed',
        duration=250,
        message_id='message-final',
        attributes={'result_type': 'reply'},
    )

    listed, listed_total = await svc.get_traces(
        bot_ids=['bot-1'],
        pipeline_ids=['pipeline-1'],
        session_ids=['session-1'],
        statuses=['success'],
        limit=10,
        offset=0,
    )
    assert listed_total == 1
    first_trace = listed[0]
    assert first_trace['trace_id'] == trace_id
    assert first_trace['status'] == 'success'
    assert first_trace['message_id'] == 'message-final'
    assert first_trace['query_id'] == '42'
    assert first_trace['attributes'] == {'result_type': 'reply'}

    details = await svc.get_trace_details(trace_id)
    assert details['found'] is True
    assert details['trace']['trace_id'] == trace_id
    assert [span['span_id'] for span in details['spans']] == ['span-root', 'span-rag']

    root_span, rag_span = details['spans']
    assert root_span['status'] == 'success'
    assert root_span['duration'] == 125
    assert root_span['attributes'] == {'stage_count': 2}
    assert rag_span['status'] == 'error'
    assert rag_span['duration'] == 13
    assert rag_span['parent_span_id'] == 'span-root'
    assert rag_span['error_message'] == 'vector store timeout'
async def test_get_trace_details_returns_not_found_for_missing_trace(monitoring_service):
    """Looking up an unknown trace id yields a bare not-found payload."""
    expected = {'trace_id': 'trace-missing', 'found': False}

    details = await monitoring_service.get_trace_details('trace-missing')

    assert details == expected
async def test_cleanup_expired_records_includes_traces_and_spans(monitoring_service):
    """Cleanup deletes the 30-day-old trace and span and keeps the recent pair."""
    svc = monitoring_service
    insert_rows = svc.ap.persistence_mgr.execute_async

    old_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - datetime.timedelta(days=30)
    recent_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    trace_rows = [
        {
            'trace_id': 'trace-old',
            'started_at': old_time,
            'ended_at': old_time,
            'duration': 10,
            'status': 'success',
            'name': 'Old trace',
        },
        {
            'trace_id': 'trace-recent',
            'started_at': recent_time,
            'ended_at': recent_time,
            'duration': 10,
            'status': 'success',
            'name': 'Recent trace',
        },
    ]
    span_rows = [
        {
            'span_id': 'span-old',
            'trace_id': 'trace-old',
            'name': 'Old span',
            'kind': 'pipeline',
            'status': 'success',
            'started_at': old_time,
            'ended_at': old_time,
        },
        {
            'span_id': 'span-recent',
            'trace_id': 'trace-recent',
            'name': 'Recent span',
            'kind': 'pipeline',
            'status': 'success',
            'started_at': recent_time,
            'ended_at': recent_time,
        },
    ]
    await insert_rows(sqlalchemy.insert(persistence_monitoring.MonitoringTrace), trace_rows)
    await insert_rows(sqlalchemy.insert(persistence_monitoring.MonitoringSpan), span_rows)

    # Replace the space-release step with a mock so the test only counts its invocations.
    svc._release_sqlite_space = AsyncMock()

    deleted = await svc.cleanup_expired_records(retention_days=7, batch_size=1)

    assert deleted['monitoring_traces'] == 1
    assert deleted['monitoring_spans'] == 1
    svc._release_sqlite_space.assert_awaited_once()

    remaining = await svc.get_trace_details('trace-recent')
    assert remaining['found'] is True
    assert remaining['spans'][0]['span_id'] == 'span-recent'
@@ -0,0 +1,111 @@
"""Unit tests for monitoring trace HTTP routes."""
from __future__ import annotations
import datetime
from unittest.mock import AsyncMock, Mock
import pytest
import quart
from tests.factories import FakeApp
from tests.utils.import_isolation import MockLifecycleControlScope, isolated_sys_modules
pytestmark = pytest.mark.asyncio
@pytest.fixture
async def monitoring_client():
    """Yield ``(fake_app, test_client)`` with the monitoring router mounted on a bare Quart app."""

    def _trace_details(trace_id):
        # Only 'trace-1' is treated as existing.
        known = trace_id == 'trace-1'
        return {
            'found': known,
            'trace_id': trace_id,
            'trace': {'trace_id': trace_id} if known else None,
            'spans': [] if known else None,
        }

    app = FakeApp()
    app.user_service = Mock()
    app.user_service.verify_jwt_token = AsyncMock(return_value='test@example.com')
    app.user_service.get_user_by_email = AsyncMock(return_value=Mock(email='test@example.com'))
    app.monitoring_service = Mock()
    app.monitoring_service.get_traces = AsyncMock(return_value=([{'trace_id': 'trace-1'}], 1))
    app.monitoring_service.get_trace_details = AsyncMock(side_effect=_trace_details)

    # Stand-ins for the core modules swapped into sys.modules below.
    mock_app = Mock()
    mock_app.Application = type('FakeMinimalApplication', (), {})
    mock_entities = Mock()
    mock_entities.LifecycleControlScope = MockLifecycleControlScope

    stubbed_modules = {
        'langbot.pkg.core.app': mock_app,
        'langbot.pkg.core.entities': mock_entities,
    }
    modules_to_clear = [
        'langbot.pkg.api.http.controller.group',
        'langbot.pkg.api.http.controller.groups',
        'langbot.pkg.api.http.controller.groups.monitoring',
        'langbot.pkg.api.http.controller.main',
    ]

    with isolated_sys_modules(mocks=stubbed_modules, clear=modules_to_clear):
        from langbot.pkg.api.http.controller.groups.monitoring import MonitoringRouterGroup

        quart_app = quart.Quart(__name__)
        group = MonitoringRouterGroup(app, quart_app)
        await group.initialize()

        yield app, quart_app.test_client()
async def test_get_traces_route_forwards_filters(monitoring_client):
    """The list route echoes paging values and forwards every filter to the service."""
    app, client = monitoring_client
    url = (
        '/api/v1/monitoring/traces'
        '?botId=bot-1'
        '&pipelineId=pipeline-1'
        '&sessionId=session-1'
        '&status=success'
        '&startTime=2026-01-01T00:00:00Z'
        '&endTime=2026-01-02T00:00:00Z'
        '&limit=25'
        '&offset=5'
    )

    response = await client.get(url, headers={'Authorization': 'Bearer test_token'})

    assert response.status_code == 200
    body = await response.get_json()
    assert body['data'] == {
        'traces': [{'trace_id': 'trace-1'}],
        'total': 1,
        'limit': 25,
        'offset': 5,
    }

    expected_filters = {
        'bot_ids': ['bot-1'],
        'pipeline_ids': ['pipeline-1'],
        'session_ids': ['session-1'],
        'statuses': ['success'],
        'start_time': datetime.datetime(2026, 1, 1, 0, 0),
        'end_time': datetime.datetime(2026, 1, 2, 0, 0),
        'limit': 25,
        'offset': 5,
    }
    app.monitoring_service.get_traces.assert_awaited_once_with(**expected_filters)
async def test_get_trace_details_route_returns_404_for_missing_trace(monitoring_client):
    """The detail route answers 404 with an error payload for an unknown trace id."""
    _app, client = monitoring_client
    auth_headers = {'Authorization': 'Bearer test_token'}

    response = await client.get('/api/v1/monitoring/traces/trace-missing', headers=auth_headers)

    assert response.status_code == 404
    body = await response.get_json()
    assert body['code'] == -1
    assert body['msg'] == 'Trace trace-missing not found'
@@ -0,0 +1,87 @@
"""Unit tests for the monitoring trace Alembic migration."""
from __future__ import annotations
from importlib import import_module
class _FakeInspector:
    """Stand-in for a SQLAlchemy inspector that reports a fixed set of table names."""

    def __init__(self, tables):
        self._tables = tables

    def get_table_names(self):
        """Return the configured table names as a new list."""
        return [*self._tables]
class _FakeOp:
    """Recording stand-in for the Alembic ``op`` proxy used by the migration.

    Each operation appends to a list so tests can assert on what the
    migration did. Extra keyword options (such as ``unique=`` or ``schema=``)
    are accepted and ignored, so the fake keeps working if the migration
    starts passing options that the real Alembic operations accept.
    """

    def __init__(self):
        self.created_tables = []
        self.created_indexes = []
        self.dropped_tables = []

    def get_bind(self):
        """Return an opaque placeholder for the database connection."""
        return object()

    def create_table(self, table_name, *columns, **_options):
        """Record ``(table_name, columns)`` for a requested table creation."""
        self.created_tables.append((table_name, columns))

    def create_index(self, index_name, table_name, columns, **_options):
        """Record ``(index_name, table_name, columns)`` for a requested index."""
        self.created_indexes.append((index_name, table_name, columns))

    def drop_table(self, table_name, **_options):
        """Record the name of a table requested to be dropped."""
        self.dropped_tables.append(table_name)
def _migration_module():
    """Import and return the ``0006_monitoring_traces`` migration module.

    The module name starts with a digit, so it is loaded through
    ``import_module`` rather than an ``import`` statement.
    """
    module_path = 'langbot.pkg.persistence.alembic.versions.0006_monitoring_traces'
    return import_module(module_path)
def test_upgrade_creates_monitoring_trace_tables_and_indexes(monkeypatch):
    """With no tables present, upgrade creates both tables plus the expected indexes."""
    migration = _migration_module()
    recorder = _FakeOp()
    monkeypatch.setattr(migration, 'op', recorder)
    # The inspector reports an empty database.
    monkeypatch.setattr(migration.sa, 'inspect', lambda _conn: _FakeInspector(tables=set()))

    migration.upgrade()

    created_names = [entry[0] for entry in recorder.created_tables]
    assert created_names == [
        'monitoring_traces',
        'monitoring_spans',
    ]
    assert ('ix_monitoring_traces_started_at', 'monitoring_traces', ['started_at']) in recorder.created_indexes
    assert ('ix_monitoring_spans_trace_id', 'monitoring_spans', ['trace_id']) in recorder.created_indexes
    assert ('ix_monitoring_spans_pipeline_id', 'monitoring_spans', ['pipeline_id']) in recorder.created_indexes
def test_upgrade_skips_existing_monitoring_trace_tables(monkeypatch):
    """When both tables already exist, upgrade creates neither tables nor indexes."""
    migration = _migration_module()
    recorder = _FakeOp()
    existing_tables = {'monitoring_traces', 'monitoring_spans'}
    monkeypatch.setattr(migration, 'op', recorder)
    monkeypatch.setattr(migration.sa, 'inspect', lambda _conn: _FakeInspector(tables=existing_tables))

    migration.upgrade()

    assert recorder.created_tables == []
    assert recorder.created_indexes == []
def test_downgrade_drops_spans_before_traces(monkeypatch):
    """Downgrade drops ``monitoring_spans`` first, then ``monitoring_traces``."""
    migration = _migration_module()
    recorder = _FakeOp()
    existing_tables = {'monitoring_traces', 'monitoring_spans'}
    monkeypatch.setattr(migration, 'op', recorder)
    monkeypatch.setattr(migration.sa, 'inspect', lambda _conn: _FakeInspector(tables=existing_tables))

    migration.downgrade()

    assert recorder.dropped_tables == ['monitoring_spans', 'monitoring_traces']
@@ -162,3 +162,61 @@ async def test_runtime_pipeline_execute(mock_app, sample_query):
# Verify stage was called # Verify stage was called
mock_stage.process.assert_called_once() mock_stage.process.assert_called_once()
@pytest.mark.asyncio
async def test_runtime_pipeline_marks_trace_error_when_stage_returns_error_notice(mock_app, sample_query):
    """Trace status follows handled stage errors, not only raised exceptions."""
    pipelinemgr = get_pipelinemgr_module()
    stage = get_stage_module()
    persistence_pipeline = get_persistence_pipeline_module()
    entities = get_entities_module()

    # A stage that returns an INTERRUPT result carrying an error notice instead of raising.
    failing_stage = Mock(spec=stage.PipelineStage)
    failing_stage.process = AsyncMock(
        return_value=entities.StageProcessResult(
            result_type=entities.ResultType.INTERRUPT,
            new_query=sample_query,
            user_notice='',
            console_notice='',
            debug_notice='traceback',
            error_notice='model request failed',
        )
    )
    stage_container = pipelinemgr.StageInstContainer(inst_name='FailingStage', inst=failing_stage)

    pipeline_entity = Mock(spec=persistence_pipeline.LegacyPipeline)
    pipeline_entity.uuid = 'test-pipeline-uuid'
    pipeline_entity.name = 'Test Pipeline'
    pipeline_entity.config = sample_query.pipeline_config
    pipeline_entity.extensions_preferences = {'plugins': []}

    mock_app.bot_service = AsyncMock()
    mock_app.bot_service.get_bot = AsyncMock(return_value={'name': 'Test Bot'})

    monitoring = AsyncMock()
    monitoring.record_message = AsyncMock(return_value='message-1')
    monitoring.update_session_activity = AsyncMock(return_value=True)
    monitoring.start_trace = AsyncMock(return_value='trace-1')
    monitoring.record_span = AsyncMock()
    monitoring.finish_trace = AsyncMock()
    monitoring.update_message_status = AsyncMock()
    monitoring.record_error = AsyncMock()
    mock_app.monitoring_service = monitoring

    event_ctx = Mock()
    event_ctx.is_prevented_default = Mock(return_value=False)
    mock_app.plugin_connector.emit_event = AsyncMock(return_value=event_ctx)
    mock_app.query_pool.cached_queries[sample_query.query_id] = sample_query

    runtime_pipeline = pipelinemgr.RuntimePipeline(mock_app, pipeline_entity, [stage_container])
    await runtime_pipeline.run(sample_query)

    monitoring.finish_trace.assert_awaited_once()
    assert monitoring.finish_trace.await_args.kwargs['status'] == 'error'

    recorded_spans = monitoring.record_span.await_args_list
    stage_span_call = next(call for call in recorded_spans if call.kwargs['name'] == 'FailingStage')
    root_span_call = next(call for call in recorded_spans if call.kwargs['kind'] == 'pipeline.query')
    assert stage_span_call.kwargs['status'] == 'error'
    assert stage_span_call.kwargs['error_message'] == 'model request failed'
    assert root_span_call.kwargs['status'] == 'error'
+26
View File
@@ -407,6 +407,32 @@ class TestRuntimeKnowledgeBaseRetrieve:
call_args = mock_app.plugin_connector.call_rag_retrieve.call_args call_args = mock_app.plugin_connector.call_rag_retrieve.call_args
assert call_args[0][1]['retrieval_settings']['top_k'] == 5 assert call_args[0][1]['retrieval_settings']['top_k'] == 5
@pytest.mark.asyncio
async def test_retrieve_records_host_rag_duration(self, monkeypatch):
    """Test host RAG span duration is measured even if plugin omits it."""
    rag_module = get_rag_module()
    mock_app = create_mock_app()
    mock_app.monitoring_service = AsyncMock()
    mock_kb = create_mock_kb_entity()

    # The plugin response carries no trace spans or duration of its own.
    plugin_response = {'results': [], 'metadata': {'status': 'success'}}
    mock_app.plugin_connector.call_rag_retrieve = AsyncMock(return_value=plugin_response)

    # Two clock readings 0.25 s apart: one for the start, one for the end.
    fake_clock = Mock(side_effect=[10.0, 10.25])
    monkeypatch.setattr(rag_module.time, 'perf_counter', fake_clock)

    runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
    settings = {
        '_trace_context': {
            'trace_id': 'trace-1',
            'parent_span_id': 'span-root',
        }
    }
    await runtime_kb._retrieve('query text', settings)

    recorded = mock_app.monitoring_service.record_span.await_args.kwargs
    assert recorded['duration'] == 250
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_retrieve_converts_dict_to_entry(self): async def test_retrieve_converts_dict_to_entry(self):
"""Test that dict results are converted to RetrievalResultEntry.""" """Test that dict results are converted to RetrievalResultEntry."""
@@ -5,6 +5,7 @@ import {
ModelCall, ModelCall,
LLMCall, LLMCall,
EmbeddingCall, EmbeddingCall,
MonitoringTrace,
} from '../types/monitoring'; } from '../types/monitoring';
import { backendClient } from '@/app/infra/http'; import { backendClient } from '@/app/infra/http';
import { parseUTCTimestamp } from '../utils/dateUtils'; import { parseUTCTimestamp } from '../utils/dateUtils';
@@ -263,12 +264,48 @@ export function useMonitoringData(filterState: FilterState) {
messageId: error.message_id, messageId: error.message_id,
}), }),
), ),
traces: (response.traces || []).map(
(trace: {
trace_id: string;
started_at: string;
ended_at?: string;
duration?: number;
status: string;
name: string;
bot_id?: string;
bot_name?: string;
pipeline_id?: string;
pipeline_name?: string;
session_id?: string;
message_id?: string;
query_id?: string;
attributes?: Record<string, unknown>;
}): MonitoringTrace => ({
traceId: trace.trace_id,
name: trace.name,
startedAt: parseUTCTimestamp(trace.started_at),
endedAt: trace.ended_at
? parseUTCTimestamp(trace.ended_at)
: undefined,
duration: trace.duration,
status: trace.status as 'running' | 'success' | 'error',
botId: trace.bot_id,
botName: trace.bot_name,
pipelineId: trace.pipeline_id,
pipelineName: trace.pipeline_name,
sessionId: trace.session_id,
messageId: trace.message_id,
queryId: trace.query_id,
attributes: trace.attributes || {},
}),
),
totalCount: { totalCount: {
messages: response.totalCount.messages, messages: response.totalCount.messages,
llmCalls: response.totalCount.llmCalls, llmCalls: response.totalCount.llmCalls,
embeddingCalls: response.totalCount.embeddingCalls || 0, embeddingCalls: response.totalCount.embeddingCalls || 0,
sessions: response.totalCount.sessions, sessions: response.totalCount.sessions,
errors: response.totalCount.errors, errors: response.totalCount.errors,
traces: response.totalCount.traces || 0,
}, },
}; };
+297 -1
View File
@@ -10,6 +10,7 @@ import {
MessageSquare, MessageSquare,
Sparkles, Sparkles,
CheckCircle2, CheckCircle2,
GitBranch,
} from 'lucide-react'; } from 'lucide-react';
import OverviewCards from './components/overview-cards/OverviewCards'; import OverviewCards from './components/overview-cards/OverviewCards';
import MonitoringFilters from './components/filters/MonitoringFilters'; import MonitoringFilters from './components/filters/MonitoringFilters';
@@ -22,9 +23,15 @@ import { MessageDetailsCard } from './components/MessageDetailsCard';
import { MessageContentRenderer } from './components/MessageContentRenderer'; import { MessageContentRenderer } from './components/MessageContentRenderer';
import { FeedbackStatsCards } from './components/FeedbackCard'; import { FeedbackStatsCards } from './components/FeedbackCard';
import { FeedbackList } from './components/FeedbackList'; import { FeedbackList } from './components/FeedbackList';
import { MessageDetails } from './types/monitoring'; import {
MessageDetails,
TraceDetails,
MonitoringSpan,
} from './types/monitoring';
import { httpClient } from '@/app/infra/http/HttpClient'; import { httpClient } from '@/app/infra/http/HttpClient';
import { backendClient } from '@/app/infra/http';
import { LoadingSpinner, LoadingPage } from '@/components/ui/loading-spinner'; import { LoadingSpinner, LoadingPage } from '@/components/ui/loading-spinner';
import { parseUTCTimestamp } from './utils/dateUtils';
interface RawMessageData { interface RawMessageData {
id: string; id: string;
@@ -72,6 +79,97 @@ interface RawErrorData {
stack_trace: string | null; stack_trace: string | null;
} }
/**
 * Trace record in the snake_case shape received from the monitoring API.
 * `mapTrace` converts it into the camelCase object used by this page.
 */
interface RawTraceData {
trace_id: string;
// Timestamp string; parsed with parseUTCTimestamp in mapTrace.
started_at: string;
// May be absent; mapTrace then leaves endedAt undefined.
ended_at?: string;
// Rendered by the page with an "ms" suffix.
duration?: number;
// Plain string on the wire; mapTrace casts it to 'running' | 'success' | 'error'.
status: string;
name: string;
bot_id?: string;
bot_name?: string;
pipeline_id?: string;
pipeline_name?: string;
session_id?: string;
message_id?: string;
query_id?: string;
// Free-form key/value data; mapTrace substitutes {} when missing.
attributes?: Record<string, unknown>;
}
/**
 * Span record in the snake_case shape received from the monitoring API.
 * `mapSpan` converts it into a `MonitoringSpan`.
 */
interface RawSpanData {
span_id: string;
trace_id: string;
// Id of the parent span; spanDepth follows this link to compute nesting.
parent_span_id?: string;
name: string;
kind: string;
// Plain string on the wire; mapSpan casts it to 'running' | 'success' | 'error'.
status: string;
// Timestamp string; parsed with parseUTCTimestamp in mapSpan.
started_at: string;
// May be absent; mapSpan then leaves endedAt undefined.
ended_at?: string;
// Rendered by the page with an "ms" suffix.
duration?: number;
message_id?: string;
session_id?: string;
bot_id?: string;
pipeline_id?: string;
// Free-form key/value data; mapSpan substitutes {} when missing.
attributes?: Record<string, unknown>;
// Shown in a red box under the span row when present.
error_message?: string;
}
/**
 * Convert a snake_case trace record into the camelCase shape used by the page.
 *
 * Timestamps are parsed with `parseUTCTimestamp`; a missing `ended_at` yields
 * `endedAt: undefined` and a missing `attributes` yields `{}`.
 */
function mapTrace(raw: RawTraceData) {
  const {
    trace_id: traceId,
    name,
    started_at,
    ended_at,
    duration,
    status,
    bot_id: botId,
    bot_name: botName,
    pipeline_id: pipelineId,
    pipeline_name: pipelineName,
    session_id: sessionId,
    message_id: messageId,
    query_id: queryId,
    attributes,
  } = raw;

  return {
    traceId,
    name,
    startedAt: parseUTCTimestamp(started_at),
    endedAt: ended_at ? parseUTCTimestamp(ended_at) : undefined,
    duration,
    status: status as 'running' | 'success' | 'error',
    botId,
    botName,
    pipelineId,
    pipelineName,
    sessionId,
    messageId,
    queryId,
    attributes: attributes || {},
  };
}
/**
 * Convert a snake_case span record into a `MonitoringSpan`.
 *
 * Timestamps are parsed with `parseUTCTimestamp`; a missing `ended_at` yields
 * `endedAt: undefined` and a missing `attributes` yields `{}`.
 */
function mapSpan(raw: RawSpanData): MonitoringSpan {
  const startedAt = parseUTCTimestamp(raw.started_at);
  const endedAt = raw.ended_at ? parseUTCTimestamp(raw.ended_at) : undefined;
  const status = raw.status as 'running' | 'success' | 'error';
  const attributes = raw.attributes || {};

  return {
    spanId: raw.span_id,
    traceId: raw.trace_id,
    parentSpanId: raw.parent_span_id,
    name: raw.name,
    kind: raw.kind,
    status,
    startedAt,
    endedAt,
    duration: raw.duration,
    messageId: raw.message_id,
    sessionId: raw.session_id,
    botId: raw.bot_id,
    pipelineId: raw.pipeline_id,
    attributes,
    errorMessage: raw.error_message,
  };
}
/**
 * Count how many ancestors of `span` can be reached through `parentSpanId`.
 *
 * The walk stops at the first parent id that is missing from `spansById`, or
 * once `maxDepth` ancestors have been counted. The cap also guarantees
 * termination if the parent links form a cycle.
 *
 * @param span - Span whose nesting level is wanted.
 * @param spansById - Lookup of the trace's spans keyed by `spanId`.
 * @param maxDepth - Upper bound on the returned depth (defaults to 8).
 * @returns Number of ancestors found, between 0 and `maxDepth`.
 */
function spanDepth(
  span: MonitoringSpan,
  spansById: Map<string, MonitoringSpan>,
  maxDepth = 8,
) {
  let depth = 0;
  let parentId = span.parentSpanId;
  while (parentId && depth < maxDepth) {
    const parent = spansById.get(parentId);
    if (!parent) {
      // Parent is not part of the loaded spans: treat the chain as ended.
      break;
    }
    depth += 1;
    parentId = parent.parentSpanId;
  }
  return depth;
}
function MonitoringPageContent() { function MonitoringPageContent() {
const { t } = useTranslation(); const { t } = useTranslation();
const { filterState, setSelectedBots, setSelectedPipelines, setTimeRange } = const { filterState, setSelectedBots, setSelectedPipelines, setTimeRange } =
@@ -158,6 +256,13 @@ function MonitoringPageContent() {
// State for expanded errors // State for expanded errors
const [expandedErrorId, setExpandedErrorId] = useState<string | null>(null); const [expandedErrorId, setExpandedErrorId] = useState<string | null>(null);
const [expandedTraceId, setExpandedTraceId] = useState<string | null>(null);
const [traceDetails, setTraceDetails] = useState<
Record<string, TraceDetails>
>({});
const [loadingTraceDetails, setLoadingTraceDetails] = useState<
Record<string, boolean>
>({});
// State for controlled tabs // State for controlled tabs
const [activeTab, setActiveTab] = useState<string>('messages'); const [activeTab, setActiveTab] = useState<string>('messages');
@@ -265,6 +370,34 @@ function MonitoringPageContent() {
} }
}; };
/**
 * Expand or collapse a trace row, loading its span details on first expand.
 *
 * Clicking the currently expanded trace collapses it. Otherwise the trace is
 * expanded and, unless its details are already cached or a request for them
 * is still pending, the details are fetched and stored in `traceDetails`.
 * A failed fetch is logged and leaves nothing cached, so expanding the row
 * again retries the request.
 */
const toggleTraceExpand = async (traceId: string) => {
  if (expandedTraceId === traceId) {
    setExpandedTraceId(null);
    return;
  }

  setExpandedTraceId(traceId);

  // Skip the request when details are cached, or when an earlier expand is
  // still loading them (collapse + re-expand before the response arrives).
  if (traceDetails[traceId] || loadingTraceDetails[traceId]) return;

  setLoadingTraceDetails((prev) => ({ ...prev, [traceId]: true }));
  try {
    const result = await backendClient.getMonitoringTraceDetails(traceId);
    setTraceDetails((prev) => ({
      ...prev,
      [traceId]: {
        traceId: result.trace_id,
        found: result.found,
        trace: result.trace ? mapTrace(result.trace) : undefined,
        spans: (result.spans || []).map(mapSpan),
      },
    }));
  } catch (error) {
    console.error('Failed to fetch trace details:', error);
  } finally {
    setLoadingTraceDetails((prev) => ({ ...prev, [traceId]: false }));
  }
};
return ( return (
<div className="w-full h-full overflow-y-auto overflow-x-hidden"> <div className="w-full h-full overflow-y-auto overflow-x-hidden">
{/* Filters and Refresh Button - Sticky */} {/* Filters and Refresh Button - Sticky */}
@@ -323,6 +456,9 @@ function MonitoringPageContent() {
<TabsTrigger value="tokens" className="px-6 py-2"> <TabsTrigger value="tokens" className="px-6 py-2">
{t('monitoring.tabs.tokens')} {t('monitoring.tabs.tokens')}
</TabsTrigger> </TabsTrigger>
<TabsTrigger value="traces" className="px-6 py-2">
{t('monitoring.tabs.traces')}
</TabsTrigger>
<TabsTrigger value="feedback" className="px-6 py-2"> <TabsTrigger value="feedback" className="px-6 py-2">
{t('monitoring.tabs.feedback')} {t('monitoring.tabs.feedback')}
</TabsTrigger> </TabsTrigger>
@@ -690,6 +826,166 @@ function MonitoringPageContent() {
/> />
</TabsContent> </TabsContent>
<TabsContent value="traces" className="p-6 m-0">
<div>
{loading && (
<div className="py-12 flex justify-center">
<LoadingSpinner text={t('common.loading')} />
</div>
)}
{!loading && data && data.traces && data.traces.length > 0 && (
<div className="space-y-4">
{data.traces.map((trace) => {
const details = traceDetails[trace.traceId];
const spans = details?.spans || [];
const spansById = new Map(
spans.map((span) => [span.spanId, span]),
);
const maxDuration = Math.max(
1,
...spans.map((span) => span.duration || 0),
);
return (
<div
key={trace.traceId}
className="border rounded-xl overflow-hidden transition-all duration-200"
>
<div
className="p-5 cursor-pointer hover:bg-accent transition-colors"
onClick={() => toggleTraceExpand(trace.traceId)}
>
<div className="flex items-start justify-between gap-4">
<div className="flex items-start flex-1 min-w-0">
<div className="mr-3 mt-0.5">
{expandedTraceId === trace.traceId ? (
<ChevronDown className="w-5 h-5 text-muted-foreground" />
) : (
<ChevronRight className="w-5 h-5 text-muted-foreground" />
)}
</div>
<div className="min-w-0 flex-1">
<div className="flex flex-wrap items-center gap-2 mb-2">
<span className="text-xs text-muted-foreground font-mono">
{trace.traceId}
</span>
<span
className={`text-xs px-2 py-1 rounded ${
trace.status === 'error'
? 'bg-red-100 text-red-800 dark:bg-red-900 dark:text-red-200'
: trace.status === 'running'
? 'bg-yellow-100 text-yellow-800 dark:bg-yellow-900 dark:text-yellow-200'
: 'bg-green-100 text-green-800 dark:bg-green-900 dark:text-green-200'
}`}
>
{trace.status}
</span>
</div>
<div className="font-medium text-sm text-foreground mb-1">
{trace.name}
</div>
<div className="text-xs text-muted-foreground truncate">
{trace.botName || '-'} {' '}
{trace.pipelineName || '-'}
{trace.sessionId
? ` · ${trace.sessionId}`
: ''}
</div>
</div>
</div>
<div className="flex flex-col items-end gap-1 text-xs text-muted-foreground whitespace-nowrap">
<span>{trace.startedAt.toLocaleString()}</span>
<span>{trace.duration ?? 0}ms</span>
</div>
</div>
</div>
{expandedTraceId === trace.traceId && (
<div className="border-t p-5 bg-muted">
{loadingTraceDetails[trace.traceId] && (
<div className="py-4 flex justify-center">
<LoadingSpinner size="sm" text="" />
</div>
)}
{!loadingTraceDetails[trace.traceId] && (
<div className="space-y-3">
{spans.length === 0 && (
<div className="text-sm text-muted-foreground">
{t('monitoring.traces.noSpans')}
</div>
)}
{spans.map((span) => {
const depth = spanDepth(span, spansById);
const width = Math.max(
6,
Math.min(
100,
((span.duration || 0) / maxDuration) *
100,
),
);
return (
<div
key={span.spanId}
className="grid grid-cols-[minmax(180px,1fr)_minmax(140px,2fr)_80px] gap-3 items-center text-xs"
>
<div
className="min-w-0"
style={{
paddingLeft: `${depth * 16}px`,
}}
>
<div className="font-medium text-foreground truncate">
{span.name}
</div>
<div className="text-muted-foreground truncate">
{span.kind}
</div>
</div>
<div className="h-7 bg-background rounded border overflow-hidden">
<div
className={`h-full ${
span.status === 'error'
? 'bg-red-500/70'
: 'bg-blue-500/70'
}`}
style={{ width: `${width}%` }}
/>
</div>
<div className="text-right text-muted-foreground">
{span.duration ?? 0}ms
</div>
{span.errorMessage && (
<div className="col-span-3 text-red-600 dark:text-red-400 bg-background rounded p-2">
{span.errorMessage}
</div>
)}
</div>
);
})}
</div>
)}
</div>
)}
</div>
);
})}
</div>
)}
{!loading &&
(!data || !data.traces || data.traces.length === 0) && (
<div className="flex flex-col items-center justify-center text-muted-foreground py-16 gap-2">
<GitBranch className="h-[3rem] w-[3rem]" />
<div className="text-sm">
{t('monitoring.traces.noTraces')}
</div>
</div>
)}
</div>
</TabsContent>
<TabsContent value="feedback" className="p-6 m-0"> <TabsContent value="feedback" className="p-6 m-0">
<div> <div>
{loading && ( {loading && (
@@ -111,6 +111,48 @@ export interface ErrorLog {
messageId?: string; messageId?: string;
} }
/**
 * One end-to-end trace in the camelCase shape used by the monitoring UI.
 */
export interface MonitoringTrace {
traceId: string;
name: string;
startedAt: Date;
// Absent when the source record carries no end timestamp.
endedAt?: Date;
// Rendered by the traces tab with an "ms" suffix.
duration?: number;
status: 'running' | 'success' | 'error';
botId?: string;
botName?: string;
pipelineId?: string;
pipelineName?: string;
sessionId?: string;
messageId?: string;
queryId?: string;
// Free-form key/value data attached to the trace.
attributes: Record<string, unknown>;
}
/**
 * One span of a trace in the camelCase shape used by the monitoring UI.
 */
export interface MonitoringSpan {
spanId: string;
traceId: string;
// Id of the parent span, if any; used to indent nested spans in the traces tab.
parentSpanId?: string;
name: string;
kind: string;
status: 'success' | 'error' | 'running';
startedAt: Date;
// Absent when the source record carries no end timestamp.
endedAt?: Date;
// Rendered by the traces tab with an "ms" suffix.
duration?: number;
messageId?: string;
sessionId?: string;
botId?: string;
pipelineId?: string;
// Free-form key/value data attached to the span.
attributes: Record<string, unknown>;
// Shown under the span row when present.
errorMessage?: string;
}
/**
 * Detail payload for a single trace: the trace record plus its spans.
 */
export interface TraceDetails {
traceId: string;
// Copied from the `found` flag of the trace-details API response.
found: boolean;
// Present only when the API response included a trace record.
trace?: MonitoringTrace;
spans: MonitoringSpan[];
}
export interface MessageDetails { export interface MessageDetails {
messageId: string; messageId: string;
found: boolean; found: boolean;
@@ -125,6 +167,7 @@ export interface MessageDetails {
averageDurationMs: number; averageDurationMs: number;
}; };
errors: ErrorLog[]; errors: ErrorLog[];
trace?: MonitoringTrace;
} }
export interface OverviewMetrics { export interface OverviewMetrics {
@@ -203,6 +246,7 @@ export interface MonitoringData {
modelCalls: ModelCall[]; modelCalls: ModelCall[];
sessions: SessionInfo[]; sessions: SessionInfo[];
errors: ErrorLog[]; errors: ErrorLog[];
traces: MonitoringTrace[];
feedback?: FeedbackRecord[]; feedback?: FeedbackRecord[];
feedbackStats?: FeedbackStats; feedbackStats?: FeedbackStats;
totalCount: { totalCount: {
@@ -211,6 +255,7 @@ export interface MonitoringData {
embeddingCalls: number; embeddingCalls: number;
sessions: number; sessions: number;
errors: number; errors: number;
traces: number;
feedback?: number; feedback?: number;
}; };
} }
+101
View File
@@ -1185,12 +1185,29 @@ export class BackendClient extends BaseHttpClient {
stack_trace?: string; stack_trace?: string;
message_id?: string; message_id?: string;
}>; }>;
traces?: Array<{
trace_id: string;
started_at: string;
ended_at?: string;
duration?: number;
status: string;
name: string;
bot_id?: string;
bot_name?: string;
pipeline_id?: string;
pipeline_name?: string;
session_id?: string;
message_id?: string;
query_id?: string;
attributes?: Record<string, unknown>;
}>;
totalCount: { totalCount: {
messages: number; messages: number;
llmCalls: number; llmCalls: number;
embeddingCalls: number; embeddingCalls: number;
sessions: number; sessions: number;
errors: number; errors: number;
traces?: number;
}; };
}> { }> {
const queryParams = new URLSearchParams(); const queryParams = new URLSearchParams();
@@ -1213,6 +1230,90 @@ export class BackendClient extends BaseHttpClient {
return this.get(`/api/v1/monitoring/data?${queryParams.toString()}`); return this.get(`/api/v1/monitoring/data?${queryParams.toString()}`);
} }
/**
 * Fetch trace records from `/api/v1/monitoring/traces`.
 *
 * Every filter is optional and is only added to the query string when it is
 * set. Array filters are sent as repeated query parameters.
 *
 * @param params.botId - Restrict results to these bot ids.
 * @param params.pipelineId - Restrict results to these pipeline ids.
 * @param params.sessionId - Restrict results to these session ids.
 * @param params.startTime - Start of the time window.
 * @param params.endTime - End of the time window.
 * @param params.limit - Maximum number of traces; omitted when unset or 0.
 * @returns The matching traces (snake_case records) and their total count.
 */
public getMonitoringTraces(params: {
  botId?: string[];
  pipelineId?: string[];
  sessionId?: string[];
  startTime?: string;
  endTime?: string;
  limit?: number;
}): Promise<{
  traces: Array<{
    trace_id: string;
    started_at: string;
    ended_at?: string;
    duration?: number;
    status: string;
    name: string;
    bot_id?: string;
    bot_name?: string;
    pipeline_id?: string;
    pipeline_name?: string;
    session_id?: string;
    message_id?: string;
    query_id?: string;
    attributes?: Record<string, unknown>;
  }>;
  total: number;
}> {
  const queryParams = new URLSearchParams();
  if (params.botId) {
    params.botId.forEach((id) => queryParams.append('botId', id));
  }
  if (params.pipelineId) {
    params.pipelineId.forEach((id) => queryParams.append('pipelineId', id));
  }
  if (params.sessionId) {
    params.sessionId.forEach((id) => queryParams.append('sessionId', id));
  }
  if (params.startTime) {
    queryParams.append('startTime', params.startTime);
  }
  if (params.endTime) {
    queryParams.append('endTime', params.endTime);
  }
  if (params.limit) {
    queryParams.append('limit', params.limit.toString());
  }
  return this.get(`/api/v1/monitoring/traces?${queryParams.toString()}`);
}
/**
 * Fetch one trace and its spans from `/api/v1/monitoring/traces/{traceId}`.
 *
 * @param traceId - Id of the trace to load. It is percent-encoded before
 *   being placed in the URL path, so characters such as `/`, `?` or `#`
 *   cannot alter the request target.
 * @returns The trace id, a `found` flag, the trace record and its spans, all
 *   in the snake_case shape used by the API.
 */
public getMonitoringTraceDetails(traceId: string): Promise<{
  trace_id: string;
  found: boolean;
  trace: {
    trace_id: string;
    started_at: string;
    ended_at?: string;
    duration?: number;
    status: string;
    name: string;
    bot_id?: string;
    bot_name?: string;
    pipeline_id?: string;
    pipeline_name?: string;
    session_id?: string;
    message_id?: string;
    query_id?: string;
    attributes?: Record<string, unknown>;
  };
  spans: Array<{
    span_id: string;
    trace_id: string;
    parent_span_id?: string;
    name: string;
    kind: string;
    status: string;
    started_at: string;
    ended_at?: string;
    duration?: number;
    message_id?: string;
    session_id?: string;
    bot_id?: string;
    pipeline_id?: string;
    attributes?: Record<string, unknown>;
    error_message?: string;
  }>;
}> {
  return this.get(
    `/api/v1/monitoring/traces/${encodeURIComponent(traceId)}`,
  );
}
public getMonitoringOverview(params: { public getMonitoringOverview(params: {
botId?: string[]; botId?: string[];
pipelineId?: string[]; pipelineId?: string[];
+6
View File
@@ -1217,6 +1217,7 @@ const enUS = {
embeddingCalls: 'Embedding Calls', embeddingCalls: 'Embedding Calls',
modelCalls: 'Model Calls', modelCalls: 'Model Calls',
tokens: 'Token Monitoring', tokens: 'Token Monitoring',
traces: 'Traces',
feedback: 'User Feedback', feedback: 'User Feedback',
sessions: 'Session Analysis', sessions: 'Session Analysis',
errors: 'Error Logs', errors: 'Error Logs',
@@ -1321,6 +1322,11 @@ const enUS = {
noErrors: 'No errors found', noErrors: 'No errors found',
stackTrace: 'Stack Trace', stackTrace: 'Stack Trace',
}, },
traces: {
title: 'Traces',
noTraces: 'No traces found',
noSpans: 'No spans recorded for this trace',
},
feedback: { feedback: {
title: 'User Feedback', title: 'User Feedback',
totalFeedback: 'Total Feedback', totalFeedback: 'Total Feedback',
+6
View File
@@ -1158,6 +1158,7 @@ const zhHans = {
embeddingCalls: 'Embedding调用', embeddingCalls: 'Embedding调用',
modelCalls: '模型调用', modelCalls: '模型调用',
tokens: 'Token 监控', tokens: 'Token 监控',
traces: '链路追踪',
feedback: '用户反馈', feedback: '用户反馈',
sessions: '会话分析', sessions: '会话分析',
errors: '错误日志', errors: '错误日志',
@@ -1262,6 +1263,11 @@ const zhHans = {
noErrors: '未找到错误', noErrors: '未找到错误',
stackTrace: '堆栈追踪', stackTrace: '堆栈追踪',
}, },
traces: {
title: '链路追踪',
noTraces: '未找到链路记录',
noSpans: '此链路暂无 Span 记录',
},
feedback: { feedback: {
title: '用户反馈', title: '用户反馈',
totalFeedback: '总反馈数', totalFeedback: '总反馈数',