diff --git a/src/langbot/pkg/api/http/controller/groups/monitoring.py b/src/langbot/pkg/api/http/controller/groups/monitoring.py index 65640b6e..43c15fdc 100644 --- a/src/langbot/pkg/api/http/controller/groups/monitoring.py +++ b/src/langbot/pkg/api/http/controller/groups/monitoring.py @@ -313,18 +313,30 @@ class MonitoringRouterGroup(group.RouterGroup): offset=0, ) + # Get traces + traces, traces_total = await self.ap.monitoring_service.get_traces( + bot_ids=bot_ids if bot_ids else None, + pipeline_ids=pipeline_ids if pipeline_ids else None, + start_time=start_time, + end_time=end_time, + limit=limit, + offset=0, + ) + return self.success( data={ 'overview': overview, 'messages': messages, 'llmCalls': llm_calls, 'embeddingCalls': embedding_calls, + 'traces': traces, 'sessions': sessions, 'errors': errors, 'totalCount': { 'messages': messages_total, 'llmCalls': llm_calls_total, 'embeddingCalls': embedding_calls_total, + 'traces': traces_total, 'sessions': sessions_total, 'errors': errors_total, }, @@ -350,6 +362,49 @@ class MonitoringRouterGroup(group.RouterGroup): return self.success(data=details) + @self.route('/traces', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) + async def get_traces() -> str: + """Get end-to-end trace records.""" + bot_ids = quart.request.args.getlist('botId') + pipeline_ids = quart.request.args.getlist('pipelineId') + session_ids = quart.request.args.getlist('sessionId') + statuses = quart.request.args.getlist('status') + start_time_str = quart.request.args.get('startTime') + end_time_str = quart.request.args.get('endTime') + limit = int(quart.request.args.get('limit', 100)) + offset = int(quart.request.args.get('offset', 0)) + + start_time = parse_iso_datetime(start_time_str) + end_time = parse_iso_datetime(end_time_str) + + traces, total = await self.ap.monitoring_service.get_traces( + bot_ids=bot_ids if bot_ids else None, + pipeline_ids=pipeline_ids if pipeline_ids else None, + session_ids=session_ids if session_ids else None, + statuses=statuses if statuses else None, + start_time=start_time, + end_time=end_time, + limit=limit, + offset=offset, + ) + + return self.success( + data={ + 'traces': traces, + 'total': total, + 'limit': limit, + 'offset': offset, + } + ) + + @self.route('/traces/', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) + async def get_trace_details(trace_id: str) -> str: + """Get one trace with all spans.""" + details = await self.ap.monitoring_service.get_trace_details(trace_id) + if not details.get('found'): + return self.http_status(404, -1, f'Trace {trace_id} not found') + return self.success(data=details) + @self.route('/export', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) async def export_data() -> tuple[str, int]: """Export monitoring data as CSV""" diff --git a/src/langbot/pkg/api/http/controller/groups/plugins.py b/src/langbot/pkg/api/http/controller/groups/plugins.py index c291c123..7b27acc0 100644 --- a/src/langbot/pkg/api/http/controller/groups/plugins.py +++ b/src/langbot/pkg/api/http/controller/groups/plugins.py @@ -350,8 +350,24 @@ class PluginsRouterGroup(group.RouterGroup): if not endpoint.startswith('/') or '..' in endpoint: return self.http_status(400, -1, 'invalid endpoint') + caller = { + 'plugin_author': author, + 'plugin_name': plugin_name, + 'page_id': page_id, + 'origin': _get_request_origin(), + } + headers = { + key: value + for key, value in { + 'user-agent': quart.request.headers.get('User-Agent'), + 'x-request-id': quart.request.headers.get('X-Request-ID'), + 'x-forwarded-for': quart.request.headers.get('X-Forwarded-For'), + }.items() + if value + } + result = await self.ap.plugin_connector.handle_page_api( - author, plugin_name, page_id, endpoint, method.upper(), body + author, plugin_name, page_id, endpoint, method.upper(), body, caller, headers ) if result.get('error'): return self.http_status(400, -1, result['error']) diff --git a/src/langbot/pkg/api/http/service/monitoring.py b/src/langbot/pkg/api/http/service/monitoring.py index 3e8e0cde..68cd2897 100644 --- a/src/langbot/pkg/api/http/service/monitoring.py +++ b/src/langbot/pkg/api/http/service/monitoring.py @@ -3,11 +3,53 @@ from __future__ import annotations import uuid import datetime import sqlalchemy +import json from ....core import app from ....entity.persistence import monitoring as persistence_monitoring +def _utc_now() -> datetime.datetime: + return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + + +def _json_dumps(value: dict | list | None) -> str | None: + if value is None: + return None + try: + return json.dumps(value, ensure_ascii=False, default=str) + except Exception: + return json.dumps({'serialization_error': str(value)}, ensure_ascii=False) + + +def _json_loads(value: str | None) -> dict | list | None: + if not value: + return None + try: + return json.loads(value) + except Exception: + return None + + +def new_trace_id() -> str: + return f'trace-{uuid.uuid4().hex[:16]}' + + +def new_span_id() -> str: + return f'span-{uuid.uuid4().hex[:16]}' + + +def normalize_trace_status(status: str | None) -> str: + """Normalize operation status to the monitoring UI vocabulary.""" + if status in ('completed', 'ok'): + return 'success' + if status in ('failed', 'failure', 'exception'): + return 'error' + if status in ('running', 'success', 'error'): + return status + return 'success' + + class MonitoringService: """Monitoring service""" @@ -74,6 +116,18 @@ class MonitoringService: persistence_monitoring.MonitoringFeedback.timestamp, persistence_monitoring.MonitoringFeedback.id, ), + ( + 'monitoring_traces', + persistence_monitoring.MonitoringTrace, + persistence_monitoring.MonitoringTrace.started_at, + persistence_monitoring.MonitoringTrace.trace_id, + ), + ( + 'monitoring_spans', + persistence_monitoring.MonitoringSpan, + persistence_monitoring.MonitoringSpan.started_at, + persistence_monitoring.MonitoringSpan.span_id, + ), ] deleted_counts: dict[str, int] = {} @@ -133,6 +187,116 @@ class MonitoringService: # ========== Recording Methods ========== + async def start_trace( + self, + trace_id: str | None = None, + name: str = 'LangBot query', + bot_id: str | None = None, + bot_name: str | None = None, + pipeline_id: str | None = None, + pipeline_name: str | None = None, + session_id: str | None = None, + message_id: str | None = None, + query_id: str | int | None = None, + attributes: dict | None = None, + ) -> str: + """Create or update a trace header row.""" + trace_id = trace_id or new_trace_id() + trace_data = { + 'trace_id': trace_id, + 'started_at': _utc_now(), + 'ended_at': None, + 'duration': None, + 'status': 'running', + 'name': name, + 'bot_id': bot_id, + 'bot_name': bot_name, + 'pipeline_id': pipeline_id, + 'pipeline_name': pipeline_name, + 'session_id': session_id, + 'message_id': message_id, + 'query_id': str(query_id) if query_id is not None else None, + 'attributes': _json_dumps(attributes), + } + + await self.ap.persistence_mgr.execute_async( + sqlalchemy.insert(persistence_monitoring.MonitoringTrace).values(trace_data) + ) + return trace_id + + async def finish_trace( + self, + trace_id: str, + status: str = 'success', + duration: int | None = None, + message_id: str | None = None, + attributes: dict | None = None, + ) -> None: + """Mark a trace complete.""" + update_values: dict = { + 'ended_at': _utc_now(), + 'status': normalize_trace_status(status), + } + if duration is not None: + update_values['duration'] = duration + if message_id is not None: + update_values['message_id'] = message_id + if attributes is not None: + update_values['attributes'] = _json_dumps(attributes) + + await self.ap.persistence_mgr.execute_async( + sqlalchemy.update(persistence_monitoring.MonitoringTrace) + .where(persistence_monitoring.MonitoringTrace.trace_id == trace_id) + .values(update_values) + ) + + async def record_span( + self, + trace_id: str, + name: str, + kind: str, + status: str = 'success', + span_id: str | None = None, + parent_span_id: str | None = None, + started_at: datetime.datetime | None = None, + ended_at: datetime.datetime | None = None, + duration: int | None = None, + message_id: str | None = None, + session_id: str | None = None, + bot_id: str | None = None, + pipeline_id: str | None = None, + attributes: dict | None = None, + error_message: str | None = None, + ) -> str: + """Record a single completed span.""" + started_at = started_at or _utc_now() + if duration is None and ended_at is not None: + duration = int((ended_at - started_at).total_seconds() * 1000) + elif duration is not None: + duration = int(round(float(duration))) + span_data = { + 'span_id': span_id or new_span_id(), + 'trace_id': trace_id, + 'parent_span_id': parent_span_id, + 'name': name, + 'kind': kind, + 'status': normalize_trace_status(status), + 'started_at': started_at, + 'ended_at': ended_at or _utc_now(), + 'duration': duration, + 'message_id': message_id, + 'session_id': session_id, + 'bot_id': bot_id, + 'pipeline_id': pipeline_id, + 'attributes': _json_dumps(attributes), + 'error_message': error_message, + } + + await self.ap.persistence_mgr.execute_async( + sqlalchemy.insert(persistence_monitoring.MonitoringSpan).values(span_data) + ) + return span_data['span_id'] + async def record_message( self, bot_id: str, @@ -1076,6 +1240,19 @@ class MonitoringService: for row in error_rows ] + trace_query = ( + sqlalchemy.select(persistence_monitoring.MonitoringTrace) + .where(persistence_monitoring.MonitoringTrace.message_id == message_id) + .order_by(persistence_monitoring.MonitoringTrace.started_at.desc()) + .limit(1) + ) + trace_result = await self.ap.persistence_mgr.execute_async(trace_query) + trace_row = trace_result.first() + trace = None + if trace_row: + trace_model = trace_row[0] if isinstance(trace_row, tuple) else trace_row + trace = self._serialize_trace(trace_model) + return { 'message_id': message_id, 'found': True, @@ -1090,6 +1267,90 @@ class MonitoringService: 'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0, }, 'errors': errors, + 'trace': trace, + } + + def _serialize_trace(self, trace: persistence_monitoring.MonitoringTrace) -> dict: + data = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringTrace, trace) + data['attributes'] = _json_loads(data.get('attributes')) or {} + return data + + def _serialize_span(self, span: persistence_monitoring.MonitoringSpan) -> dict: + data = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringSpan, span) + data['attributes'] = _json_loads(data.get('attributes')) or {} + return data + + async def get_traces( + self, + bot_ids: list[str] | None = None, + pipeline_ids: list[str] | None = None, + session_ids: list[str] | None = None, + statuses: list[str] | None = None, + start_time: datetime.datetime | None = None, + end_time: datetime.datetime | None = None, + limit: int = 100, + offset: int = 0, + ) -> tuple[list[dict], int]: + """Get trace headers with filters.""" + conditions = [] + if bot_ids: + conditions.append(persistence_monitoring.MonitoringTrace.bot_id.in_(bot_ids)) + if pipeline_ids: + conditions.append(persistence_monitoring.MonitoringTrace.pipeline_id.in_(pipeline_ids)) + if session_ids: + conditions.append(persistence_monitoring.MonitoringTrace.session_id.in_(session_ids)) + if statuses: + conditions.append(persistence_monitoring.MonitoringTrace.status.in_(statuses)) + if start_time: + conditions.append(persistence_monitoring.MonitoringTrace.started_at >= start_time) + if end_time: + conditions.append(persistence_monitoring.MonitoringTrace.started_at <= end_time) + + count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringTrace.trace_id)) + query = sqlalchemy.select(persistence_monitoring.MonitoringTrace) + if conditions: + clause = sqlalchemy.and_(*conditions) + count_query = count_query.where(clause) + query = query.where(clause) + + total_result = await self.ap.persistence_mgr.execute_async(count_query) + total = total_result.scalar() or 0 + + query = query.order_by(persistence_monitoring.MonitoringTrace.started_at.desc()).limit(limit).offset(offset) + result = await self.ap.persistence_mgr.execute_async(query) + traces = [ + self._serialize_trace(row[0] if isinstance(row, tuple) else row) + for row in result.all() + ] + return traces, total + + async def get_trace_details(self, trace_id: str) -> dict: + """Get a single trace and all spans in chronological order.""" + trace_query = sqlalchemy.select(persistence_monitoring.MonitoringTrace).where( + persistence_monitoring.MonitoringTrace.trace_id == trace_id + ) + trace_result = await self.ap.persistence_mgr.execute_async(trace_query) + trace_row = trace_result.first() + if not trace_row: + return {'trace_id': trace_id, 'found': False} + + trace = trace_row[0] if isinstance(trace_row, tuple) else trace_row + span_query = ( + sqlalchemy.select(persistence_monitoring.MonitoringSpan) + .where(persistence_monitoring.MonitoringSpan.trace_id == trace_id) + .order_by(persistence_monitoring.MonitoringSpan.started_at.asc()) + ) + span_result = await self.ap.persistence_mgr.execute_async(span_query) + spans = [ + self._serialize_span(row[0] if isinstance(row, tuple) else row) + for row in span_result.all() + ] + + return { + 'trace_id': trace_id, + 'found': True, + 'trace': self._serialize_trace(trace), + 'spans': spans, } # ========== Export Methods ========== diff --git a/src/langbot/pkg/entity/persistence/monitoring.py b/src/langbot/pkg/entity/persistence/monitoring.py index 01e4fdd3..13b3de82 100644 --- a/src/langbot/pkg/entity/persistence/monitoring.py +++ b/src/langbot/pkg/entity/persistence/monitoring.py @@ -3,6 +3,49 @@ import sqlalchemy from .base import Base +class MonitoringTrace(Base): + """End-to-end monitoring trace records""" + + __tablename__ = 'monitoring_traces' + + trace_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True) + started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True) + ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True, index=True) + duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) # milliseconds + status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True) # running, success, error + name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False) + bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + bot_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + query_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True) + + +class MonitoringSpan(Base): + """Trace span records for pipeline, RAG, model, plugin and tool operations""" + + __tablename__ = 'monitoring_spans' + + span_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True) + trace_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True) + parent_span_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False) + kind = sqlalchemy.Column(sqlalchemy.String(80), nullable=False, index=True) + status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True) + started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True) + ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True) + duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) # milliseconds + message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True) + error_message = sqlalchemy.Column(sqlalchemy.Text, nullable=True) + + class MonitoringMessage(Base): """Monitoring message records""" diff --git a/src/langbot/pkg/persistence/alembic/versions/0006_monitoring_traces.py b/src/langbot/pkg/persistence/alembic/versions/0006_monitoring_traces.py new file mode 100644 index 00000000..fb2dd6e9 --- /dev/null +++ b/src/langbot/pkg/persistence/alembic/versions/0006_monitoring_traces.py @@ -0,0 +1,88 @@ +"""add monitoring traces and spans + +Revision ID: 0006_monitoring_traces +Revises: 0005_add_llm_context_length +Create Date: 2026-06-16 +""" + +import sqlalchemy as sa +from alembic import op + +revision = '0006_monitoring_traces' +down_revision = '0005_add_llm_context_length' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + conn = op.get_bind() + inspector = sa.inspect(conn) + tables = set(inspector.get_table_names()) + + if 'monitoring_traces' not in tables: + op.create_table( + 'monitoring_traces', + sa.Column('trace_id', sa.String(length=255), nullable=False), + sa.Column('started_at', sa.DateTime(), nullable=False), + sa.Column('ended_at', sa.DateTime(), nullable=True), + sa.Column('duration', sa.Integer(), nullable=True), + sa.Column('status', sa.String(length=50), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('bot_id', sa.String(length=255), nullable=True), + sa.Column('bot_name', sa.String(length=255), nullable=True), + sa.Column('pipeline_id', sa.String(length=255), nullable=True), + sa.Column('pipeline_name', sa.String(length=255), nullable=True), + sa.Column('session_id', sa.String(length=255), nullable=True), + sa.Column('message_id', sa.String(length=255), nullable=True), + sa.Column('query_id', sa.String(length=255), nullable=True), + sa.Column('attributes', sa.Text(), nullable=True), + sa.PrimaryKeyConstraint('trace_id'), + ) + op.create_index('ix_monitoring_traces_started_at', 'monitoring_traces', ['started_at']) + op.create_index('ix_monitoring_traces_ended_at', 'monitoring_traces', ['ended_at']) + op.create_index('ix_monitoring_traces_status', 'monitoring_traces', ['status']) + op.create_index('ix_monitoring_traces_bot_id', 'monitoring_traces', ['bot_id']) + op.create_index('ix_monitoring_traces_pipeline_id', 'monitoring_traces', ['pipeline_id']) + op.create_index('ix_monitoring_traces_session_id', 'monitoring_traces', ['session_id']) + op.create_index('ix_monitoring_traces_message_id', 'monitoring_traces', ['message_id']) + op.create_index('ix_monitoring_traces_query_id', 'monitoring_traces', ['query_id']) + + if 'monitoring_spans' not in tables: + op.create_table( + 'monitoring_spans', + sa.Column('span_id', sa.String(length=255), nullable=False), + sa.Column('trace_id', sa.String(length=255), nullable=False), + sa.Column('parent_span_id', sa.String(length=255), nullable=True), + sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('kind', sa.String(length=80), nullable=False), + sa.Column('status', sa.String(length=50), nullable=False), + sa.Column('started_at', sa.DateTime(), nullable=False), + sa.Column('ended_at', sa.DateTime(), nullable=True), + sa.Column('duration', sa.Integer(), nullable=True), + sa.Column('message_id', sa.String(length=255), nullable=True), + sa.Column('session_id', sa.String(length=255), nullable=True), + sa.Column('bot_id', sa.String(length=255), nullable=True), + sa.Column('pipeline_id', sa.String(length=255), nullable=True), + sa.Column('attributes', sa.Text(), nullable=True), + sa.Column('error_message', sa.Text(), nullable=True), + sa.PrimaryKeyConstraint('span_id'), + ) + op.create_index('ix_monitoring_spans_trace_id', 'monitoring_spans', ['trace_id']) + op.create_index('ix_monitoring_spans_parent_span_id', 'monitoring_spans', ['parent_span_id']) + op.create_index('ix_monitoring_spans_kind', 'monitoring_spans', ['kind']) + op.create_index('ix_monitoring_spans_status', 'monitoring_spans', ['status']) + op.create_index('ix_monitoring_spans_started_at', 'monitoring_spans', ['started_at']) + op.create_index('ix_monitoring_spans_message_id', 'monitoring_spans', ['message_id']) + op.create_index('ix_monitoring_spans_session_id', 'monitoring_spans', ['session_id']) + op.create_index('ix_monitoring_spans_bot_id', 'monitoring_spans', ['bot_id']) + op.create_index('ix_monitoring_spans_pipeline_id', 'monitoring_spans', ['pipeline_id']) + + +def downgrade() -> None: + conn = op.get_bind() + inspector = sa.inspect(conn) + tables = set(inspector.get_table_names()) + if 'monitoring_spans' in tables: + op.drop_table('monitoring_spans') + if 'monitoring_traces' in tables: + op.drop_table('monitoring_traces') diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py index 1426fe3d..d08e411d 100644 --- a/src/langbot/pkg/pipeline/pipelinemgr.py +++ b/src/langbot/pkg/pipeline/pipelinemgr.py @@ -2,6 +2,9 @@ from __future__ import annotations import typing import traceback +import time +import uuid +import datetime import sqlalchemy @@ -79,6 +82,19 @@ class RuntimePipeline: enable_all_plugins: bool """是否启用所有插件""" + @staticmethod + def _new_span_id() -> str: + return f'span-{uuid.uuid4().hex[:16]}' + + @staticmethod + def _utc_now() -> datetime.datetime: + return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + + @staticmethod + def _query_session_id(query: pipeline_query.Query) -> str: + launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type) + return f'{launcher_type}_{query.launcher_id}' + enable_all_mcp_servers: bool """是否启用所有MCP服务器""" @@ -234,44 +250,92 @@ class RuntimePipeline: stage_container = self.stage_containers[i] query.current_stage_name = stage_container.inst_name # 标记到 Query 对象里 + span_started_at = self._utc_now() + span_started = time.perf_counter() + span_status = 'success' + span_error = None + span_result_type = None - result = stage_container.inst.process(query, stage_container.inst_name) + try: + result = stage_container.inst.process(query, stage_container.inst_name) - if isinstance(result, typing.Coroutine): - result = await result + if isinstance(result, typing.Coroutine): + result = await result - if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果 - self.ap.logger.debug( - f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}' - ) - await self._check_output(query, result) - - if result.result_type == pipeline_entities.ResultType.INTERRUPT: - self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}') - break - elif result.result_type == pipeline_entities.ResultType.CONTINUE: - query = result.new_query - elif isinstance(result, typing.AsyncGenerator): # 生成器 - self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen') - - async for sub_result in result: + if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果 + span_result_type = str(result.result_type.value if hasattr(result.result_type, 'value') else result.result_type) self.ap.logger.debug( - f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}' + f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}' ) - await self._check_output(query, sub_result) + await self._check_output(query, result) - if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT: + if result.result_type == pipeline_entities.ResultType.INTERRUPT: self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}') break - elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE: - query = sub_result.new_query - await self._execute_from_stage(i + 1, query) - break + elif result.result_type == pipeline_entities.ResultType.CONTINUE: + query = result.new_query + elif isinstance(result, typing.AsyncGenerator): # 生成器 + span_result_type = 'generator' + self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen') + + async for sub_result in result: + span_result_type = str( + sub_result.result_type.value + if hasattr(sub_result.result_type, 'value') + else sub_result.result_type + ) + self.ap.logger.debug( + f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}' + ) + await self._check_output(query, sub_result) + + if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT: + self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}') + break + elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE: + query = sub_result.new_query + await self._execute_from_stage(i + 1, query) + break + except Exception as e: + span_status = 'error' + span_error = str(e) + raise + finally: + trace_id = (query.variables or {}).get('_monitoring_trace_id') + root_span_id = (query.variables or {}).get('_monitoring_root_span_id') + if trace_id: + try: + await self.ap.monitoring_service.record_span( + trace_id=trace_id, + parent_span_id=root_span_id, + name=stage_container.inst_name, + kind='pipeline.stage', + status=span_status, + started_at=span_started_at, + duration=int((time.perf_counter() - span_started) * 1000), + message_id=(query.variables or {}).get('_monitoring_message_id'), + session_id=self._query_session_id(query), + bot_id=query.bot_uuid, + pipeline_id=self.pipeline_entity.uuid, + attributes={ + 'stage_class': stage_container.inst.__class__.__name__, + 'result_type': span_result_type, + 'query_id': query.query_id, + }, + error_message=span_error, + ) + except Exception as monitor_err: + self.ap.logger.error(f'Failed to record stage span: {monitor_err}') i += 1 async def process_query(self, query: pipeline_query.Query): """处理请求""" + trace_started_at = self._utc_now() + trace_started = time.perf_counter() + root_span_id = self._new_span_id() + trace_id = None + trace_status = 'success' # Get monitoring metadata bot_name = query.variables.get('_monitoring_bot_name', 'Unknown') pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown') @@ -303,6 +367,28 @@ class RuntimePipeline: except Exception as e: self.ap.logger.error(f'Failed to record query start: {e}') + try: + trace_id = await self.ap.monitoring_service.start_trace( + name='LangBot query', + bot_id=query.bot_uuid or 'unknown', + bot_name=bot_name, + pipeline_id=self.pipeline_entity.uuid, + pipeline_name=pipeline_name, + session_id=self._query_session_id(query), + message_id=message_id or None, + query_id=query.query_id, + attributes={ + 'launcher_type': query.launcher_type.value + if hasattr(query.launcher_type, 'value') + else str(query.launcher_type), + 'runner_name': runner_name, + }, + ) + query.variables['_monitoring_trace_id'] = trace_id + query.variables['_monitoring_root_span_id'] = root_span_id + except Exception as e: + self.ap.logger.error(f'Failed to start query trace: {e}') + try: # Get bound plugins for this pipeline bound_plugins = query.variables.get('_pipeline_bound_plugins', None) @@ -361,6 +447,7 @@ class RuntimePipeline: self.ap.logger.error(f'Failed to record query response: {e}') except Exception as e: + trace_status = 'error' inst_name = query.current_stage_name if query.current_stage_name else 'unknown' self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}') self.ap.logger.error(f'Traceback: {traceback.format_exc()}') @@ -383,6 +470,35 @@ class RuntimePipeline: self.ap.logger.error(f'Failed to record query error: {me}') finally: + if trace_id: + try: + duration_ms = int((time.perf_counter() - trace_started) * 1000) + await self.ap.monitoring_service.record_span( + trace_id=trace_id, + span_id=root_span_id, + name='LangBot query', + kind='pipeline.query', + status=trace_status, + started_at=trace_started_at, + duration=duration_ms, + message_id=message_id or None, + session_id=self._query_session_id(query), + bot_id=query.bot_uuid, + pipeline_id=self.pipeline_entity.uuid, + attributes={ + 'query_id': query.query_id, + 'pipeline_name': pipeline_name, + 'runner_name': runner_name, + }, + ) + await self.ap.monitoring_service.finish_trace( + trace_id=trace_id, + status=trace_status, + duration=duration_ms, + message_id=message_id or None, + ) + except Exception as monitor_err: + self.ap.logger.error(f'Failed to finish query trace: {monitor_err}') self.ap.logger.debug(f'Query {query.query_id} processed') del self.ap.query_pool.cached_queries[query.query_id] diff --git a/src/langbot/pkg/plugin/connector.py b/src/langbot/pkg/plugin/connector.py index 12413e49..79e79c20 100644 --- a/src/langbot/pkg/plugin/connector.py +++ b/src/langbot/pkg/plugin/connector.py @@ -711,8 +711,19 @@ class PluginRuntimeConnector(ManagedRuntimeConnector): endpoint: str, method: str, body: Any = None, + caller: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, ) -> dict[str, Any]: - return await self.handler.handle_page_api(plugin_author, plugin_name, page_id, endpoint, method, body) + return await self.handler.handle_page_api( + plugin_author, + plugin_name, + page_id, + endpoint, + method, + body, + caller, + headers or {}, + ) async def get_debug_info(self) -> dict[str, Any]: """Get debug information including debug key and WS URL""" diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 3bd85ae6..66a343bd 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -755,6 +755,19 @@ class RuntimeConnectionHandler(handler.Handler): 'session_name': session_name, 'bot_uuid': query.bot_uuid or '', 'sender_id': str(query.sender_id), + '_trace_context': { + 'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None, + 'parent_span_id': query.variables.get('_monitoring_root_span_id') if query.variables else None, + 'message_id': query.variables.get('_monitoring_message_id') if query.variables else None, + 'query_id': query.query_id, + 'session_id': session_name, + 'bot_id': query.bot_uuid or '', + 'pipeline_id': query.pipeline_uuid or '', + 'knowledge_base_id': kb_id, + 'attributes': { + 'source': 'plugin-api', + }, + }, }, ) results = [entry.model_dump(mode='json') for entry in entries] @@ -1011,6 +1024,8 @@ class RuntimeConnectionHandler(handler.Handler): endpoint: str, method: str, body: Any = None, + caller: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, ) -> dict[str, Any]: """Forward a page API call to the plugin via runtime.""" result = await self.call_action( @@ -1022,6 +1037,8 @@ class RuntimeConnectionHandler(handler.Handler): 'endpoint': endpoint, 'method': method, 'body': body, + 'caller': caller, + 'headers': headers or {}, }, timeout=30, ) diff --git a/src/langbot/pkg/provider/modelmgr/requester.py b/src/langbot/pkg/provider/modelmgr/requester.py index 377f7d4a..9ea81b38 100644 --- a/src/langbot/pkg/provider/modelmgr/requester.py +++ b/src/langbot/pkg/provider/modelmgr/requester.py @@ -3,6 +3,7 @@ from __future__ import annotations import abc import typing import time +import datetime from ...core import app from ...entity.persistence import model as persistence_model @@ -16,6 +17,15 @@ LLM_USAGE_QUERY_VARIABLE = '_llm_usage' STREAM_USAGE_QUERY_VARIABLE = '_stream_usage' +def _utc_now() -> datetime.datetime: + return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + + +def _query_session_id(query: pipeline_query.Query) -> str: + launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type) + return f'{launcher_type}_{query.launcher_id}' + + def _store_llm_usage(query: pipeline_query.Query | None, usage_info: dict | None) -> None: """Store the latest provider usage on the query for upstream action handlers.""" if query is None or not usage_info: @@ -59,6 +69,7 @@ class RuntimeProvider: """Bridge method for invoking LLM with monitoring""" # Start timing for monitoring start_time = time.time() + span_started_at = _utc_now() input_tokens = 0 output_tokens = 0 status = 'success' @@ -125,6 +136,30 @@ class RuntimeProvider: error_message=error_message, message_id=message_id, ) + trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None + parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None + if trace_id: + await self.requester.ap.monitoring_service.record_span( + trace_id=trace_id, + parent_span_id=parent_span_id, + name=f'LLM {model.model_entity.name}', + kind='model.llm', + status=status, + started_at=span_started_at, + duration=duration_ms, + message_id=message_id, + session_id=_query_session_id(query), + bot_id=query.bot_uuid, + pipeline_id=query.pipeline_uuid, + attributes={ + 'model_name': model.model_entity.name, + 'input_tokens': input_tokens, + 'output_tokens': output_tokens, + 'total_tokens': input_tokens + output_tokens, + 'stream': False, + }, + error_message=error_message, + ) except Exception as monitor_err: self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM call: {monitor_err}') @@ -140,6 +175,7 @@ class RuntimeProvider: """Bridge method for invoking LLM stream with monitoring""" # Start timing for monitoring start_time = time.time() + span_started_at = _utc_now() status = 'success' error_message = None input_tokens = 0 @@ -204,6 +240,30 @@ class RuntimeProvider: error_message=error_message, message_id=message_id, ) + trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None + parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None + if trace_id: + await self.requester.ap.monitoring_service.record_span( + trace_id=trace_id, + parent_span_id=parent_span_id, + name=f'LLM stream {model.model_entity.name}', + kind='model.llm', + status=status, + started_at=span_started_at, + duration=duration_ms, + message_id=message_id, + session_id=_query_session_id(query), + bot_id=query.bot_uuid, + pipeline_id=query.pipeline_uuid, + attributes={ + 'model_name': model.model_entity.name, + 'input_tokens': input_tokens, + 'output_tokens': output_tokens, + 'total_tokens': input_tokens + output_tokens, + 'stream': True, + }, + error_message=error_message, + ) except Exception as monitor_err: self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM stream call: {monitor_err}') diff --git a/src/langbot/pkg/provider/runners/localagent.py b/src/langbot/pkg/provider/runners/localagent.py index 9a90ed47..640af832 100644 --- a/src/langbot/pkg/provider/runners/localagent.py +++ b/src/langbot/pkg/provider/runners/localagent.py @@ -268,6 +268,19 @@ class LocalAgentRunner(runner.RequestRunner): 'bot_uuid': query.bot_uuid or '', 'sender_id': str(query.sender_id), 'session_name': f'{query.session.launcher_type.value}_{query.session.launcher_id}', + '_trace_context': { + 'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None, + 'parent_span_id': query.variables.get('_monitoring_root_span_id') if query.variables else None, + 'message_id': query.variables.get('_monitoring_message_id') if query.variables else None, + 'query_id': query.query_id, + 'session_id': f'{query.launcher_type.value}_{query.launcher_id}', + 'bot_id': query.bot_uuid or '', + 'pipeline_id': query.pipeline_uuid or '', + 'knowledge_base_id': kb_uuid, + 'attributes': { + 'source': 'local-agent', + }, + }, }, ) diff --git a/src/langbot/pkg/rag/knowledge/kbmgr.py b/src/langbot/pkg/rag/knowledge/kbmgr.py index cd37994c..ed71dafa 100644 --- a/src/langbot/pkg/rag/knowledge/kbmgr.py +++ b/src/langbot/pkg/rag/knowledge/kbmgr.py @@ -5,6 +5,7 @@ import traceback import uuid import zipfile import io +import datetime from typing import Any from langbot.pkg.core import app import sqlalchemy @@ -25,6 +26,10 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface): super().__init__(ap) self.knowledge_base_entity = knowledge_base_entity + @staticmethod + def _utc_now() -> datetime.datetime: + return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + async def initialize(self): pass @@ -334,6 +339,24 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface): # are passed directly to vector_search by some plugins (e.g. LangRAG) # and would cause empty results when the metadata field doesn't exist. filters = settings.pop('filters', {}) + trace_context = settings.pop('_trace_context', None) + host_span_started_at = self._utc_now() + host_span_id = None + if trace_context and trace_context.get('trace_id'): + host_parent_span_id = trace_context.get('parent_span_id') + host_span_id = trace_context.get('rag_span_id') or f'span-{uuid.uuid4().hex[:16]}' + trace_context = { + 'trace_id': trace_context.get('trace_id'), + 'parent_span_id': host_span_id, + 'host_parent_span_id': host_parent_span_id, + 'message_id': trace_context.get('message_id'), + 'query_id': trace_context.get('query_id'), + 'session_id': trace_context.get('session_id'), + 'bot_id': trace_context.get('bot_id'), + 'pipeline_id': trace_context.get('pipeline_id'), + 'knowledge_base_id': kb.uuid, + 'attributes': trace_context.get('attributes') or {}, + } retrieval_context = { 'query': query, @@ -343,13 +366,104 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface): 'creation_settings': kb.creation_settings or {}, 'filters': filters, } + if trace_context: + retrieval_context['trace_context'] = trace_context - result = await self.ap.plugin_connector.call_rag_retrieve( - plugin_id, - retrieval_context, - ) + try: + result = await self.ap.plugin_connector.call_rag_retrieve( + plugin_id, + retrieval_context, + ) + except Exception as e: + if trace_context: + await self._record_rag_trace_result( + trace_context=trace_context, + host_span_id=host_span_id, + started_at=host_span_started_at, + plugin_id=plugin_id, + result={ + 'results': [], + 'metadata': { + 'status': 'error', + 'error_message': str(e), + }, + }, + ) + raise + if trace_context: + await self._record_rag_trace_result( + trace_context=trace_context, + host_span_id=host_span_id, + started_at=host_span_started_at, + plugin_id=plugin_id, + result=result, + ) return result + async def _record_rag_trace_result( + self, + trace_context: dict[str, Any], + host_span_id: str | None, + started_at: datetime.datetime, + plugin_id: str, + result: dict[str, Any], + ) -> None: + """Persist host RAG span and plugin-provided child spans.""" + trace_id = trace_context.get('trace_id') + if not trace_id: + return + + metadata = result.get('metadata') if isinstance(result, dict) else {} + metadata = metadata if isinstance(metadata, dict) else {} + plugin_spans = metadata.get('trace_spans') if isinstance(metadata.get('trace_spans'), list) else [] + parent_span_id = trace_context.get('parent_span_id') + host_parent_span_id = trace_context.get('host_parent_span_id') + + try: + await self.ap.monitoring_service.record_span( + trace_id=trace_id, + span_id=host_span_id, + parent_span_id=host_parent_span_id, + name=f'Knowledge retrieval {self.knowledge_base_entity.name}', + kind='rag.retrieval', + status=metadata.get('status', 'success'), + started_at=started_at, + duration=metadata.get('duration_ms'), + message_id=trace_context.get('message_id'), + session_id=trace_context.get('session_id'), + bot_id=trace_context.get('bot_id'), + pipeline_id=trace_context.get('pipeline_id'), + attributes={ + 'knowledge_base_id': self.knowledge_base_entity.uuid, + 'knowledge_base_name': self.knowledge_base_entity.name, + 'plugin_id': plugin_id, + 'returned_count': len(result.get('results', []) if isinstance(result, dict) else []), + 'total_found': result.get('total_found') if isinstance(result, dict) else None, + }, + error_message=metadata.get('error_message'), + ) + for span in plugin_spans: + if not isinstance(span, dict): + continue + await self.ap.monitoring_service.record_span( + trace_id=trace_id, + span_id=span.get('span_id'), + parent_span_id=span.get('parent_span_id') or host_span_id or parent_span_id, + name=span.get('name') or 'RAG plugin stage', + kind=span.get('kind') or 'rag.stage', + status=span.get('status') or 'success', + started_at=started_at, + duration=span.get('duration_ms'), + message_id=trace_context.get('message_id'), + session_id=trace_context.get('session_id'), + bot_id=trace_context.get('bot_id'), + pipeline_id=trace_context.get('pipeline_id'), + attributes=span.get('attributes') if isinstance(span.get('attributes'), dict) else {}, + error_message=span.get('error_message'), + ) + except Exception as e: + self.ap.logger.error(f'Failed to record RAG trace spans: {e}') + async def _delete_document(self, document_id: str) -> bool: """Call plugin to delete document.""" kb = self.knowledge_base_entity diff --git a/tests/integration/api/test_monitoring.py b/tests/integration/api/test_monitoring.py index 6a65790f..d7b4af89 100644 --- a/tests/integration/api/test_monitoring.py +++ b/tests/integration/api/test_monitoring.py @@ -82,6 +82,15 @@ def fake_monitoring_app(): app.monitoring_service.get_messages = AsyncMock(return_value=([{'id': 'msg-1', 'content': 'test'}], 100)) app.monitoring_service.get_llm_calls = AsyncMock(return_value=([{'id': 'llm-1'}], 50)) app.monitoring_service.get_embedding_calls = AsyncMock(return_value=([{'id': 'emb-1'}], 10)) + app.monitoring_service.get_traces = AsyncMock(return_value=([{'trace_id': 'trace-1'}], 1)) + app.monitoring_service.get_trace_details = AsyncMock( + return_value={ + 'found': True, + 'trace_id': 'trace-1', + 'trace': {'trace_id': 'trace-1'}, + 'spans': [], + } + ) app.monitoring_service.get_sessions = AsyncMock(return_value=([{'session_id': 'sess-1'}], 20)) app.monitoring_service.get_errors = AsyncMock(return_value=([{'id': 'err-1'}], 2)) app.monitoring_service.get_session_analysis = AsyncMock( @@ -222,6 +231,7 @@ class TestMonitoringAllDataEndpoint: assert response.status_code == 200 data = await response.get_json() assert 'overview' in data['data'] + assert 'traces' in data['data'] @pytest.mark.usefixtures('mock_circular_import_chain') @@ -246,6 +256,15 @@ class TestMonitoringDetailsEndpoints: assert response.status_code == 200 + @pytest.mark.asyncio + async def test_get_trace_details(self, quart_test_client): + """GET /api/v1/monitoring/traces/{id}.""" + response = await quart_test_client.get( + '/api/v1/monitoring/traces/trace-1', headers={'Authorization': 'Bearer test_token'} + ) + + assert response.status_code == 200 + @pytest.mark.usefixtures('mock_circular_import_chain') class TestMonitoringFeedbackEndpoints: diff --git a/tests/integration/persistence/test_migrations.py b/tests/integration/persistence/test_migrations.py index f9872f82..ccc31a7c 100644 --- a/tests/integration/persistence/test_migrations.py +++ b/tests/integration/persistence/test_migrations.py @@ -104,7 +104,7 @@ class TestSQLiteMigrationUpgrade: rev = await get_alembic_current(sqlite_engine) assert rev is not None, 'Expected a revision after upgrade' # Head should be the latest migration - assert rev.startswith('0005'), f'Expected head to be 0005_*, got {rev}' + assert rev.startswith('0006'), f'Expected head to be 0006_*, got {rev}' @pytest.mark.asyncio async def test_upgrade_idempotent(self, sqlite_engine): diff --git a/tests/integration/persistence/test_migrations_postgres.py b/tests/integration/persistence/test_migrations_postgres.py index 28d06a2c..95794d47 100644 --- a/tests/integration/persistence/test_migrations_postgres.py +++ b/tests/integration/persistence/test_migrations_postgres.py @@ -144,8 +144,8 @@ class TestPostgreSQLMigrationUpgrade: # Verify revision rev = await get_alembic_current(postgres_engine) assert rev is not None, 'Expected a revision after upgrade' - # Head should be the latest migration (0005 for current state) - assert rev.startswith('0005'), f'Expected head to be 0005_*, got {rev}' + # Head should be the latest migration. + assert rev.startswith('0006'), f'Expected head to be 0006_*, got {rev}' @pytest.mark.asyncio async def test_postgres_upgrade_idempotent(self, postgres_engine, clean_tables, clean_alembic_version): diff --git a/web/src/app/home/monitoring/hooks/useMonitoringData.ts b/web/src/app/home/monitoring/hooks/useMonitoringData.ts index 315cd5bb..bf825919 100644 --- a/web/src/app/home/monitoring/hooks/useMonitoringData.ts +++ b/web/src/app/home/monitoring/hooks/useMonitoringData.ts @@ -5,6 +5,7 @@ import { ModelCall, LLMCall, EmbeddingCall, + MonitoringTrace, } from '../types/monitoring'; import { backendClient } from '@/app/infra/http'; import { parseUTCTimestamp } from '../utils/dateUtils'; @@ -263,12 +264,48 @@ export function useMonitoringData(filterState: FilterState) { messageId: error.message_id, }), ), + traces: (response.traces || []).map( + (trace: { + trace_id: string; + started_at: string; + ended_at?: string; + duration?: number; + status: string; + name: string; + bot_id?: string; + bot_name?: string; + pipeline_id?: string; + pipeline_name?: string; + session_id?: string; + message_id?: string; + query_id?: string; + attributes?: Record; + }): MonitoringTrace => ({ + traceId: trace.trace_id, + name: trace.name, + startedAt: parseUTCTimestamp(trace.started_at), + endedAt: trace.ended_at + ? parseUTCTimestamp(trace.ended_at) + : undefined, + duration: trace.duration, + status: trace.status as 'running' | 'success' | 'error', + botId: trace.bot_id, + botName: trace.bot_name, + pipelineId: trace.pipeline_id, + pipelineName: trace.pipeline_name, + sessionId: trace.session_id, + messageId: trace.message_id, + queryId: trace.query_id, + attributes: trace.attributes || {}, + }), + ), totalCount: { messages: response.totalCount.messages, llmCalls: response.totalCount.llmCalls, embeddingCalls: response.totalCount.embeddingCalls || 0, sessions: response.totalCount.sessions, errors: response.totalCount.errors, + traces: response.totalCount.traces || 0, }, }; diff --git a/web/src/app/home/monitoring/page.tsx b/web/src/app/home/monitoring/page.tsx index 7dbe2e59..51c8e483 100644 --- a/web/src/app/home/monitoring/page.tsx +++ b/web/src/app/home/monitoring/page.tsx @@ -10,6 +10,7 @@ import { MessageSquare, Sparkles, CheckCircle2, + GitBranch, } from 'lucide-react'; import OverviewCards from './components/overview-cards/OverviewCards'; import MonitoringFilters from './components/filters/MonitoringFilters'; @@ -22,9 +23,15 @@ import { MessageDetailsCard } from './components/MessageDetailsCard'; import { MessageContentRenderer } from './components/MessageContentRenderer'; import { FeedbackStatsCards } from './components/FeedbackCard'; import { FeedbackList } from './components/FeedbackList'; -import { MessageDetails } from './types/monitoring'; +import { + MessageDetails, + TraceDetails, + MonitoringSpan, +} from './types/monitoring'; import { httpClient } from '@/app/infra/http/HttpClient'; +import { backendClient } from '@/app/infra/http'; import { LoadingSpinner, LoadingPage } from '@/components/ui/loading-spinner'; +import { parseUTCTimestamp } from './utils/dateUtils'; interface RawMessageData { id: string; @@ -72,6 +79,97 @@ interface RawErrorData { stack_trace: string | null; } +interface RawTraceData { + trace_id: string; + started_at: string; + ended_at?: string; + duration?: number; + status: string; + name: string; + bot_id?: string; + bot_name?: string; + pipeline_id?: string; + pipeline_name?: string; + session_id?: string; + message_id?: string; + query_id?: string; + attributes?: Record; +} + +interface RawSpanData { + span_id: string; + trace_id: string; + parent_span_id?: string; + name: string; + kind: string; + status: string; + started_at: string; + ended_at?: string; + duration?: number; + message_id?: string; + session_id?: string; + bot_id?: string; + pipeline_id?: string; + attributes?: Record; + error_message?: string; +} + +function mapTrace(raw: RawTraceData) { + return { + traceId: raw.trace_id, + name: raw.name, + startedAt: parseUTCTimestamp(raw.started_at), + endedAt: raw.ended_at ? parseUTCTimestamp(raw.ended_at) : undefined, + duration: raw.duration, + status: raw.status as 'running' | 'success' | 'error', + botId: raw.bot_id, + botName: raw.bot_name, + pipelineId: raw.pipeline_id, + pipelineName: raw.pipeline_name, + sessionId: raw.session_id, + messageId: raw.message_id, + queryId: raw.query_id, + attributes: raw.attributes || {}, + }; +} + +function mapSpan(raw: RawSpanData): MonitoringSpan { + return { + spanId: raw.span_id, + traceId: raw.trace_id, + parentSpanId: raw.parent_span_id, + name: raw.name, + kind: raw.kind, + status: raw.status as 'running' | 'success' | 'error', + startedAt: parseUTCTimestamp(raw.started_at), + endedAt: raw.ended_at ? parseUTCTimestamp(raw.ended_at) : undefined, + duration: raw.duration, + messageId: raw.message_id, + sessionId: raw.session_id, + botId: raw.bot_id, + pipelineId: raw.pipeline_id, + attributes: raw.attributes || {}, + errorMessage: raw.error_message, + }; +} + +function spanDepth( + span: MonitoringSpan, + spansById: Map, +) { + let depth = 0; + let current = span.parentSpanId + ? spansById.get(span.parentSpanId) + : undefined; + while (current && depth < 8) { + depth += 1; + current = current.parentSpanId + ? spansById.get(current.parentSpanId) + : undefined; + } + return depth; +} + function MonitoringPageContent() { const { t } = useTranslation(); const { filterState, setSelectedBots, setSelectedPipelines, setTimeRange } = @@ -158,6 +256,13 @@ function MonitoringPageContent() { // State for expanded errors const [expandedErrorId, setExpandedErrorId] = useState(null); + const [expandedTraceId, setExpandedTraceId] = useState(null); + const [traceDetails, setTraceDetails] = useState< + Record + >({}); + const [loadingTraceDetails, setLoadingTraceDetails] = useState< + Record + >({}); // State for controlled tabs const [activeTab, setActiveTab] = useState('messages'); @@ -265,6 +370,34 @@ function MonitoringPageContent() { } }; + const toggleTraceExpand = async (traceId: string) => { + if (expandedTraceId === traceId) { + setExpandedTraceId(null); + return; + } + + setExpandedTraceId(traceId); + if (traceDetails[traceId]) return; + + setLoadingTraceDetails((prev) => ({ ...prev, [traceId]: true })); + try { + const result = await backendClient.getMonitoringTraceDetails(traceId); + setTraceDetails((prev) => ({ + ...prev, + [traceId]: { + traceId: result.trace_id, + found: result.found, + trace: result.trace ? mapTrace(result.trace) : undefined, + spans: (result.spans || []).map(mapSpan), + }, + })); + } catch (error) { + console.error('Failed to fetch trace details:', error); + } finally { + setLoadingTraceDetails((prev) => ({ ...prev, [traceId]: false })); + } + }; + return (
{/* Filters and Refresh Button - Sticky */} @@ -323,6 +456,9 @@ function MonitoringPageContent() { {t('monitoring.tabs.tokens')} + + {t('monitoring.tabs.traces')} + {t('monitoring.tabs.feedback')} @@ -690,6 +826,166 @@ function MonitoringPageContent() { /> + +
+ {loading && ( +
+ +
+ )} + + {!loading && data && data.traces && data.traces.length > 0 && ( +
+ {data.traces.map((trace) => { + const details = traceDetails[trace.traceId]; + const spans = details?.spans || []; + const spansById = new Map( + spans.map((span) => [span.spanId, span]), + ); + const maxDuration = Math.max( + 1, + ...spans.map((span) => span.duration || 0), + ); + + return ( +
+
toggleTraceExpand(trace.traceId)} + > +
+
+
+ {expandedTraceId === trace.traceId ? ( + + ) : ( + + )} +
+
+
+ + {trace.traceId} + + + {trace.status} + +
+
+ {trace.name} +
+
+ {trace.botName || '-'} →{' '} + {trace.pipelineName || '-'} + {trace.sessionId + ? ` · ${trace.sessionId}` + : ''} +
+
+
+
+ {trace.startedAt.toLocaleString()} + {trace.duration ?? 0}ms +
+
+
+ + {expandedTraceId === trace.traceId && ( +
+ {loadingTraceDetails[trace.traceId] && ( +
+ +
+ )} + {!loadingTraceDetails[trace.traceId] && ( +
+ {spans.length === 0 && ( +
+ {t('monitoring.traces.noSpans')} +
+ )} + {spans.map((span) => { + const depth = spanDepth(span, spansById); + const width = Math.max( + 6, + Math.min( + 100, + ((span.duration || 0) / maxDuration) * + 100, + ), + ); + return ( +
+
+
+ {span.name} +
+
+ {span.kind} +
+
+
+
+
+
+ {span.duration ?? 0}ms +
+ {span.errorMessage && ( +
+ {span.errorMessage} +
+ )} +
+ ); + })} +
+ )} +
+ )} +
+ ); + })} +
+ )} + + {!loading && + (!data || !data.traces || data.traces.length === 0) && ( +
+ +
+ {t('monitoring.traces.noTraces')} +
+
+ )} +
+ +
{loading && ( diff --git a/web/src/app/home/monitoring/types/monitoring.ts b/web/src/app/home/monitoring/types/monitoring.ts index 564bd97e..8ecb491a 100644 --- a/web/src/app/home/monitoring/types/monitoring.ts +++ b/web/src/app/home/monitoring/types/monitoring.ts @@ -111,6 +111,48 @@ export interface ErrorLog { messageId?: string; } +export interface MonitoringTrace { + traceId: string; + name: string; + startedAt: Date; + endedAt?: Date; + duration?: number; + status: 'running' | 'success' | 'error'; + botId?: string; + botName?: string; + pipelineId?: string; + pipelineName?: string; + sessionId?: string; + messageId?: string; + queryId?: string; + attributes: Record; +} + +export interface MonitoringSpan { + spanId: string; + traceId: string; + parentSpanId?: string; + name: string; + kind: string; + status: 'success' | 'error' | 'running'; + startedAt: Date; + endedAt?: Date; + duration?: number; + messageId?: string; + sessionId?: string; + botId?: string; + pipelineId?: string; + attributes: Record; + errorMessage?: string; +} + +export interface TraceDetails { + traceId: string; + found: boolean; + trace?: MonitoringTrace; + spans: MonitoringSpan[]; +} + export interface MessageDetails { messageId: string; found: boolean; @@ -125,6 +167,7 @@ export interface MessageDetails { averageDurationMs: number; }; errors: ErrorLog[]; + trace?: MonitoringTrace; } export interface OverviewMetrics { @@ -203,6 +246,7 @@ export interface MonitoringData { modelCalls: ModelCall[]; sessions: SessionInfo[]; errors: ErrorLog[]; + traces: MonitoringTrace[]; feedback?: FeedbackRecord[]; feedbackStats?: FeedbackStats; totalCount: { @@ -211,6 +255,7 @@ export interface MonitoringData { embeddingCalls: number; sessions: number; errors: number; + traces: number; feedback?: number; }; } diff --git a/web/src/app/infra/http/BackendClient.ts b/web/src/app/infra/http/BackendClient.ts index b2f3f7b5..9576e7a8 100644 --- a/web/src/app/infra/http/BackendClient.ts +++ b/web/src/app/infra/http/BackendClient.ts @@ -1185,12 +1185,29 @@ export class BackendClient extends BaseHttpClient { stack_trace?: string; message_id?: string; }>; + traces?: Array<{ + trace_id: string; + started_at: string; + ended_at?: string; + duration?: number; + status: string; + name: string; + bot_id?: string; + bot_name?: string; + pipeline_id?: string; + pipeline_name?: string; + session_id?: string; + message_id?: string; + query_id?: string; + attributes?: Record; + }>; totalCount: { messages: number; llmCalls: number; embeddingCalls: number; sessions: number; errors: number; + traces?: number; }; }> { const queryParams = new URLSearchParams(); @@ -1213,6 +1230,90 @@ export class BackendClient extends BaseHttpClient { return this.get(`/api/v1/monitoring/data?${queryParams.toString()}`); } + public getMonitoringTraces(params: { + botId?: string[]; + pipelineId?: string[]; + startTime?: string; + endTime?: string; + limit?: number; + }): Promise<{ + traces: Array<{ + trace_id: string; + started_at: string; + ended_at?: string; + duration?: number; + status: string; + name: string; + bot_id?: string; + bot_name?: string; + pipeline_id?: string; + pipeline_name?: string; + session_id?: string; + message_id?: string; + query_id?: string; + attributes?: Record; + }>; + total: number; + }> { + const queryParams = new URLSearchParams(); + if (params.botId) { + params.botId.forEach((id) => queryParams.append('botId', id)); + } + if (params.pipelineId) { + params.pipelineId.forEach((id) => queryParams.append('pipelineId', id)); + } + if (params.startTime) { + queryParams.append('startTime', params.startTime); + } + if (params.endTime) { + queryParams.append('endTime', params.endTime); + } + if (params.limit) { + queryParams.append('limit', params.limit.toString()); + } + return this.get(`/api/v1/monitoring/traces?${queryParams.toString()}`); + } + + public getMonitoringTraceDetails(traceId: string): Promise<{ + trace_id: string; + found: boolean; + trace: { + trace_id: string; + started_at: string; + ended_at?: string; + duration?: number; + status: string; + name: string; + bot_id?: string; + bot_name?: string; + pipeline_id?: string; + pipeline_name?: string; + session_id?: string; + message_id?: string; + query_id?: string; + attributes?: Record; + }; + spans: Array<{ + span_id: string; + trace_id: string; + parent_span_id?: string; + name: string; + kind: string; + status: string; + started_at: string; + ended_at?: string; + duration?: number; + message_id?: string; + session_id?: string; + bot_id?: string; + pipeline_id?: string; + attributes?: Record; + error_message?: string; + }>; + }> { + return this.get(`/api/v1/monitoring/traces/${traceId}`); + } + public getMonitoringOverview(params: { botId?: string[]; pipelineId?: string[]; diff --git a/web/src/i18n/locales/en-US.ts b/web/src/i18n/locales/en-US.ts index 902c749b..398f0d55 100644 --- a/web/src/i18n/locales/en-US.ts +++ b/web/src/i18n/locales/en-US.ts @@ -1217,6 +1217,7 @@ const enUS = { embeddingCalls: 'Embedding Calls', modelCalls: 'Model Calls', tokens: 'Token Monitoring', + traces: 'Traces', feedback: 'User Feedback', sessions: 'Session Analysis', errors: 'Error Logs', @@ -1321,6 +1322,11 @@ const enUS = { noErrors: 'No errors found', stackTrace: 'Stack Trace', }, + traces: { + title: 'Traces', + noTraces: 'No traces found', + noSpans: 'No spans recorded for this trace', + }, feedback: { title: 'User Feedback', totalFeedback: 'Total Feedback', diff --git a/web/src/i18n/locales/zh-Hans.ts b/web/src/i18n/locales/zh-Hans.ts index bb72de0f..d6215c4f 100644 --- a/web/src/i18n/locales/zh-Hans.ts +++ b/web/src/i18n/locales/zh-Hans.ts @@ -1158,6 +1158,7 @@ const zhHans = { embeddingCalls: 'Embedding调用', modelCalls: '模型调用', tokens: 'Token 监控', + traces: '链路追踪', feedback: '用户反馈', sessions: '会话分析', errors: '错误日志', @@ -1262,6 +1263,11 @@ const zhHans = { noErrors: '未找到错误', stackTrace: '堆栈追踪', }, + traces: { + title: '链路追踪', + noTraces: '未找到链路记录', + noSpans: '此链路暂无 Span 记录', + }, feedback: { title: '用户反馈', totalFeedback: '总反馈数',