mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-21 21:14:20 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 16d47d3e61 | |||
| 3146c58905 | |||
| d92b664136 | |||
| 8789c42eeb |
@@ -313,18 +313,30 @@ class MonitoringRouterGroup(group.RouterGroup):
|
|||||||
offset=0,
|
offset=0,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Get traces
|
||||||
|
traces, traces_total = await self.ap.monitoring_service.get_traces(
|
||||||
|
bot_ids=bot_ids if bot_ids else None,
|
||||||
|
pipeline_ids=pipeline_ids if pipeline_ids else None,
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time,
|
||||||
|
limit=limit,
|
||||||
|
offset=0,
|
||||||
|
)
|
||||||
|
|
||||||
return self.success(
|
return self.success(
|
||||||
data={
|
data={
|
||||||
'overview': overview,
|
'overview': overview,
|
||||||
'messages': messages,
|
'messages': messages,
|
||||||
'llmCalls': llm_calls,
|
'llmCalls': llm_calls,
|
||||||
'embeddingCalls': embedding_calls,
|
'embeddingCalls': embedding_calls,
|
||||||
|
'traces': traces,
|
||||||
'sessions': sessions,
|
'sessions': sessions,
|
||||||
'errors': errors,
|
'errors': errors,
|
||||||
'totalCount': {
|
'totalCount': {
|
||||||
'messages': messages_total,
|
'messages': messages_total,
|
||||||
'llmCalls': llm_calls_total,
|
'llmCalls': llm_calls_total,
|
||||||
'embeddingCalls': embedding_calls_total,
|
'embeddingCalls': embedding_calls_total,
|
||||||
|
'traces': traces_total,
|
||||||
'sessions': sessions_total,
|
'sessions': sessions_total,
|
||||||
'errors': errors_total,
|
'errors': errors_total,
|
||||||
},
|
},
|
||||||
@@ -350,6 +362,49 @@ class MonitoringRouterGroup(group.RouterGroup):
|
|||||||
|
|
||||||
return self.success(data=details)
|
return self.success(data=details)
|
||||||
|
|
||||||
|
@self.route('/traces', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||||
|
async def get_traces() -> str:
|
||||||
|
"""Get end-to-end trace records."""
|
||||||
|
bot_ids = quart.request.args.getlist('botId')
|
||||||
|
pipeline_ids = quart.request.args.getlist('pipelineId')
|
||||||
|
session_ids = quart.request.args.getlist('sessionId')
|
||||||
|
statuses = quart.request.args.getlist('status')
|
||||||
|
start_time_str = quart.request.args.get('startTime')
|
||||||
|
end_time_str = quart.request.args.get('endTime')
|
||||||
|
limit = int(quart.request.args.get('limit', 100))
|
||||||
|
offset = int(quart.request.args.get('offset', 0))
|
||||||
|
|
||||||
|
start_time = parse_iso_datetime(start_time_str)
|
||||||
|
end_time = parse_iso_datetime(end_time_str)
|
||||||
|
|
||||||
|
traces, total = await self.ap.monitoring_service.get_traces(
|
||||||
|
bot_ids=bot_ids if bot_ids else None,
|
||||||
|
pipeline_ids=pipeline_ids if pipeline_ids else None,
|
||||||
|
session_ids=session_ids if session_ids else None,
|
||||||
|
statuses=statuses if statuses else None,
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time,
|
||||||
|
limit=limit,
|
||||||
|
offset=offset,
|
||||||
|
)
|
||||||
|
|
||||||
|
return self.success(
|
||||||
|
data={
|
||||||
|
'traces': traces,
|
||||||
|
'total': total,
|
||||||
|
'limit': limit,
|
||||||
|
'offset': offset,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
@self.route('/traces/<trace_id>', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||||
|
async def get_trace_details(trace_id: str) -> str:
|
||||||
|
"""Get one trace with all spans."""
|
||||||
|
details = await self.ap.monitoring_service.get_trace_details(trace_id)
|
||||||
|
if not details.get('found'):
|
||||||
|
return self.http_status(404, -1, f'Trace {trace_id} not found')
|
||||||
|
return self.success(data=details)
|
||||||
|
|
||||||
@self.route('/export', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
@self.route('/export', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||||
async def export_data() -> tuple[str, int]:
|
async def export_data() -> tuple[str, int]:
|
||||||
"""Export monitoring data as CSV"""
|
"""Export monitoring data as CSV"""
|
||||||
|
|||||||
@@ -350,8 +350,24 @@ class PluginsRouterGroup(group.RouterGroup):
|
|||||||
if not endpoint.startswith('/') or '..' in endpoint:
|
if not endpoint.startswith('/') or '..' in endpoint:
|
||||||
return self.http_status(400, -1, 'invalid endpoint')
|
return self.http_status(400, -1, 'invalid endpoint')
|
||||||
|
|
||||||
|
caller = {
|
||||||
|
'plugin_author': author,
|
||||||
|
'plugin_name': plugin_name,
|
||||||
|
'page_id': page_id,
|
||||||
|
'origin': _get_request_origin(),
|
||||||
|
}
|
||||||
|
headers = {
|
||||||
|
key: value
|
||||||
|
for key, value in {
|
||||||
|
'user-agent': quart.request.headers.get('User-Agent'),
|
||||||
|
'x-request-id': quart.request.headers.get('X-Request-ID'),
|
||||||
|
'x-forwarded-for': quart.request.headers.get('X-Forwarded-For'),
|
||||||
|
}.items()
|
||||||
|
if value
|
||||||
|
}
|
||||||
|
|
||||||
result = await self.ap.plugin_connector.handle_page_api(
|
result = await self.ap.plugin_connector.handle_page_api(
|
||||||
author, plugin_name, page_id, endpoint, method.upper(), body
|
author, plugin_name, page_id, endpoint, method.upper(), body, caller, headers
|
||||||
)
|
)
|
||||||
if result.get('error'):
|
if result.get('error'):
|
||||||
return self.http_status(400, -1, result['error'])
|
return self.http_status(400, -1, result['error'])
|
||||||
|
|||||||
@@ -3,11 +3,55 @@ from __future__ import annotations
|
|||||||
import uuid
|
import uuid
|
||||||
import datetime
|
import datetime
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
|
import json
|
||||||
|
|
||||||
from ....core import app
|
from ....core import app
|
||||||
from ....entity.persistence import monitoring as persistence_monitoring
|
from ....entity.persistence import monitoring as persistence_monitoring
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: Move shared trace/time helpers into a small monitoring utility module
|
||||||
|
# when trace propagation expands beyond the current query/retrieval path.
|
||||||
|
def _utc_now() -> datetime.datetime:
|
||||||
|
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||||
|
|
||||||
|
|
||||||
|
def _json_dumps(value: dict | list | None) -> str | None:
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return json.dumps(value, ensure_ascii=False, default=str)
|
||||||
|
except Exception:
|
||||||
|
return json.dumps({'serialization_error': str(value)}, ensure_ascii=False)
|
||||||
|
|
||||||
|
|
||||||
|
def _json_loads(value: str | None) -> dict | list | None:
|
||||||
|
if not value:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return json.loads(value)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def new_trace_id() -> str:
|
||||||
|
return f'trace-{uuid.uuid4().hex[:16]}'
|
||||||
|
|
||||||
|
|
||||||
|
def new_span_id() -> str:
|
||||||
|
return f'span-{uuid.uuid4().hex[:16]}'
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_trace_status(status: str | None) -> str:
|
||||||
|
"""Normalize operation status to the monitoring UI vocabulary."""
|
||||||
|
if status in ('completed', 'ok'):
|
||||||
|
return 'success'
|
||||||
|
if status in ('failed', 'failure', 'exception'):
|
||||||
|
return 'error'
|
||||||
|
if status in ('running', 'success', 'error'):
|
||||||
|
return status
|
||||||
|
return 'success'
|
||||||
|
|
||||||
|
|
||||||
class MonitoringService:
|
class MonitoringService:
|
||||||
"""Monitoring service"""
|
"""Monitoring service"""
|
||||||
|
|
||||||
@@ -74,6 +118,18 @@ class MonitoringService:
|
|||||||
persistence_monitoring.MonitoringFeedback.timestamp,
|
persistence_monitoring.MonitoringFeedback.timestamp,
|
||||||
persistence_monitoring.MonitoringFeedback.id,
|
persistence_monitoring.MonitoringFeedback.id,
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
'monitoring_traces',
|
||||||
|
persistence_monitoring.MonitoringTrace,
|
||||||
|
persistence_monitoring.MonitoringTrace.started_at,
|
||||||
|
persistence_monitoring.MonitoringTrace.trace_id,
|
||||||
|
),
|
||||||
|
(
|
||||||
|
'monitoring_spans',
|
||||||
|
persistence_monitoring.MonitoringSpan,
|
||||||
|
persistence_monitoring.MonitoringSpan.started_at,
|
||||||
|
persistence_monitoring.MonitoringSpan.span_id,
|
||||||
|
),
|
||||||
]
|
]
|
||||||
|
|
||||||
deleted_counts: dict[str, int] = {}
|
deleted_counts: dict[str, int] = {}
|
||||||
@@ -133,6 +189,116 @@ class MonitoringService:
|
|||||||
|
|
||||||
# ========== Recording Methods ==========
|
# ========== Recording Methods ==========
|
||||||
|
|
||||||
|
async def start_trace(
|
||||||
|
self,
|
||||||
|
trace_id: str | None = None,
|
||||||
|
name: str = 'LangBot query',
|
||||||
|
bot_id: str | None = None,
|
||||||
|
bot_name: str | None = None,
|
||||||
|
pipeline_id: str | None = None,
|
||||||
|
pipeline_name: str | None = None,
|
||||||
|
session_id: str | None = None,
|
||||||
|
message_id: str | None = None,
|
||||||
|
query_id: str | int | None = None,
|
||||||
|
attributes: dict | None = None,
|
||||||
|
) -> str:
|
||||||
|
"""Create or update a trace header row."""
|
||||||
|
trace_id = trace_id or new_trace_id()
|
||||||
|
trace_data = {
|
||||||
|
'trace_id': trace_id,
|
||||||
|
'started_at': _utc_now(),
|
||||||
|
'ended_at': None,
|
||||||
|
'duration': None,
|
||||||
|
'status': 'running',
|
||||||
|
'name': name,
|
||||||
|
'bot_id': bot_id,
|
||||||
|
'bot_name': bot_name,
|
||||||
|
'pipeline_id': pipeline_id,
|
||||||
|
'pipeline_name': pipeline_name,
|
||||||
|
'session_id': session_id,
|
||||||
|
'message_id': message_id,
|
||||||
|
'query_id': str(query_id) if query_id is not None else None,
|
||||||
|
'attributes': _json_dumps(attributes),
|
||||||
|
}
|
||||||
|
|
||||||
|
await self.ap.persistence_mgr.execute_async(
|
||||||
|
sqlalchemy.insert(persistence_monitoring.MonitoringTrace).values(trace_data)
|
||||||
|
)
|
||||||
|
return trace_id
|
||||||
|
|
||||||
|
async def finish_trace(
|
||||||
|
self,
|
||||||
|
trace_id: str,
|
||||||
|
status: str = 'success',
|
||||||
|
duration: int | None = None,
|
||||||
|
message_id: str | None = None,
|
||||||
|
attributes: dict | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""Mark a trace complete."""
|
||||||
|
update_values: dict = {
|
||||||
|
'ended_at': _utc_now(),
|
||||||
|
'status': normalize_trace_status(status),
|
||||||
|
}
|
||||||
|
if duration is not None:
|
||||||
|
update_values['duration'] = duration
|
||||||
|
if message_id is not None:
|
||||||
|
update_values['message_id'] = message_id
|
||||||
|
if attributes is not None:
|
||||||
|
update_values['attributes'] = _json_dumps(attributes)
|
||||||
|
|
||||||
|
await self.ap.persistence_mgr.execute_async(
|
||||||
|
sqlalchemy.update(persistence_monitoring.MonitoringTrace)
|
||||||
|
.where(persistence_monitoring.MonitoringTrace.trace_id == trace_id)
|
||||||
|
.values(update_values)
|
||||||
|
)
|
||||||
|
|
||||||
|
async def record_span(
|
||||||
|
self,
|
||||||
|
trace_id: str,
|
||||||
|
name: str,
|
||||||
|
kind: str,
|
||||||
|
status: str = 'success',
|
||||||
|
span_id: str | None = None,
|
||||||
|
parent_span_id: str | None = None,
|
||||||
|
started_at: datetime.datetime | None = None,
|
||||||
|
ended_at: datetime.datetime | None = None,
|
||||||
|
duration: int | None = None,
|
||||||
|
message_id: str | None = None,
|
||||||
|
session_id: str | None = None,
|
||||||
|
bot_id: str | None = None,
|
||||||
|
pipeline_id: str | None = None,
|
||||||
|
attributes: dict | None = None,
|
||||||
|
error_message: str | None = None,
|
||||||
|
) -> str:
|
||||||
|
"""Record a single completed span."""
|
||||||
|
started_at = started_at or _utc_now()
|
||||||
|
if duration is None and ended_at is not None:
|
||||||
|
duration = int((ended_at - started_at).total_seconds() * 1000)
|
||||||
|
elif duration is not None:
|
||||||
|
duration = int(round(float(duration)))
|
||||||
|
span_data = {
|
||||||
|
'span_id': span_id or new_span_id(),
|
||||||
|
'trace_id': trace_id,
|
||||||
|
'parent_span_id': parent_span_id,
|
||||||
|
'name': name,
|
||||||
|
'kind': kind,
|
||||||
|
'status': normalize_trace_status(status),
|
||||||
|
'started_at': started_at,
|
||||||
|
'ended_at': ended_at or _utc_now(),
|
||||||
|
'duration': duration,
|
||||||
|
'message_id': message_id,
|
||||||
|
'session_id': session_id,
|
||||||
|
'bot_id': bot_id,
|
||||||
|
'pipeline_id': pipeline_id,
|
||||||
|
'attributes': _json_dumps(attributes),
|
||||||
|
'error_message': error_message,
|
||||||
|
}
|
||||||
|
|
||||||
|
await self.ap.persistence_mgr.execute_async(
|
||||||
|
sqlalchemy.insert(persistence_monitoring.MonitoringSpan).values(span_data)
|
||||||
|
)
|
||||||
|
return span_data['span_id']
|
||||||
|
|
||||||
async def record_message(
|
async def record_message(
|
||||||
self,
|
self,
|
||||||
bot_id: str,
|
bot_id: str,
|
||||||
@@ -1076,6 +1242,19 @@ class MonitoringService:
|
|||||||
for row in error_rows
|
for row in error_rows
|
||||||
]
|
]
|
||||||
|
|
||||||
|
trace_query = (
|
||||||
|
sqlalchemy.select(persistence_monitoring.MonitoringTrace)
|
||||||
|
.where(persistence_monitoring.MonitoringTrace.message_id == message_id)
|
||||||
|
.order_by(persistence_monitoring.MonitoringTrace.started_at.desc())
|
||||||
|
.limit(1)
|
||||||
|
)
|
||||||
|
trace_result = await self.ap.persistence_mgr.execute_async(trace_query)
|
||||||
|
trace_row = trace_result.first()
|
||||||
|
trace = None
|
||||||
|
if trace_row:
|
||||||
|
trace_model = trace_row[0] if isinstance(trace_row, tuple) else trace_row
|
||||||
|
trace = self._serialize_trace(trace_model)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'message_id': message_id,
|
'message_id': message_id,
|
||||||
'found': True,
|
'found': True,
|
||||||
@@ -1090,6 +1269,84 @@ class MonitoringService:
|
|||||||
'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0,
|
'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0,
|
||||||
},
|
},
|
||||||
'errors': errors,
|
'errors': errors,
|
||||||
|
'trace': trace,
|
||||||
|
}
|
||||||
|
|
||||||
|
def _serialize_trace(self, trace: persistence_monitoring.MonitoringTrace) -> dict:
|
||||||
|
data = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringTrace, trace)
|
||||||
|
data['attributes'] = _json_loads(data.get('attributes')) or {}
|
||||||
|
return data
|
||||||
|
|
||||||
|
def _serialize_span(self, span: persistence_monitoring.MonitoringSpan) -> dict:
|
||||||
|
data = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringSpan, span)
|
||||||
|
data['attributes'] = _json_loads(data.get('attributes')) or {}
|
||||||
|
return data
|
||||||
|
|
||||||
|
async def get_traces(
|
||||||
|
self,
|
||||||
|
bot_ids: list[str] | None = None,
|
||||||
|
pipeline_ids: list[str] | None = None,
|
||||||
|
session_ids: list[str] | None = None,
|
||||||
|
statuses: list[str] | None = None,
|
||||||
|
start_time: datetime.datetime | None = None,
|
||||||
|
end_time: datetime.datetime | None = None,
|
||||||
|
limit: int = 100,
|
||||||
|
offset: int = 0,
|
||||||
|
) -> tuple[list[dict], int]:
|
||||||
|
"""Get trace headers with filters."""
|
||||||
|
conditions = []
|
||||||
|
if bot_ids:
|
||||||
|
conditions.append(persistence_monitoring.MonitoringTrace.bot_id.in_(bot_ids))
|
||||||
|
if pipeline_ids:
|
||||||
|
conditions.append(persistence_monitoring.MonitoringTrace.pipeline_id.in_(pipeline_ids))
|
||||||
|
if session_ids:
|
||||||
|
conditions.append(persistence_monitoring.MonitoringTrace.session_id.in_(session_ids))
|
||||||
|
if statuses:
|
||||||
|
conditions.append(persistence_monitoring.MonitoringTrace.status.in_(statuses))
|
||||||
|
if start_time:
|
||||||
|
conditions.append(persistence_monitoring.MonitoringTrace.started_at >= start_time)
|
||||||
|
if end_time:
|
||||||
|
conditions.append(persistence_monitoring.MonitoringTrace.started_at <= end_time)
|
||||||
|
|
||||||
|
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringTrace.trace_id))
|
||||||
|
query = sqlalchemy.select(persistence_monitoring.MonitoringTrace)
|
||||||
|
if conditions:
|
||||||
|
clause = sqlalchemy.and_(*conditions)
|
||||||
|
count_query = count_query.where(clause)
|
||||||
|
query = query.where(clause)
|
||||||
|
|
||||||
|
total_result = await self.ap.persistence_mgr.execute_async(count_query)
|
||||||
|
total = total_result.scalar() or 0
|
||||||
|
|
||||||
|
query = query.order_by(persistence_monitoring.MonitoringTrace.started_at.desc()).limit(limit).offset(offset)
|
||||||
|
result = await self.ap.persistence_mgr.execute_async(query)
|
||||||
|
traces = [self._serialize_trace(row[0] if isinstance(row, tuple) else row) for row in result.all()]
|
||||||
|
return traces, total
|
||||||
|
|
||||||
|
async def get_trace_details(self, trace_id: str) -> dict:
|
||||||
|
"""Get a single trace and all spans in chronological order."""
|
||||||
|
trace_query = sqlalchemy.select(persistence_monitoring.MonitoringTrace).where(
|
||||||
|
persistence_monitoring.MonitoringTrace.trace_id == trace_id
|
||||||
|
)
|
||||||
|
trace_result = await self.ap.persistence_mgr.execute_async(trace_query)
|
||||||
|
trace_row = trace_result.first()
|
||||||
|
if not trace_row:
|
||||||
|
return {'trace_id': trace_id, 'found': False}
|
||||||
|
|
||||||
|
trace = trace_row[0] if isinstance(trace_row, tuple) else trace_row
|
||||||
|
span_query = (
|
||||||
|
sqlalchemy.select(persistence_monitoring.MonitoringSpan)
|
||||||
|
.where(persistence_monitoring.MonitoringSpan.trace_id == trace_id)
|
||||||
|
.order_by(persistence_monitoring.MonitoringSpan.started_at.asc())
|
||||||
|
)
|
||||||
|
span_result = await self.ap.persistence_mgr.execute_async(span_query)
|
||||||
|
spans = [self._serialize_span(row[0] if isinstance(row, tuple) else row) for row in span_result.all()]
|
||||||
|
|
||||||
|
return {
|
||||||
|
'trace_id': trace_id,
|
||||||
|
'found': True,
|
||||||
|
'trace': self._serialize_trace(trace),
|
||||||
|
'spans': spans,
|
||||||
}
|
}
|
||||||
|
|
||||||
# ========== Export Methods ==========
|
# ========== Export Methods ==========
|
||||||
|
|||||||
@@ -3,6 +3,49 @@ import sqlalchemy
|
|||||||
from .base import Base
|
from .base import Base
|
||||||
|
|
||||||
|
|
||||||
|
class MonitoringTrace(Base):
|
||||||
|
"""End-to-end monitoring trace records"""
|
||||||
|
|
||||||
|
__tablename__ = 'monitoring_traces'
|
||||||
|
|
||||||
|
trace_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
|
||||||
|
started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
|
||||||
|
ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True, index=True)
|
||||||
|
duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) # milliseconds
|
||||||
|
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True) # running, success, error
|
||||||
|
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
|
||||||
|
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||||
|
bot_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
|
||||||
|
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||||
|
pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
|
||||||
|
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||||
|
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||||
|
query_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||||
|
attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
|
||||||
|
|
||||||
|
|
||||||
|
class MonitoringSpan(Base):
|
||||||
|
"""Trace span records for pipeline, RAG, model, plugin and tool operations"""
|
||||||
|
|
||||||
|
__tablename__ = 'monitoring_spans'
|
||||||
|
|
||||||
|
span_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
|
||||||
|
trace_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
|
||||||
|
parent_span_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||||
|
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
|
||||||
|
kind = sqlalchemy.Column(sqlalchemy.String(80), nullable=False, index=True)
|
||||||
|
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True)
|
||||||
|
started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
|
||||||
|
ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True)
|
||||||
|
duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) # milliseconds
|
||||||
|
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||||
|
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||||
|
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||||
|
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||||
|
attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
|
||||||
|
error_message = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
|
||||||
|
|
||||||
|
|
||||||
class MonitoringMessage(Base):
|
class MonitoringMessage(Base):
|
||||||
"""Monitoring message records"""
|
"""Monitoring message records"""
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,88 @@
|
|||||||
|
"""add monitoring traces and spans
|
||||||
|
|
||||||
|
Revision ID: 0006_monitoring_traces
|
||||||
|
Revises: 0005_add_llm_context_length
|
||||||
|
Create Date: 2026-06-16
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
revision = '0006_monitoring_traces'
|
||||||
|
down_revision = '0005_add_llm_context_length'
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
conn = op.get_bind()
|
||||||
|
inspector = sa.inspect(conn)
|
||||||
|
tables = set(inspector.get_table_names())
|
||||||
|
|
||||||
|
if 'monitoring_traces' not in tables:
|
||||||
|
op.create_table(
|
||||||
|
'monitoring_traces',
|
||||||
|
sa.Column('trace_id', sa.String(length=255), nullable=False),
|
||||||
|
sa.Column('started_at', sa.DateTime(), nullable=False),
|
||||||
|
sa.Column('ended_at', sa.DateTime(), nullable=True),
|
||||||
|
sa.Column('duration', sa.Integer(), nullable=True),
|
||||||
|
sa.Column('status', sa.String(length=50), nullable=False),
|
||||||
|
sa.Column('name', sa.String(length=255), nullable=False),
|
||||||
|
sa.Column('bot_id', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('bot_name', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('pipeline_id', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('pipeline_name', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('session_id', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('message_id', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('query_id', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('attributes', sa.Text(), nullable=True),
|
||||||
|
sa.PrimaryKeyConstraint('trace_id'),
|
||||||
|
)
|
||||||
|
op.create_index('ix_monitoring_traces_started_at', 'monitoring_traces', ['started_at'])
|
||||||
|
op.create_index('ix_monitoring_traces_ended_at', 'monitoring_traces', ['ended_at'])
|
||||||
|
op.create_index('ix_monitoring_traces_status', 'monitoring_traces', ['status'])
|
||||||
|
op.create_index('ix_monitoring_traces_bot_id', 'monitoring_traces', ['bot_id'])
|
||||||
|
op.create_index('ix_monitoring_traces_pipeline_id', 'monitoring_traces', ['pipeline_id'])
|
||||||
|
op.create_index('ix_monitoring_traces_session_id', 'monitoring_traces', ['session_id'])
|
||||||
|
op.create_index('ix_monitoring_traces_message_id', 'monitoring_traces', ['message_id'])
|
||||||
|
op.create_index('ix_monitoring_traces_query_id', 'monitoring_traces', ['query_id'])
|
||||||
|
|
||||||
|
if 'monitoring_spans' not in tables:
|
||||||
|
op.create_table(
|
||||||
|
'monitoring_spans',
|
||||||
|
sa.Column('span_id', sa.String(length=255), nullable=False),
|
||||||
|
sa.Column('trace_id', sa.String(length=255), nullable=False),
|
||||||
|
sa.Column('parent_span_id', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('name', sa.String(length=255), nullable=False),
|
||||||
|
sa.Column('kind', sa.String(length=80), nullable=False),
|
||||||
|
sa.Column('status', sa.String(length=50), nullable=False),
|
||||||
|
sa.Column('started_at', sa.DateTime(), nullable=False),
|
||||||
|
sa.Column('ended_at', sa.DateTime(), nullable=True),
|
||||||
|
sa.Column('duration', sa.Integer(), nullable=True),
|
||||||
|
sa.Column('message_id', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('session_id', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('bot_id', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('pipeline_id', sa.String(length=255), nullable=True),
|
||||||
|
sa.Column('attributes', sa.Text(), nullable=True),
|
||||||
|
sa.Column('error_message', sa.Text(), nullable=True),
|
||||||
|
sa.PrimaryKeyConstraint('span_id'),
|
||||||
|
)
|
||||||
|
op.create_index('ix_monitoring_spans_trace_id', 'monitoring_spans', ['trace_id'])
|
||||||
|
op.create_index('ix_monitoring_spans_parent_span_id', 'monitoring_spans', ['parent_span_id'])
|
||||||
|
op.create_index('ix_monitoring_spans_kind', 'monitoring_spans', ['kind'])
|
||||||
|
op.create_index('ix_monitoring_spans_status', 'monitoring_spans', ['status'])
|
||||||
|
op.create_index('ix_monitoring_spans_started_at', 'monitoring_spans', ['started_at'])
|
||||||
|
op.create_index('ix_monitoring_spans_message_id', 'monitoring_spans', ['message_id'])
|
||||||
|
op.create_index('ix_monitoring_spans_session_id', 'monitoring_spans', ['session_id'])
|
||||||
|
op.create_index('ix_monitoring_spans_bot_id', 'monitoring_spans', ['bot_id'])
|
||||||
|
op.create_index('ix_monitoring_spans_pipeline_id', 'monitoring_spans', ['pipeline_id'])
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
conn = op.get_bind()
|
||||||
|
inspector = sa.inspect(conn)
|
||||||
|
tables = set(inspector.get_table_names())
|
||||||
|
if 'monitoring_spans' in tables:
|
||||||
|
op.drop_table('monitoring_spans')
|
||||||
|
if 'monitoring_traces' in tables:
|
||||||
|
op.drop_table('monitoring_traces')
|
||||||
@@ -2,6 +2,9 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import typing
|
import typing
|
||||||
import traceback
|
import traceback
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
import datetime
|
||||||
|
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
|
|
||||||
@@ -79,6 +82,19 @@ class RuntimePipeline:
|
|||||||
enable_all_plugins: bool
|
enable_all_plugins: bool
|
||||||
"""是否启用所有插件"""
|
"""是否启用所有插件"""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _new_span_id() -> str:
|
||||||
|
return f'span-{uuid.uuid4().hex[:16]}'
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _utc_now() -> datetime.datetime:
|
||||||
|
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _query_session_id(query: pipeline_query.Query) -> str:
|
||||||
|
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
|
||||||
|
return f'{launcher_type}_{query.launcher_id}'
|
||||||
|
|
||||||
enable_all_mcp_servers: bool
|
enable_all_mcp_servers: bool
|
||||||
"""是否启用所有MCP服务器"""
|
"""是否启用所有MCP服务器"""
|
||||||
|
|
||||||
@@ -234,17 +250,29 @@ class RuntimePipeline:
|
|||||||
stage_container = self.stage_containers[i]
|
stage_container = self.stage_containers[i]
|
||||||
|
|
||||||
query.current_stage_name = stage_container.inst_name # 标记到 Query 对象里
|
query.current_stage_name = stage_container.inst_name # 标记到 Query 对象里
|
||||||
|
span_started_at = self._utc_now()
|
||||||
|
span_started = time.perf_counter()
|
||||||
|
span_status = 'success'
|
||||||
|
span_error = None
|
||||||
|
span_result_type = None
|
||||||
|
|
||||||
|
try:
|
||||||
result = stage_container.inst.process(query, stage_container.inst_name)
|
result = stage_container.inst.process(query, stage_container.inst_name)
|
||||||
|
|
||||||
if isinstance(result, typing.Coroutine):
|
if isinstance(result, typing.Coroutine):
|
||||||
result = await result
|
result = await result
|
||||||
|
|
||||||
if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果
|
if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果
|
||||||
|
span_result_type = str(
|
||||||
|
result.result_type.value if hasattr(result.result_type, 'value') else result.result_type
|
||||||
|
)
|
||||||
self.ap.logger.debug(
|
self.ap.logger.debug(
|
||||||
f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}'
|
f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}'
|
||||||
)
|
)
|
||||||
await self._check_output(query, result)
|
await self._check_output(query, result)
|
||||||
|
if result.error_notice:
|
||||||
|
span_status = 'error'
|
||||||
|
span_error = result.error_notice
|
||||||
|
|
||||||
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
|
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
|
||||||
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
|
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
|
||||||
@@ -252,26 +280,72 @@ class RuntimePipeline:
|
|||||||
elif result.result_type == pipeline_entities.ResultType.CONTINUE:
|
elif result.result_type == pipeline_entities.ResultType.CONTINUE:
|
||||||
query = result.new_query
|
query = result.new_query
|
||||||
elif isinstance(result, typing.AsyncGenerator): # 生成器
|
elif isinstance(result, typing.AsyncGenerator): # 生成器
|
||||||
|
span_result_type = 'generator'
|
||||||
self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen')
|
self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen')
|
||||||
|
|
||||||
async for sub_result in result:
|
async for sub_result in result:
|
||||||
|
span_result_type = str(
|
||||||
|
sub_result.result_type.value
|
||||||
|
if hasattr(sub_result.result_type, 'value')
|
||||||
|
else sub_result.result_type
|
||||||
|
)
|
||||||
self.ap.logger.debug(
|
self.ap.logger.debug(
|
||||||
f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
|
f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
|
||||||
)
|
)
|
||||||
await self._check_output(query, sub_result)
|
await self._check_output(query, sub_result)
|
||||||
|
if sub_result.error_notice:
|
||||||
|
span_status = 'error'
|
||||||
|
span_error = sub_result.error_notice
|
||||||
|
|
||||||
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
|
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
|
||||||
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
|
self.ap.logger.debug(
|
||||||
|
f'Stage {stage_container.inst_name} interrupted query {query.query_id}'
|
||||||
|
)
|
||||||
break
|
break
|
||||||
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
|
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
|
||||||
query = sub_result.new_query
|
query = sub_result.new_query
|
||||||
await self._execute_from_stage(i + 1, query)
|
await self._execute_from_stage(i + 1, query)
|
||||||
break
|
break
|
||||||
|
except Exception as e:
|
||||||
|
span_status = 'error'
|
||||||
|
span_error = str(e)
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
trace_id = (query.variables or {}).get('_monitoring_trace_id')
|
||||||
|
root_span_id = (query.variables or {}).get('_monitoring_root_span_id')
|
||||||
|
if trace_id:
|
||||||
|
try:
|
||||||
|
await self.ap.monitoring_service.record_span(
|
||||||
|
trace_id=trace_id,
|
||||||
|
parent_span_id=root_span_id,
|
||||||
|
name=stage_container.inst_name,
|
||||||
|
kind='pipeline.stage',
|
||||||
|
status=span_status,
|
||||||
|
started_at=span_started_at,
|
||||||
|
duration=int((time.perf_counter() - span_started) * 1000),
|
||||||
|
message_id=(query.variables or {}).get('_monitoring_message_id'),
|
||||||
|
session_id=self._query_session_id(query),
|
||||||
|
bot_id=query.bot_uuid,
|
||||||
|
pipeline_id=self.pipeline_entity.uuid,
|
||||||
|
attributes={
|
||||||
|
'stage_class': stage_container.inst.__class__.__name__,
|
||||||
|
'result_type': span_result_type,
|
||||||
|
'query_id': query.query_id,
|
||||||
|
},
|
||||||
|
error_message=span_error,
|
||||||
|
)
|
||||||
|
except Exception as monitor_err:
|
||||||
|
self.ap.logger.error(f'Failed to record stage span: {monitor_err}')
|
||||||
|
|
||||||
i += 1
|
i += 1
|
||||||
|
|
||||||
async def process_query(self, query: pipeline_query.Query):
|
async def process_query(self, query: pipeline_query.Query):
|
||||||
"""处理请求"""
|
"""处理请求"""
|
||||||
|
trace_started_at = self._utc_now()
|
||||||
|
trace_started = time.perf_counter()
|
||||||
|
root_span_id = self._new_span_id()
|
||||||
|
trace_id = None
|
||||||
|
trace_status = 'success'
|
||||||
# Get monitoring metadata
|
# Get monitoring metadata
|
||||||
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
|
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
|
||||||
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
|
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
|
||||||
@@ -303,6 +377,28 @@ class RuntimePipeline:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.ap.logger.error(f'Failed to record query start: {e}')
|
self.ap.logger.error(f'Failed to record query start: {e}')
|
||||||
|
|
||||||
|
try:
|
||||||
|
trace_id = await self.ap.monitoring_service.start_trace(
|
||||||
|
name='LangBot query',
|
||||||
|
bot_id=query.bot_uuid or 'unknown',
|
||||||
|
bot_name=bot_name,
|
||||||
|
pipeline_id=self.pipeline_entity.uuid,
|
||||||
|
pipeline_name=pipeline_name,
|
||||||
|
session_id=self._query_session_id(query),
|
||||||
|
message_id=message_id or None,
|
||||||
|
query_id=query.query_id,
|
||||||
|
attributes={
|
||||||
|
'launcher_type': query.launcher_type.value
|
||||||
|
if hasattr(query.launcher_type, 'value')
|
||||||
|
else str(query.launcher_type),
|
||||||
|
'runner_name': runner_name,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
query.variables['_monitoring_trace_id'] = trace_id
|
||||||
|
query.variables['_monitoring_root_span_id'] = root_span_id
|
||||||
|
except Exception as e:
|
||||||
|
self.ap.logger.error(f'Failed to start query trace: {e}')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Get bound plugins for this pipeline
|
# Get bound plugins for this pipeline
|
||||||
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
|
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
|
||||||
@@ -336,7 +432,10 @@ class RuntimePipeline:
|
|||||||
await self._execute_from_stage(0, query)
|
await self._execute_from_stage(0, query)
|
||||||
|
|
||||||
# Record query success only if no error occurred during processing
|
# Record query success only if no error occurred during processing
|
||||||
if not query.variables.get('_monitoring_has_error', False):
|
has_monitoring_error = query.variables.get('_monitoring_has_error', False)
|
||||||
|
if has_monitoring_error:
|
||||||
|
trace_status = 'error'
|
||||||
|
else:
|
||||||
try:
|
try:
|
||||||
await monitoring_helper.MonitoringHelper.record_query_success(
|
await monitoring_helper.MonitoringHelper.record_query_success(
|
||||||
ap=self.ap,
|
ap=self.ap,
|
||||||
@@ -361,6 +460,7 @@ class RuntimePipeline:
|
|||||||
self.ap.logger.error(f'Failed to record query response: {e}')
|
self.ap.logger.error(f'Failed to record query response: {e}')
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
trace_status = 'error'
|
||||||
inst_name = query.current_stage_name if query.current_stage_name else 'unknown'
|
inst_name = query.current_stage_name if query.current_stage_name else 'unknown'
|
||||||
self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}')
|
self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}')
|
||||||
self.ap.logger.error(f'Traceback: {traceback.format_exc()}')
|
self.ap.logger.error(f'Traceback: {traceback.format_exc()}')
|
||||||
@@ -383,6 +483,35 @@ class RuntimePipeline:
|
|||||||
self.ap.logger.error(f'Failed to record query error: {me}')
|
self.ap.logger.error(f'Failed to record query error: {me}')
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
if trace_id:
|
||||||
|
try:
|
||||||
|
duration_ms = int((time.perf_counter() - trace_started) * 1000)
|
||||||
|
await self.ap.monitoring_service.record_span(
|
||||||
|
trace_id=trace_id,
|
||||||
|
span_id=root_span_id,
|
||||||
|
name='LangBot query',
|
||||||
|
kind='pipeline.query',
|
||||||
|
status=trace_status,
|
||||||
|
started_at=trace_started_at,
|
||||||
|
duration=duration_ms,
|
||||||
|
message_id=message_id or None,
|
||||||
|
session_id=self._query_session_id(query),
|
||||||
|
bot_id=query.bot_uuid,
|
||||||
|
pipeline_id=self.pipeline_entity.uuid,
|
||||||
|
attributes={
|
||||||
|
'query_id': query.query_id,
|
||||||
|
'pipeline_name': pipeline_name,
|
||||||
|
'runner_name': runner_name,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
await self.ap.monitoring_service.finish_trace(
|
||||||
|
trace_id=trace_id,
|
||||||
|
status=trace_status,
|
||||||
|
duration=duration_ms,
|
||||||
|
message_id=message_id or None,
|
||||||
|
)
|
||||||
|
except Exception as monitor_err:
|
||||||
|
self.ap.logger.error(f'Failed to finish query trace: {monitor_err}')
|
||||||
self.ap.logger.debug(f'Query {query.query_id} processed')
|
self.ap.logger.debug(f'Query {query.query_id} processed')
|
||||||
del self.ap.query_pool.cached_queries[query.query_id]
|
del self.ap.query_pool.cached_queries[query.query_id]
|
||||||
|
|
||||||
|
|||||||
@@ -711,8 +711,19 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
|
|||||||
endpoint: str,
|
endpoint: str,
|
||||||
method: str,
|
method: str,
|
||||||
body: Any = None,
|
body: Any = None,
|
||||||
|
caller: dict[str, Any] | None = None,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
return await self.handler.handle_page_api(plugin_author, plugin_name, page_id, endpoint, method, body)
|
return await self.handler.handle_page_api(
|
||||||
|
plugin_author,
|
||||||
|
plugin_name,
|
||||||
|
page_id,
|
||||||
|
endpoint,
|
||||||
|
method,
|
||||||
|
body,
|
||||||
|
caller,
|
||||||
|
headers or {},
|
||||||
|
)
|
||||||
|
|
||||||
async def get_debug_info(self) -> dict[str, Any]:
|
async def get_debug_info(self) -> dict[str, Any]:
|
||||||
"""Get debug information including debug key and WS URL"""
|
"""Get debug information including debug key and WS URL"""
|
||||||
|
|||||||
@@ -755,6 +755,21 @@ class RuntimeConnectionHandler(handler.Handler):
|
|||||||
'session_name': session_name,
|
'session_name': session_name,
|
||||||
'bot_uuid': query.bot_uuid or '',
|
'bot_uuid': query.bot_uuid or '',
|
||||||
'sender_id': str(query.sender_id),
|
'sender_id': str(query.sender_id),
|
||||||
|
'_trace_context': {
|
||||||
|
'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None,
|
||||||
|
'parent_span_id': query.variables.get('_monitoring_root_span_id')
|
||||||
|
if query.variables
|
||||||
|
else None,
|
||||||
|
'message_id': query.variables.get('_monitoring_message_id') if query.variables else None,
|
||||||
|
'query_id': query.query_id,
|
||||||
|
'session_id': session_name,
|
||||||
|
'bot_id': query.bot_uuid or '',
|
||||||
|
'pipeline_id': query.pipeline_uuid or '',
|
||||||
|
'knowledge_base_id': kb_id,
|
||||||
|
'attributes': {
|
||||||
|
'source': 'plugin-api',
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
results = [entry.model_dump(mode='json') for entry in entries]
|
results = [entry.model_dump(mode='json') for entry in entries]
|
||||||
@@ -1011,6 +1026,8 @@ class RuntimeConnectionHandler(handler.Handler):
|
|||||||
endpoint: str,
|
endpoint: str,
|
||||||
method: str,
|
method: str,
|
||||||
body: Any = None,
|
body: Any = None,
|
||||||
|
caller: dict[str, Any] | None = None,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""Forward a page API call to the plugin via runtime."""
|
"""Forward a page API call to the plugin via runtime."""
|
||||||
result = await self.call_action(
|
result = await self.call_action(
|
||||||
@@ -1022,6 +1039,8 @@ class RuntimeConnectionHandler(handler.Handler):
|
|||||||
'endpoint': endpoint,
|
'endpoint': endpoint,
|
||||||
'method': method,
|
'method': method,
|
||||||
'body': body,
|
'body': body,
|
||||||
|
'caller': caller,
|
||||||
|
'headers': headers or {},
|
||||||
},
|
},
|
||||||
timeout=30,
|
timeout=30,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ from __future__ import annotations
|
|||||||
import abc
|
import abc
|
||||||
import typing
|
import typing
|
||||||
import time
|
import time
|
||||||
|
import datetime
|
||||||
|
|
||||||
from ...core import app
|
from ...core import app
|
||||||
from ...entity.persistence import model as persistence_model
|
from ...entity.persistence import model as persistence_model
|
||||||
@@ -16,6 +17,15 @@ LLM_USAGE_QUERY_VARIABLE = '_llm_usage'
|
|||||||
STREAM_USAGE_QUERY_VARIABLE = '_stream_usage'
|
STREAM_USAGE_QUERY_VARIABLE = '_stream_usage'
|
||||||
|
|
||||||
|
|
||||||
|
def _utc_now() -> datetime.datetime:
|
||||||
|
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||||
|
|
||||||
|
|
||||||
|
def _query_session_id(query: pipeline_query.Query) -> str:
|
||||||
|
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
|
||||||
|
return f'{launcher_type}_{query.launcher_id}'
|
||||||
|
|
||||||
|
|
||||||
def _store_llm_usage(query: pipeline_query.Query | None, usage_info: dict | None) -> None:
|
def _store_llm_usage(query: pipeline_query.Query | None, usage_info: dict | None) -> None:
|
||||||
"""Store the latest provider usage on the query for upstream action handlers."""
|
"""Store the latest provider usage on the query for upstream action handlers."""
|
||||||
if query is None or not usage_info:
|
if query is None or not usage_info:
|
||||||
@@ -59,6 +69,7 @@ class RuntimeProvider:
|
|||||||
"""Bridge method for invoking LLM with monitoring"""
|
"""Bridge method for invoking LLM with monitoring"""
|
||||||
# Start timing for monitoring
|
# Start timing for monitoring
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
span_started_at = _utc_now()
|
||||||
input_tokens = 0
|
input_tokens = 0
|
||||||
output_tokens = 0
|
output_tokens = 0
|
||||||
status = 'success'
|
status = 'success'
|
||||||
@@ -125,6 +136,30 @@ class RuntimeProvider:
|
|||||||
error_message=error_message,
|
error_message=error_message,
|
||||||
message_id=message_id,
|
message_id=message_id,
|
||||||
)
|
)
|
||||||
|
trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None
|
||||||
|
parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None
|
||||||
|
if trace_id:
|
||||||
|
await self.requester.ap.monitoring_service.record_span(
|
||||||
|
trace_id=trace_id,
|
||||||
|
parent_span_id=parent_span_id,
|
||||||
|
name=f'LLM {model.model_entity.name}',
|
||||||
|
kind='model.llm',
|
||||||
|
status=status,
|
||||||
|
started_at=span_started_at,
|
||||||
|
duration=duration_ms,
|
||||||
|
message_id=message_id,
|
||||||
|
session_id=_query_session_id(query),
|
||||||
|
bot_id=query.bot_uuid,
|
||||||
|
pipeline_id=query.pipeline_uuid,
|
||||||
|
attributes={
|
||||||
|
'model_name': model.model_entity.name,
|
||||||
|
'input_tokens': input_tokens,
|
||||||
|
'output_tokens': output_tokens,
|
||||||
|
'total_tokens': input_tokens + output_tokens,
|
||||||
|
'stream': False,
|
||||||
|
},
|
||||||
|
error_message=error_message,
|
||||||
|
)
|
||||||
except Exception as monitor_err:
|
except Exception as monitor_err:
|
||||||
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM call: {monitor_err}')
|
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM call: {monitor_err}')
|
||||||
|
|
||||||
@@ -140,6 +175,7 @@ class RuntimeProvider:
|
|||||||
"""Bridge method for invoking LLM stream with monitoring"""
|
"""Bridge method for invoking LLM stream with monitoring"""
|
||||||
# Start timing for monitoring
|
# Start timing for monitoring
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
span_started_at = _utc_now()
|
||||||
status = 'success'
|
status = 'success'
|
||||||
error_message = None
|
error_message = None
|
||||||
input_tokens = 0
|
input_tokens = 0
|
||||||
@@ -204,6 +240,30 @@ class RuntimeProvider:
|
|||||||
error_message=error_message,
|
error_message=error_message,
|
||||||
message_id=message_id,
|
message_id=message_id,
|
||||||
)
|
)
|
||||||
|
trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None
|
||||||
|
parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None
|
||||||
|
if trace_id:
|
||||||
|
await self.requester.ap.monitoring_service.record_span(
|
||||||
|
trace_id=trace_id,
|
||||||
|
parent_span_id=parent_span_id,
|
||||||
|
name=f'LLM stream {model.model_entity.name}',
|
||||||
|
kind='model.llm',
|
||||||
|
status=status,
|
||||||
|
started_at=span_started_at,
|
||||||
|
duration=duration_ms,
|
||||||
|
message_id=message_id,
|
||||||
|
session_id=_query_session_id(query),
|
||||||
|
bot_id=query.bot_uuid,
|
||||||
|
pipeline_id=query.pipeline_uuid,
|
||||||
|
attributes={
|
||||||
|
'model_name': model.model_entity.name,
|
||||||
|
'input_tokens': input_tokens,
|
||||||
|
'output_tokens': output_tokens,
|
||||||
|
'total_tokens': input_tokens + output_tokens,
|
||||||
|
'stream': True,
|
||||||
|
},
|
||||||
|
error_message=error_message,
|
||||||
|
)
|
||||||
except Exception as monitor_err:
|
except Exception as monitor_err:
|
||||||
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM stream call: {monitor_err}')
|
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM stream call: {monitor_err}')
|
||||||
|
|
||||||
|
|||||||
@@ -268,6 +268,21 @@ class LocalAgentRunner(runner.RequestRunner):
|
|||||||
'bot_uuid': query.bot_uuid or '',
|
'bot_uuid': query.bot_uuid or '',
|
||||||
'sender_id': str(query.sender_id),
|
'sender_id': str(query.sender_id),
|
||||||
'session_name': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
|
'session_name': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
|
||||||
|
'_trace_context': {
|
||||||
|
'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None,
|
||||||
|
'parent_span_id': query.variables.get('_monitoring_root_span_id')
|
||||||
|
if query.variables
|
||||||
|
else None,
|
||||||
|
'message_id': query.variables.get('_monitoring_message_id') if query.variables else None,
|
||||||
|
'query_id': query.query_id,
|
||||||
|
'session_id': f'{query.launcher_type.value}_{query.launcher_id}',
|
||||||
|
'bot_id': query.bot_uuid or '',
|
||||||
|
'pipeline_id': query.pipeline_uuid or '',
|
||||||
|
'knowledge_base_id': kb_uuid,
|
||||||
|
'attributes': {
|
||||||
|
'source': 'local-agent',
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -1,10 +1,12 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import mimetypes
|
import mimetypes
|
||||||
import os.path
|
import os.path
|
||||||
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
import uuid
|
import uuid
|
||||||
import zipfile
|
import zipfile
|
||||||
import io
|
import io
|
||||||
|
import datetime
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from langbot.pkg.core import app
|
from langbot.pkg.core import app
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
@@ -25,6 +27,10 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
|
|||||||
super().__init__(ap)
|
super().__init__(ap)
|
||||||
self.knowledge_base_entity = knowledge_base_entity
|
self.knowledge_base_entity = knowledge_base_entity
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _utc_now() -> datetime.datetime:
|
||||||
|
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -334,6 +340,25 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
|
|||||||
# are passed directly to vector_search by some plugins (e.g. LangRAG)
|
# are passed directly to vector_search by some plugins (e.g. LangRAG)
|
||||||
# and would cause empty results when the metadata field doesn't exist.
|
# and would cause empty results when the metadata field doesn't exist.
|
||||||
filters = settings.pop('filters', {})
|
filters = settings.pop('filters', {})
|
||||||
|
trace_context = settings.pop('_trace_context', None)
|
||||||
|
host_span_started_at = self._utc_now()
|
||||||
|
host_span_started = time.perf_counter()
|
||||||
|
host_span_id = None
|
||||||
|
if trace_context and trace_context.get('trace_id'):
|
||||||
|
host_parent_span_id = trace_context.get('parent_span_id')
|
||||||
|
host_span_id = trace_context.get('rag_span_id') or f'span-{uuid.uuid4().hex[:16]}'
|
||||||
|
trace_context = {
|
||||||
|
'trace_id': trace_context.get('trace_id'),
|
||||||
|
'parent_span_id': host_span_id,
|
||||||
|
'host_parent_span_id': host_parent_span_id,
|
||||||
|
'message_id': trace_context.get('message_id'),
|
||||||
|
'query_id': trace_context.get('query_id'),
|
||||||
|
'session_id': trace_context.get('session_id'),
|
||||||
|
'bot_id': trace_context.get('bot_id'),
|
||||||
|
'pipeline_id': trace_context.get('pipeline_id'),
|
||||||
|
'knowledge_base_id': kb.uuid,
|
||||||
|
'attributes': trace_context.get('attributes') or {},
|
||||||
|
}
|
||||||
|
|
||||||
retrieval_context = {
|
retrieval_context = {
|
||||||
'query': query,
|
'query': query,
|
||||||
@@ -343,13 +368,107 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
|
|||||||
'creation_settings': kb.creation_settings or {},
|
'creation_settings': kb.creation_settings or {},
|
||||||
'filters': filters,
|
'filters': filters,
|
||||||
}
|
}
|
||||||
|
if trace_context:
|
||||||
|
retrieval_context['trace_context'] = trace_context
|
||||||
|
|
||||||
|
try:
|
||||||
result = await self.ap.plugin_connector.call_rag_retrieve(
|
result = await self.ap.plugin_connector.call_rag_retrieve(
|
||||||
plugin_id,
|
plugin_id,
|
||||||
retrieval_context,
|
retrieval_context,
|
||||||
)
|
)
|
||||||
|
except Exception as e:
|
||||||
|
if trace_context:
|
||||||
|
await self._record_rag_trace_result(
|
||||||
|
trace_context=trace_context,
|
||||||
|
host_span_id=host_span_id,
|
||||||
|
started_at=host_span_started_at,
|
||||||
|
duration=int((time.perf_counter() - host_span_started) * 1000),
|
||||||
|
plugin_id=plugin_id,
|
||||||
|
result={
|
||||||
|
'results': [],
|
||||||
|
'metadata': {
|
||||||
|
'status': 'error',
|
||||||
|
'error_message': str(e),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
if trace_context:
|
||||||
|
await self._record_rag_trace_result(
|
||||||
|
trace_context=trace_context,
|
||||||
|
host_span_id=host_span_id,
|
||||||
|
started_at=host_span_started_at,
|
||||||
|
duration=int((time.perf_counter() - host_span_started) * 1000),
|
||||||
|
plugin_id=plugin_id,
|
||||||
|
result=result,
|
||||||
|
)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
async def _record_rag_trace_result(
|
||||||
|
self,
|
||||||
|
trace_context: dict[str, Any],
|
||||||
|
host_span_id: str | None,
|
||||||
|
started_at: datetime.datetime,
|
||||||
|
duration: int,
|
||||||
|
plugin_id: str,
|
||||||
|
result: dict[str, Any],
|
||||||
|
) -> None:
|
||||||
|
"""Persist host RAG span and plugin-provided child spans."""
|
||||||
|
trace_id = trace_context.get('trace_id')
|
||||||
|
if not trace_id:
|
||||||
|
return
|
||||||
|
|
||||||
|
metadata = result.get('metadata') if isinstance(result, dict) else {}
|
||||||
|
metadata = metadata if isinstance(metadata, dict) else {}
|
||||||
|
plugin_spans = metadata.get('trace_spans') if isinstance(metadata.get('trace_spans'), list) else []
|
||||||
|
parent_span_id = trace_context.get('parent_span_id')
|
||||||
|
host_parent_span_id = trace_context.get('host_parent_span_id')
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self.ap.monitoring_service.record_span(
|
||||||
|
trace_id=trace_id,
|
||||||
|
span_id=host_span_id,
|
||||||
|
parent_span_id=host_parent_span_id,
|
||||||
|
name=f'Knowledge retrieval {self.knowledge_base_entity.name}',
|
||||||
|
kind='rag.retrieval',
|
||||||
|
status=metadata.get('status', 'success'),
|
||||||
|
started_at=started_at,
|
||||||
|
duration=duration,
|
||||||
|
message_id=trace_context.get('message_id'),
|
||||||
|
session_id=trace_context.get('session_id'),
|
||||||
|
bot_id=trace_context.get('bot_id'),
|
||||||
|
pipeline_id=trace_context.get('pipeline_id'),
|
||||||
|
attributes={
|
||||||
|
'knowledge_base_id': self.knowledge_base_entity.uuid,
|
||||||
|
'knowledge_base_name': self.knowledge_base_entity.name,
|
||||||
|
'plugin_id': plugin_id,
|
||||||
|
'returned_count': len(result.get('results', []) if isinstance(result, dict) else []),
|
||||||
|
'total_found': result.get('total_found') if isinstance(result, dict) else None,
|
||||||
|
},
|
||||||
|
error_message=metadata.get('error_message'),
|
||||||
|
)
|
||||||
|
for span in plugin_spans:
|
||||||
|
if not isinstance(span, dict):
|
||||||
|
continue
|
||||||
|
await self.ap.monitoring_service.record_span(
|
||||||
|
trace_id=trace_id,
|
||||||
|
span_id=span.get('span_id'),
|
||||||
|
parent_span_id=span.get('parent_span_id') or host_span_id or parent_span_id,
|
||||||
|
name=span.get('name') or 'RAG plugin stage',
|
||||||
|
kind=span.get('kind') or 'rag.stage',
|
||||||
|
status=span.get('status') or 'success',
|
||||||
|
started_at=started_at,
|
||||||
|
duration=span.get('duration_ms'),
|
||||||
|
message_id=trace_context.get('message_id'),
|
||||||
|
session_id=trace_context.get('session_id'),
|
||||||
|
bot_id=trace_context.get('bot_id'),
|
||||||
|
pipeline_id=trace_context.get('pipeline_id'),
|
||||||
|
attributes=span.get('attributes') if isinstance(span.get('attributes'), dict) else {},
|
||||||
|
error_message=span.get('error_message'),
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
self.ap.logger.error(f'Failed to record RAG trace spans: {e}')
|
||||||
|
|
||||||
async def _delete_document(self, document_id: str) -> bool:
|
async def _delete_document(self, document_id: str) -> bool:
|
||||||
"""Call plugin to delete document."""
|
"""Call plugin to delete document."""
|
||||||
kb = self.knowledge_base_entity
|
kb = self.knowledge_base_entity
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ Run: uv run pytest tests/integration/api/test_monitoring.py -q
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import datetime
|
||||||
import pytest
|
import pytest
|
||||||
from unittest.mock import MagicMock, AsyncMock, Mock
|
from unittest.mock import MagicMock, AsyncMock, Mock
|
||||||
|
|
||||||
@@ -82,6 +83,15 @@ def fake_monitoring_app():
|
|||||||
app.monitoring_service.get_messages = AsyncMock(return_value=([{'id': 'msg-1', 'content': 'test'}], 100))
|
app.monitoring_service.get_messages = AsyncMock(return_value=([{'id': 'msg-1', 'content': 'test'}], 100))
|
||||||
app.monitoring_service.get_llm_calls = AsyncMock(return_value=([{'id': 'llm-1'}], 50))
|
app.monitoring_service.get_llm_calls = AsyncMock(return_value=([{'id': 'llm-1'}], 50))
|
||||||
app.monitoring_service.get_embedding_calls = AsyncMock(return_value=([{'id': 'emb-1'}], 10))
|
app.monitoring_service.get_embedding_calls = AsyncMock(return_value=([{'id': 'emb-1'}], 10))
|
||||||
|
app.monitoring_service.get_traces = AsyncMock(return_value=([{'trace_id': 'trace-1'}], 1))
|
||||||
|
app.monitoring_service.get_trace_details = AsyncMock(
|
||||||
|
side_effect=lambda trace_id: {
|
||||||
|
'found': trace_id == 'trace-1',
|
||||||
|
'trace_id': trace_id,
|
||||||
|
'trace': {'trace_id': trace_id} if trace_id == 'trace-1' else None,
|
||||||
|
'spans': [] if trace_id == 'trace-1' else None,
|
||||||
|
}
|
||||||
|
)
|
||||||
app.monitoring_service.get_sessions = AsyncMock(return_value=([{'session_id': 'sess-1'}], 20))
|
app.monitoring_service.get_sessions = AsyncMock(return_value=([{'session_id': 'sess-1'}], 20))
|
||||||
app.monitoring_service.get_errors = AsyncMock(return_value=([{'id': 'err-1'}], 2))
|
app.monitoring_service.get_errors = AsyncMock(return_value=([{'id': 'err-1'}], 2))
|
||||||
app.monitoring_service.get_session_analysis = AsyncMock(
|
app.monitoring_service.get_session_analysis = AsyncMock(
|
||||||
@@ -222,6 +232,7 @@ class TestMonitoringAllDataEndpoint:
|
|||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
data = await response.get_json()
|
data = await response.get_json()
|
||||||
assert 'overview' in data['data']
|
assert 'overview' in data['data']
|
||||||
|
assert 'traces' in data['data']
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures('mock_circular_import_chain')
|
@pytest.mark.usefixtures('mock_circular_import_chain')
|
||||||
@@ -246,6 +257,60 @@ class TestMonitoringDetailsEndpoints:
|
|||||||
|
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_trace_details(self, quart_test_client):
|
||||||
|
"""GET /api/v1/monitoring/traces/{id}."""
|
||||||
|
response = await quart_test_client.get(
|
||||||
|
'/api/v1/monitoring/traces/trace-1', headers={'Authorization': 'Bearer test_token'}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures('mock_circular_import_chain')
|
||||||
|
class TestMonitoringTraceEndpoints:
|
||||||
|
"""Tests for trace list and detail endpoints."""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_traces_forwards_filters(self, quart_test_client, fake_monitoring_app):
|
||||||
|
"""GET /api/v1/monitoring/traces forwards filters to service."""
|
||||||
|
response = await quart_test_client.get(
|
||||||
|
'/api/v1/monitoring/traces'
|
||||||
|
'?botId=bot-1'
|
||||||
|
'&pipelineId=pipeline-1'
|
||||||
|
'&sessionId=session-1'
|
||||||
|
'&status=success'
|
||||||
|
'&startTime=2026-01-01T00:00:00Z'
|
||||||
|
'&endTime=2026-01-02T00:00:00Z'
|
||||||
|
'&limit=25'
|
||||||
|
'&offset=5',
|
||||||
|
headers={'Authorization': 'Bearer test_token'},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = await response.get_json()
|
||||||
|
assert data['data']['traces'] == [{'trace_id': 'trace-1'}]
|
||||||
|
assert data['data']['total'] == 1
|
||||||
|
fake_monitoring_app.monitoring_service.get_traces.assert_awaited_with(
|
||||||
|
bot_ids=['bot-1'],
|
||||||
|
pipeline_ids=['pipeline-1'],
|
||||||
|
session_ids=['session-1'],
|
||||||
|
statuses=['success'],
|
||||||
|
start_time=datetime.datetime(2026, 1, 1, 0, 0),
|
||||||
|
end_time=datetime.datetime(2026, 1, 2, 0, 0),
|
||||||
|
limit=25,
|
||||||
|
offset=5,
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_trace_details_not_found(self, quart_test_client):
|
||||||
|
"""GET /api/v1/monitoring/traces/{id} returns 404 when missing."""
|
||||||
|
response = await quart_test_client.get(
|
||||||
|
'/api/v1/monitoring/traces/trace-missing', headers={'Authorization': 'Bearer test_token'}
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures('mock_circular_import_chain')
|
@pytest.mark.usefixtures('mock_circular_import_chain')
|
||||||
class TestMonitoringFeedbackEndpoints:
|
class TestMonitoringFeedbackEndpoints:
|
||||||
|
|||||||
@@ -104,7 +104,7 @@ class TestSQLiteMigrationUpgrade:
|
|||||||
rev = await get_alembic_current(sqlite_engine)
|
rev = await get_alembic_current(sqlite_engine)
|
||||||
assert rev is not None, 'Expected a revision after upgrade'
|
assert rev is not None, 'Expected a revision after upgrade'
|
||||||
# Head should be the latest migration
|
# Head should be the latest migration
|
||||||
assert rev.startswith('0005'), f'Expected head to be 0005_*, got {rev}'
|
assert rev.startswith('0006'), f'Expected head to be 0006_*, got {rev}'
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_upgrade_idempotent(self, sqlite_engine):
|
async def test_upgrade_idempotent(self, sqlite_engine):
|
||||||
|
|||||||
@@ -144,8 +144,8 @@ class TestPostgreSQLMigrationUpgrade:
|
|||||||
# Verify revision
|
# Verify revision
|
||||||
rev = await get_alembic_current(postgres_engine)
|
rev = await get_alembic_current(postgres_engine)
|
||||||
assert rev is not None, 'Expected a revision after upgrade'
|
assert rev is not None, 'Expected a revision after upgrade'
|
||||||
# Head should be the latest migration (0005 for current state)
|
# Head should be the latest migration.
|
||||||
assert rev.startswith('0005'), f'Expected head to be 0005_*, got {rev}'
|
assert rev.startswith('0006'), f'Expected head to be 0006_*, got {rev}'
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_postgres_upgrade_idempotent(self, postgres_engine, clean_tables, clean_alembic_version):
|
async def test_postgres_upgrade_idempotent(self, postgres_engine, clean_tables, clean_alembic_version):
|
||||||
|
|||||||
@@ -0,0 +1,207 @@
|
|||||||
|
"""Unit tests for MonitoringService trace observability."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from unittest.mock import AsyncMock, Mock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import sqlalchemy
|
||||||
|
from sqlalchemy.ext.asyncio import create_async_engine
|
||||||
|
|
||||||
|
from langbot.pkg.api.http.service.monitoring import MonitoringService
|
||||||
|
from langbot.pkg.entity.persistence.base import Base
|
||||||
|
from langbot.pkg.entity.persistence import monitoring as persistence_monitoring
|
||||||
|
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.asyncio
|
||||||
|
|
||||||
|
|
||||||
|
class _SQLitePersistence:
|
||||||
|
def __init__(self, engine):
|
||||||
|
self._engine = engine
|
||||||
|
|
||||||
|
def get_db_engine(self):
|
||||||
|
return self._engine
|
||||||
|
|
||||||
|
async def execute_async(self, *args, **kwargs):
|
||||||
|
async with self._engine.connect() as conn:
|
||||||
|
result = await conn.execute(*args, **kwargs)
|
||||||
|
await conn.commit()
|
||||||
|
return result
|
||||||
|
|
||||||
|
def serialize_model(self, model, data, masked_columns=None):
|
||||||
|
masked_columns = masked_columns or []
|
||||||
|
return {
|
||||||
|
column.name: getattr(data, column.name).isoformat()
|
||||||
|
if isinstance(getattr(data, column.name), datetime.datetime)
|
||||||
|
else getattr(data, column.name)
|
||||||
|
for column in model.__table__.columns
|
||||||
|
if column.name not in masked_columns
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def monitoring_service(tmp_path):
|
||||||
|
engine = create_async_engine(f'sqlite+aiosqlite:///{tmp_path / "monitoring.db"}')
|
||||||
|
async with engine.begin() as conn:
|
||||||
|
await conn.run_sync(Base.metadata.create_all)
|
||||||
|
|
||||||
|
ap = SimpleNamespace(
|
||||||
|
persistence_mgr=_SQLitePersistence(engine),
|
||||||
|
instance_config=SimpleNamespace(data={'database': {'use': 'sqlite'}}),
|
||||||
|
logger=Mock(),
|
||||||
|
)
|
||||||
|
service = MonitoringService(ap)
|
||||||
|
yield service
|
||||||
|
await engine.dispose()
|
||||||
|
|
||||||
|
|
||||||
|
async def test_trace_lifecycle_records_spans_and_returns_details(monitoring_service):
|
||||||
|
started_at = datetime.datetime(2026, 1, 1, 12, 0, 0)
|
||||||
|
ended_at = started_at + datetime.timedelta(milliseconds=125)
|
||||||
|
|
||||||
|
trace_id = await monitoring_service.start_trace(
|
||||||
|
trace_id='trace-test',
|
||||||
|
name='Pipeline query',
|
||||||
|
bot_id='bot-1',
|
||||||
|
bot_name='Bot',
|
||||||
|
pipeline_id='pipeline-1',
|
||||||
|
pipeline_name='Default',
|
||||||
|
session_id='session-1',
|
||||||
|
message_id='message-1',
|
||||||
|
query_id=42,
|
||||||
|
attributes={'source': 'unit-test'},
|
||||||
|
)
|
||||||
|
assert trace_id == 'trace-test'
|
||||||
|
|
||||||
|
root_span_id = await monitoring_service.record_span(
|
||||||
|
trace_id=trace_id,
|
||||||
|
span_id='span-root',
|
||||||
|
name='Pipeline',
|
||||||
|
kind='pipeline',
|
||||||
|
status='completed',
|
||||||
|
started_at=started_at,
|
||||||
|
ended_at=ended_at,
|
||||||
|
message_id='message-1',
|
||||||
|
session_id='session-1',
|
||||||
|
bot_id='bot-1',
|
||||||
|
pipeline_id='pipeline-1',
|
||||||
|
attributes={'stage_count': 2},
|
||||||
|
)
|
||||||
|
await monitoring_service.record_span(
|
||||||
|
trace_id=trace_id,
|
||||||
|
span_id='span-rag',
|
||||||
|
parent_span_id=root_span_id,
|
||||||
|
name='RAG retrieval',
|
||||||
|
kind='rag.retrieval',
|
||||||
|
status='failed',
|
||||||
|
started_at=started_at + datetime.timedelta(seconds=1),
|
||||||
|
duration=12.7,
|
||||||
|
attributes={'top_k': 5},
|
||||||
|
error_message='vector store timeout',
|
||||||
|
)
|
||||||
|
await monitoring_service.finish_trace(
|
||||||
|
trace_id,
|
||||||
|
status='completed',
|
||||||
|
duration=250,
|
||||||
|
message_id='message-final',
|
||||||
|
attributes={'result_type': 'reply'},
|
||||||
|
)
|
||||||
|
|
||||||
|
traces, total = await monitoring_service.get_traces(
|
||||||
|
bot_ids=['bot-1'],
|
||||||
|
pipeline_ids=['pipeline-1'],
|
||||||
|
session_ids=['session-1'],
|
||||||
|
statuses=['success'],
|
||||||
|
limit=10,
|
||||||
|
offset=0,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert total == 1
|
||||||
|
assert traces[0]['trace_id'] == trace_id
|
||||||
|
assert traces[0]['status'] == 'success'
|
||||||
|
assert traces[0]['message_id'] == 'message-final'
|
||||||
|
assert traces[0]['query_id'] == '42'
|
||||||
|
assert traces[0]['attributes'] == {'result_type': 'reply'}
|
||||||
|
|
||||||
|
details = await monitoring_service.get_trace_details(trace_id)
|
||||||
|
assert details['found'] is True
|
||||||
|
assert details['trace']['trace_id'] == trace_id
|
||||||
|
assert [span['span_id'] for span in details['spans']] == ['span-root', 'span-rag']
|
||||||
|
assert details['spans'][0]['status'] == 'success'
|
||||||
|
assert details['spans'][0]['duration'] == 125
|
||||||
|
assert details['spans'][0]['attributes'] == {'stage_count': 2}
|
||||||
|
assert details['spans'][1]['status'] == 'error'
|
||||||
|
assert details['spans'][1]['duration'] == 13
|
||||||
|
assert details['spans'][1]['parent_span_id'] == 'span-root'
|
||||||
|
assert details['spans'][1]['error_message'] == 'vector store timeout'
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_trace_details_returns_not_found_for_missing_trace(monitoring_service):
|
||||||
|
details = await monitoring_service.get_trace_details('trace-missing')
|
||||||
|
|
||||||
|
assert details == {'trace_id': 'trace-missing', 'found': False}
|
||||||
|
|
||||||
|
|
||||||
|
async def test_cleanup_expired_records_includes_traces_and_spans(monitoring_service):
|
||||||
|
old_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - datetime.timedelta(days=30)
|
||||||
|
recent_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||||
|
|
||||||
|
await monitoring_service.ap.persistence_mgr.execute_async(
|
||||||
|
sqlalchemy.insert(persistence_monitoring.MonitoringTrace),
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'trace_id': 'trace-old',
|
||||||
|
'started_at': old_time,
|
||||||
|
'ended_at': old_time,
|
||||||
|
'duration': 10,
|
||||||
|
'status': 'success',
|
||||||
|
'name': 'Old trace',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'trace_id': 'trace-recent',
|
||||||
|
'started_at': recent_time,
|
||||||
|
'ended_at': recent_time,
|
||||||
|
'duration': 10,
|
||||||
|
'status': 'success',
|
||||||
|
'name': 'Recent trace',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
)
|
||||||
|
await monitoring_service.ap.persistence_mgr.execute_async(
|
||||||
|
sqlalchemy.insert(persistence_monitoring.MonitoringSpan),
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'span_id': 'span-old',
|
||||||
|
'trace_id': 'trace-old',
|
||||||
|
'name': 'Old span',
|
||||||
|
'kind': 'pipeline',
|
||||||
|
'status': 'success',
|
||||||
|
'started_at': old_time,
|
||||||
|
'ended_at': old_time,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'span_id': 'span-recent',
|
||||||
|
'trace_id': 'trace-recent',
|
||||||
|
'name': 'Recent span',
|
||||||
|
'kind': 'pipeline',
|
||||||
|
'status': 'success',
|
||||||
|
'started_at': recent_time,
|
||||||
|
'ended_at': recent_time,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
monitoring_service._release_sqlite_space = AsyncMock()
|
||||||
|
|
||||||
|
deleted = await monitoring_service.cleanup_expired_records(retention_days=7, batch_size=1)
|
||||||
|
|
||||||
|
assert deleted['monitoring_traces'] == 1
|
||||||
|
assert deleted['monitoring_spans'] == 1
|
||||||
|
monitoring_service._release_sqlite_space.assert_awaited_once()
|
||||||
|
|
||||||
|
remaining = await monitoring_service.get_trace_details('trace-recent')
|
||||||
|
assert remaining['found'] is True
|
||||||
|
assert remaining['spans'][0]['span_id'] == 'span-recent'
|
||||||
@@ -0,0 +1,111 @@
|
|||||||
|
"""Unit tests for monitoring trace HTTP routes."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
from unittest.mock import AsyncMock, Mock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import quart
|
||||||
|
|
||||||
|
from tests.factories import FakeApp
|
||||||
|
from tests.utils.import_isolation import MockLifecycleControlScope, isolated_sys_modules
|
||||||
|
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.asyncio
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def monitoring_client():
|
||||||
|
mock_app = Mock()
|
||||||
|
mock_app.Application = type('FakeMinimalApplication', (), {})
|
||||||
|
mock_entities = Mock()
|
||||||
|
mock_entities.LifecycleControlScope = MockLifecycleControlScope
|
||||||
|
|
||||||
|
clear = [
|
||||||
|
'langbot.pkg.api.http.controller.group',
|
||||||
|
'langbot.pkg.api.http.controller.groups',
|
||||||
|
'langbot.pkg.api.http.controller.groups.monitoring',
|
||||||
|
'langbot.pkg.api.http.controller.main',
|
||||||
|
]
|
||||||
|
|
||||||
|
app = FakeApp()
|
||||||
|
app.user_service = Mock()
|
||||||
|
app.user_service.verify_jwt_token = AsyncMock(return_value='test@example.com')
|
||||||
|
app.user_service.get_user_by_email = AsyncMock(return_value=Mock(email='test@example.com'))
|
||||||
|
|
||||||
|
app.monitoring_service = Mock()
|
||||||
|
app.monitoring_service.get_traces = AsyncMock(return_value=([{'trace_id': 'trace-1'}], 1))
|
||||||
|
app.monitoring_service.get_trace_details = AsyncMock(
|
||||||
|
side_effect=lambda trace_id: {
|
||||||
|
'found': trace_id == 'trace-1',
|
||||||
|
'trace_id': trace_id,
|
||||||
|
'trace': {'trace_id': trace_id} if trace_id == 'trace-1' else None,
|
||||||
|
'spans': [] if trace_id == 'trace-1' else None,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
with isolated_sys_modules(
|
||||||
|
mocks={
|
||||||
|
'langbot.pkg.core.app': mock_app,
|
||||||
|
'langbot.pkg.core.entities': mock_entities,
|
||||||
|
},
|
||||||
|
clear=clear,
|
||||||
|
):
|
||||||
|
from langbot.pkg.api.http.controller.groups.monitoring import MonitoringRouterGroup
|
||||||
|
|
||||||
|
quart_app = quart.Quart(__name__)
|
||||||
|
group = MonitoringRouterGroup(app, quart_app)
|
||||||
|
await group.initialize()
|
||||||
|
|
||||||
|
yield app, quart_app.test_client()
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_traces_route_forwards_filters(monitoring_client):
|
||||||
|
app, client = monitoring_client
|
||||||
|
|
||||||
|
response = await client.get(
|
||||||
|
'/api/v1/monitoring/traces'
|
||||||
|
'?botId=bot-1'
|
||||||
|
'&pipelineId=pipeline-1'
|
||||||
|
'&sessionId=session-1'
|
||||||
|
'&status=success'
|
||||||
|
'&startTime=2026-01-01T00:00:00Z'
|
||||||
|
'&endTime=2026-01-02T00:00:00Z'
|
||||||
|
'&limit=25'
|
||||||
|
'&offset=5',
|
||||||
|
headers={'Authorization': 'Bearer test_token'},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = await response.get_json()
|
||||||
|
assert data['data'] == {
|
||||||
|
'traces': [{'trace_id': 'trace-1'}],
|
||||||
|
'total': 1,
|
||||||
|
'limit': 25,
|
||||||
|
'offset': 5,
|
||||||
|
}
|
||||||
|
app.monitoring_service.get_traces.assert_awaited_once_with(
|
||||||
|
bot_ids=['bot-1'],
|
||||||
|
pipeline_ids=['pipeline-1'],
|
||||||
|
session_ids=['session-1'],
|
||||||
|
statuses=['success'],
|
||||||
|
start_time=datetime.datetime(2026, 1, 1, 0, 0),
|
||||||
|
end_time=datetime.datetime(2026, 1, 2, 0, 0),
|
||||||
|
limit=25,
|
||||||
|
offset=5,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_trace_details_route_returns_404_for_missing_trace(monitoring_client):
|
||||||
|
_app, client = monitoring_client
|
||||||
|
|
||||||
|
response = await client.get(
|
||||||
|
'/api/v1/monitoring/traces/trace-missing',
|
||||||
|
headers={'Authorization': 'Bearer test_token'},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 404
|
||||||
|
data = await response.get_json()
|
||||||
|
assert data['code'] == -1
|
||||||
|
assert data['msg'] == 'Trace trace-missing not found'
|
||||||
@@ -0,0 +1,87 @@
|
|||||||
|
"""Unit tests for the monitoring trace Alembic migration."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from importlib import import_module
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeInspector:
|
||||||
|
def __init__(self, tables):
|
||||||
|
self._tables = tables
|
||||||
|
|
||||||
|
def get_table_names(self):
|
||||||
|
return list(self._tables)
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeOp:
|
||||||
|
def __init__(self):
|
||||||
|
self.created_tables = []
|
||||||
|
self.created_indexes = []
|
||||||
|
self.dropped_tables = []
|
||||||
|
|
||||||
|
def get_bind(self):
|
||||||
|
return object()
|
||||||
|
|
||||||
|
def create_table(self, table_name, *columns):
|
||||||
|
self.created_tables.append((table_name, columns))
|
||||||
|
|
||||||
|
def create_index(self, index_name, table_name, columns):
|
||||||
|
self.created_indexes.append((index_name, table_name, columns))
|
||||||
|
|
||||||
|
def drop_table(self, table_name):
|
||||||
|
self.dropped_tables.append(table_name)
|
||||||
|
|
||||||
|
|
||||||
|
def _migration_module():
|
||||||
|
return import_module('langbot.pkg.persistence.alembic.versions.0006_monitoring_traces')
|
||||||
|
|
||||||
|
|
||||||
|
def test_upgrade_creates_monitoring_trace_tables_and_indexes(monkeypatch):
|
||||||
|
migration = _migration_module()
|
||||||
|
fake_op = _FakeOp()
|
||||||
|
|
||||||
|
monkeypatch.setattr(migration, 'op', fake_op)
|
||||||
|
monkeypatch.setattr(migration.sa, 'inspect', lambda _conn: _FakeInspector(tables=set()))
|
||||||
|
|
||||||
|
migration.upgrade()
|
||||||
|
|
||||||
|
assert [table_name for table_name, _columns in fake_op.created_tables] == [
|
||||||
|
'monitoring_traces',
|
||||||
|
'monitoring_spans',
|
||||||
|
]
|
||||||
|
assert ('ix_monitoring_traces_started_at', 'monitoring_traces', ['started_at']) in fake_op.created_indexes
|
||||||
|
assert ('ix_monitoring_spans_trace_id', 'monitoring_spans', ['trace_id']) in fake_op.created_indexes
|
||||||
|
assert ('ix_monitoring_spans_pipeline_id', 'monitoring_spans', ['pipeline_id']) in fake_op.created_indexes
|
||||||
|
|
||||||
|
|
||||||
|
def test_upgrade_skips_existing_monitoring_trace_tables(monkeypatch):
|
||||||
|
migration = _migration_module()
|
||||||
|
fake_op = _FakeOp()
|
||||||
|
|
||||||
|
monkeypatch.setattr(migration, 'op', fake_op)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
migration.sa,
|
||||||
|
'inspect',
|
||||||
|
lambda _conn: _FakeInspector(tables={'monitoring_traces', 'monitoring_spans'}),
|
||||||
|
)
|
||||||
|
|
||||||
|
migration.upgrade()
|
||||||
|
|
||||||
|
assert fake_op.created_tables == []
|
||||||
|
assert fake_op.created_indexes == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_downgrade_drops_spans_before_traces(monkeypatch):
|
||||||
|
migration = _migration_module()
|
||||||
|
fake_op = _FakeOp()
|
||||||
|
|
||||||
|
monkeypatch.setattr(migration, 'op', fake_op)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
migration.sa,
|
||||||
|
'inspect',
|
||||||
|
lambda _conn: _FakeInspector(tables={'monitoring_traces', 'monitoring_spans'}),
|
||||||
|
)
|
||||||
|
|
||||||
|
migration.downgrade()
|
||||||
|
|
||||||
|
assert fake_op.dropped_tables == ['monitoring_spans', 'monitoring_traces']
|
||||||
@@ -162,3 +162,61 @@ async def test_runtime_pipeline_execute(mock_app, sample_query):
|
|||||||
|
|
||||||
# Verify stage was called
|
# Verify stage was called
|
||||||
mock_stage.process.assert_called_once()
|
mock_stage.process.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_runtime_pipeline_marks_trace_error_when_stage_returns_error_notice(mock_app, sample_query):
|
||||||
|
"""Trace status follows handled stage errors, not only raised exceptions."""
|
||||||
|
pipelinemgr = get_pipelinemgr_module()
|
||||||
|
stage = get_stage_module()
|
||||||
|
persistence_pipeline = get_persistence_pipeline_module()
|
||||||
|
entities = get_entities_module()
|
||||||
|
|
||||||
|
error_result = entities.StageProcessResult(
|
||||||
|
result_type=entities.ResultType.INTERRUPT,
|
||||||
|
new_query=sample_query,
|
||||||
|
user_notice='',
|
||||||
|
console_notice='',
|
||||||
|
debug_notice='traceback',
|
||||||
|
error_notice='model request failed',
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_stage = Mock(spec=stage.PipelineStage)
|
||||||
|
mock_stage.process = AsyncMock(return_value=error_result)
|
||||||
|
stage_container = pipelinemgr.StageInstContainer(inst_name='FailingStage', inst=mock_stage)
|
||||||
|
|
||||||
|
pipeline_entity = Mock(spec=persistence_pipeline.LegacyPipeline)
|
||||||
|
pipeline_entity.uuid = 'test-pipeline-uuid'
|
||||||
|
pipeline_entity.name = 'Test Pipeline'
|
||||||
|
pipeline_entity.config = sample_query.pipeline_config
|
||||||
|
pipeline_entity.extensions_preferences = {'plugins': []}
|
||||||
|
|
||||||
|
mock_app.bot_service = AsyncMock()
|
||||||
|
mock_app.bot_service.get_bot = AsyncMock(return_value={'name': 'Test Bot'})
|
||||||
|
mock_app.monitoring_service = AsyncMock()
|
||||||
|
mock_app.monitoring_service.record_message = AsyncMock(return_value='message-1')
|
||||||
|
mock_app.monitoring_service.update_session_activity = AsyncMock(return_value=True)
|
||||||
|
mock_app.monitoring_service.start_trace = AsyncMock(return_value='trace-1')
|
||||||
|
mock_app.monitoring_service.record_span = AsyncMock()
|
||||||
|
mock_app.monitoring_service.finish_trace = AsyncMock()
|
||||||
|
mock_app.monitoring_service.update_message_status = AsyncMock()
|
||||||
|
mock_app.monitoring_service.record_error = AsyncMock()
|
||||||
|
|
||||||
|
event_ctx = Mock()
|
||||||
|
event_ctx.is_prevented_default = Mock(return_value=False)
|
||||||
|
mock_app.plugin_connector.emit_event = AsyncMock(return_value=event_ctx)
|
||||||
|
mock_app.query_pool.cached_queries[sample_query.query_id] = sample_query
|
||||||
|
|
||||||
|
runtime_pipeline = pipelinemgr.RuntimePipeline(mock_app, pipeline_entity, [stage_container])
|
||||||
|
|
||||||
|
await runtime_pipeline.run(sample_query)
|
||||||
|
|
||||||
|
mock_app.monitoring_service.finish_trace.assert_awaited_once()
|
||||||
|
assert mock_app.monitoring_service.finish_trace.await_args.kwargs['status'] == 'error'
|
||||||
|
|
||||||
|
span_calls = mock_app.monitoring_service.record_span.await_args_list
|
||||||
|
stage_span_call = next(call for call in span_calls if call.kwargs['name'] == 'FailingStage')
|
||||||
|
root_span_call = next(call for call in span_calls if call.kwargs['kind'] == 'pipeline.query')
|
||||||
|
assert stage_span_call.kwargs['status'] == 'error'
|
||||||
|
assert stage_span_call.kwargs['error_message'] == 'model request failed'
|
||||||
|
assert root_span_call.kwargs['status'] == 'error'
|
||||||
|
|||||||
@@ -407,6 +407,32 @@ class TestRuntimeKnowledgeBaseRetrieve:
|
|||||||
call_args = mock_app.plugin_connector.call_rag_retrieve.call_args
|
call_args = mock_app.plugin_connector.call_rag_retrieve.call_args
|
||||||
assert call_args[0][1]['retrieval_settings']['top_k'] == 5
|
assert call_args[0][1]['retrieval_settings']['top_k'] == 5
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_retrieve_records_host_rag_duration(self, monkeypatch):
|
||||||
|
"""Test host RAG span duration is measured even if plugin omits it."""
|
||||||
|
rag_module = get_rag_module()
|
||||||
|
mock_app = create_mock_app()
|
||||||
|
mock_app.monitoring_service = AsyncMock()
|
||||||
|
mock_kb = create_mock_kb_entity()
|
||||||
|
mock_app.plugin_connector.call_rag_retrieve = AsyncMock(
|
||||||
|
return_value={'results': [], 'metadata': {'status': 'success'}}
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(rag_module.time, 'perf_counter', Mock(side_effect=[10.0, 10.25]))
|
||||||
|
|
||||||
|
runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
|
||||||
|
|
||||||
|
await runtime_kb._retrieve(
|
||||||
|
'query text',
|
||||||
|
{
|
||||||
|
'_trace_context': {
|
||||||
|
'trace_id': 'trace-1',
|
||||||
|
'parent_span_id': 'span-root',
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert mock_app.monitoring_service.record_span.await_args.kwargs['duration'] == 250
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_retrieve_converts_dict_to_entry(self):
|
async def test_retrieve_converts_dict_to_entry(self):
|
||||||
"""Test that dict results are converted to RetrievalResultEntry."""
|
"""Test that dict results are converted to RetrievalResultEntry."""
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import {
|
|||||||
ModelCall,
|
ModelCall,
|
||||||
LLMCall,
|
LLMCall,
|
||||||
EmbeddingCall,
|
EmbeddingCall,
|
||||||
|
MonitoringTrace,
|
||||||
} from '../types/monitoring';
|
} from '../types/monitoring';
|
||||||
import { backendClient } from '@/app/infra/http';
|
import { backendClient } from '@/app/infra/http';
|
||||||
import { parseUTCTimestamp } from '../utils/dateUtils';
|
import { parseUTCTimestamp } from '../utils/dateUtils';
|
||||||
@@ -263,12 +264,48 @@ export function useMonitoringData(filterState: FilterState) {
|
|||||||
messageId: error.message_id,
|
messageId: error.message_id,
|
||||||
}),
|
}),
|
||||||
),
|
),
|
||||||
|
traces: (response.traces || []).map(
|
||||||
|
(trace: {
|
||||||
|
trace_id: string;
|
||||||
|
started_at: string;
|
||||||
|
ended_at?: string;
|
||||||
|
duration?: number;
|
||||||
|
status: string;
|
||||||
|
name: string;
|
||||||
|
bot_id?: string;
|
||||||
|
bot_name?: string;
|
||||||
|
pipeline_id?: string;
|
||||||
|
pipeline_name?: string;
|
||||||
|
session_id?: string;
|
||||||
|
message_id?: string;
|
||||||
|
query_id?: string;
|
||||||
|
attributes?: Record<string, unknown>;
|
||||||
|
}): MonitoringTrace => ({
|
||||||
|
traceId: trace.trace_id,
|
||||||
|
name: trace.name,
|
||||||
|
startedAt: parseUTCTimestamp(trace.started_at),
|
||||||
|
endedAt: trace.ended_at
|
||||||
|
? parseUTCTimestamp(trace.ended_at)
|
||||||
|
: undefined,
|
||||||
|
duration: trace.duration,
|
||||||
|
status: trace.status as 'running' | 'success' | 'error',
|
||||||
|
botId: trace.bot_id,
|
||||||
|
botName: trace.bot_name,
|
||||||
|
pipelineId: trace.pipeline_id,
|
||||||
|
pipelineName: trace.pipeline_name,
|
||||||
|
sessionId: trace.session_id,
|
||||||
|
messageId: trace.message_id,
|
||||||
|
queryId: trace.query_id,
|
||||||
|
attributes: trace.attributes || {},
|
||||||
|
}),
|
||||||
|
),
|
||||||
totalCount: {
|
totalCount: {
|
||||||
messages: response.totalCount.messages,
|
messages: response.totalCount.messages,
|
||||||
llmCalls: response.totalCount.llmCalls,
|
llmCalls: response.totalCount.llmCalls,
|
||||||
embeddingCalls: response.totalCount.embeddingCalls || 0,
|
embeddingCalls: response.totalCount.embeddingCalls || 0,
|
||||||
sessions: response.totalCount.sessions,
|
sessions: response.totalCount.sessions,
|
||||||
errors: response.totalCount.errors,
|
errors: response.totalCount.errors,
|
||||||
|
traces: response.totalCount.traces || 0,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import {
|
|||||||
MessageSquare,
|
MessageSquare,
|
||||||
Sparkles,
|
Sparkles,
|
||||||
CheckCircle2,
|
CheckCircle2,
|
||||||
|
GitBranch,
|
||||||
} from 'lucide-react';
|
} from 'lucide-react';
|
||||||
import OverviewCards from './components/overview-cards/OverviewCards';
|
import OverviewCards from './components/overview-cards/OverviewCards';
|
||||||
import MonitoringFilters from './components/filters/MonitoringFilters';
|
import MonitoringFilters from './components/filters/MonitoringFilters';
|
||||||
@@ -22,9 +23,15 @@ import { MessageDetailsCard } from './components/MessageDetailsCard';
|
|||||||
import { MessageContentRenderer } from './components/MessageContentRenderer';
|
import { MessageContentRenderer } from './components/MessageContentRenderer';
|
||||||
import { FeedbackStatsCards } from './components/FeedbackCard';
|
import { FeedbackStatsCards } from './components/FeedbackCard';
|
||||||
import { FeedbackList } from './components/FeedbackList';
|
import { FeedbackList } from './components/FeedbackList';
|
||||||
import { MessageDetails } from './types/monitoring';
|
import {
|
||||||
|
MessageDetails,
|
||||||
|
TraceDetails,
|
||||||
|
MonitoringSpan,
|
||||||
|
} from './types/monitoring';
|
||||||
import { httpClient } from '@/app/infra/http/HttpClient';
|
import { httpClient } from '@/app/infra/http/HttpClient';
|
||||||
|
import { backendClient } from '@/app/infra/http';
|
||||||
import { LoadingSpinner, LoadingPage } from '@/components/ui/loading-spinner';
|
import { LoadingSpinner, LoadingPage } from '@/components/ui/loading-spinner';
|
||||||
|
import { parseUTCTimestamp } from './utils/dateUtils';
|
||||||
|
|
||||||
interface RawMessageData {
|
interface RawMessageData {
|
||||||
id: string;
|
id: string;
|
||||||
@@ -72,6 +79,97 @@ interface RawErrorData {
|
|||||||
stack_trace: string | null;
|
stack_trace: string | null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface RawTraceData {
|
||||||
|
trace_id: string;
|
||||||
|
started_at: string;
|
||||||
|
ended_at?: string;
|
||||||
|
duration?: number;
|
||||||
|
status: string;
|
||||||
|
name: string;
|
||||||
|
bot_id?: string;
|
||||||
|
bot_name?: string;
|
||||||
|
pipeline_id?: string;
|
||||||
|
pipeline_name?: string;
|
||||||
|
session_id?: string;
|
||||||
|
message_id?: string;
|
||||||
|
query_id?: string;
|
||||||
|
attributes?: Record<string, unknown>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface RawSpanData {
|
||||||
|
span_id: string;
|
||||||
|
trace_id: string;
|
||||||
|
parent_span_id?: string;
|
||||||
|
name: string;
|
||||||
|
kind: string;
|
||||||
|
status: string;
|
||||||
|
started_at: string;
|
||||||
|
ended_at?: string;
|
||||||
|
duration?: number;
|
||||||
|
message_id?: string;
|
||||||
|
session_id?: string;
|
||||||
|
bot_id?: string;
|
||||||
|
pipeline_id?: string;
|
||||||
|
attributes?: Record<string, unknown>;
|
||||||
|
error_message?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
function mapTrace(raw: RawTraceData) {
|
||||||
|
return {
|
||||||
|
traceId: raw.trace_id,
|
||||||
|
name: raw.name,
|
||||||
|
startedAt: parseUTCTimestamp(raw.started_at),
|
||||||
|
endedAt: raw.ended_at ? parseUTCTimestamp(raw.ended_at) : undefined,
|
||||||
|
duration: raw.duration,
|
||||||
|
status: raw.status as 'running' | 'success' | 'error',
|
||||||
|
botId: raw.bot_id,
|
||||||
|
botName: raw.bot_name,
|
||||||
|
pipelineId: raw.pipeline_id,
|
||||||
|
pipelineName: raw.pipeline_name,
|
||||||
|
sessionId: raw.session_id,
|
||||||
|
messageId: raw.message_id,
|
||||||
|
queryId: raw.query_id,
|
||||||
|
attributes: raw.attributes || {},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function mapSpan(raw: RawSpanData): MonitoringSpan {
|
||||||
|
return {
|
||||||
|
spanId: raw.span_id,
|
||||||
|
traceId: raw.trace_id,
|
||||||
|
parentSpanId: raw.parent_span_id,
|
||||||
|
name: raw.name,
|
||||||
|
kind: raw.kind,
|
||||||
|
status: raw.status as 'running' | 'success' | 'error',
|
||||||
|
startedAt: parseUTCTimestamp(raw.started_at),
|
||||||
|
endedAt: raw.ended_at ? parseUTCTimestamp(raw.ended_at) : undefined,
|
||||||
|
duration: raw.duration,
|
||||||
|
messageId: raw.message_id,
|
||||||
|
sessionId: raw.session_id,
|
||||||
|
botId: raw.bot_id,
|
||||||
|
pipelineId: raw.pipeline_id,
|
||||||
|
attributes: raw.attributes || {},
|
||||||
|
errorMessage: raw.error_message,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function spanDepth(
|
||||||
|
span: MonitoringSpan,
|
||||||
|
spansById: Map<string, MonitoringSpan>,
|
||||||
|
) {
|
||||||
|
let depth = 0;
|
||||||
|
let current = span.parentSpanId
|
||||||
|
? spansById.get(span.parentSpanId)
|
||||||
|
: undefined;
|
||||||
|
while (current && depth < 8) {
|
||||||
|
depth += 1;
|
||||||
|
current = current.parentSpanId
|
||||||
|
? spansById.get(current.parentSpanId)
|
||||||
|
: undefined;
|
||||||
|
}
|
||||||
|
return depth;
|
||||||
|
}
|
||||||
|
|
||||||
function MonitoringPageContent() {
|
function MonitoringPageContent() {
|
||||||
const { t } = useTranslation();
|
const { t } = useTranslation();
|
||||||
const { filterState, setSelectedBots, setSelectedPipelines, setTimeRange } =
|
const { filterState, setSelectedBots, setSelectedPipelines, setTimeRange } =
|
||||||
@@ -158,6 +256,13 @@ function MonitoringPageContent() {
|
|||||||
|
|
||||||
// State for expanded errors
|
// State for expanded errors
|
||||||
const [expandedErrorId, setExpandedErrorId] = useState<string | null>(null);
|
const [expandedErrorId, setExpandedErrorId] = useState<string | null>(null);
|
||||||
|
const [expandedTraceId, setExpandedTraceId] = useState<string | null>(null);
|
||||||
|
const [traceDetails, setTraceDetails] = useState<
|
||||||
|
Record<string, TraceDetails>
|
||||||
|
>({});
|
||||||
|
const [loadingTraceDetails, setLoadingTraceDetails] = useState<
|
||||||
|
Record<string, boolean>
|
||||||
|
>({});
|
||||||
|
|
||||||
// State for controlled tabs
|
// State for controlled tabs
|
||||||
const [activeTab, setActiveTab] = useState<string>('messages');
|
const [activeTab, setActiveTab] = useState<string>('messages');
|
||||||
@@ -265,6 +370,34 @@ function MonitoringPageContent() {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const toggleTraceExpand = async (traceId: string) => {
|
||||||
|
if (expandedTraceId === traceId) {
|
||||||
|
setExpandedTraceId(null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
setExpandedTraceId(traceId);
|
||||||
|
if (traceDetails[traceId]) return;
|
||||||
|
|
||||||
|
setLoadingTraceDetails((prev) => ({ ...prev, [traceId]: true }));
|
||||||
|
try {
|
||||||
|
const result = await backendClient.getMonitoringTraceDetails(traceId);
|
||||||
|
setTraceDetails((prev) => ({
|
||||||
|
...prev,
|
||||||
|
[traceId]: {
|
||||||
|
traceId: result.trace_id,
|
||||||
|
found: result.found,
|
||||||
|
trace: result.trace ? mapTrace(result.trace) : undefined,
|
||||||
|
spans: (result.spans || []).map(mapSpan),
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Failed to fetch trace details:', error);
|
||||||
|
} finally {
|
||||||
|
setLoadingTraceDetails((prev) => ({ ...prev, [traceId]: false }));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<div className="w-full h-full overflow-y-auto overflow-x-hidden">
|
<div className="w-full h-full overflow-y-auto overflow-x-hidden">
|
||||||
{/* Filters and Refresh Button - Sticky */}
|
{/* Filters and Refresh Button - Sticky */}
|
||||||
@@ -323,6 +456,9 @@ function MonitoringPageContent() {
|
|||||||
<TabsTrigger value="tokens" className="px-6 py-2">
|
<TabsTrigger value="tokens" className="px-6 py-2">
|
||||||
{t('monitoring.tabs.tokens')}
|
{t('monitoring.tabs.tokens')}
|
||||||
</TabsTrigger>
|
</TabsTrigger>
|
||||||
|
<TabsTrigger value="traces" className="px-6 py-2">
|
||||||
|
{t('monitoring.tabs.traces')}
|
||||||
|
</TabsTrigger>
|
||||||
<TabsTrigger value="feedback" className="px-6 py-2">
|
<TabsTrigger value="feedback" className="px-6 py-2">
|
||||||
{t('monitoring.tabs.feedback')}
|
{t('monitoring.tabs.feedback')}
|
||||||
</TabsTrigger>
|
</TabsTrigger>
|
||||||
@@ -690,6 +826,166 @@ function MonitoringPageContent() {
|
|||||||
/>
|
/>
|
||||||
</TabsContent>
|
</TabsContent>
|
||||||
|
|
||||||
|
<TabsContent value="traces" className="p-6 m-0">
|
||||||
|
<div>
|
||||||
|
{loading && (
|
||||||
|
<div className="py-12 flex justify-center">
|
||||||
|
<LoadingSpinner text={t('common.loading')} />
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
|
||||||
|
{!loading && data && data.traces && data.traces.length > 0 && (
|
||||||
|
<div className="space-y-4">
|
||||||
|
{data.traces.map((trace) => {
|
||||||
|
const details = traceDetails[trace.traceId];
|
||||||
|
const spans = details?.spans || [];
|
||||||
|
const spansById = new Map(
|
||||||
|
spans.map((span) => [span.spanId, span]),
|
||||||
|
);
|
||||||
|
const maxDuration = Math.max(
|
||||||
|
1,
|
||||||
|
...spans.map((span) => span.duration || 0),
|
||||||
|
);
|
||||||
|
|
||||||
|
return (
|
||||||
|
<div
|
||||||
|
key={trace.traceId}
|
||||||
|
className="border rounded-xl overflow-hidden transition-all duration-200"
|
||||||
|
>
|
||||||
|
<div
|
||||||
|
className="p-5 cursor-pointer hover:bg-accent transition-colors"
|
||||||
|
onClick={() => toggleTraceExpand(trace.traceId)}
|
||||||
|
>
|
||||||
|
<div className="flex items-start justify-between gap-4">
|
||||||
|
<div className="flex items-start flex-1 min-w-0">
|
||||||
|
<div className="mr-3 mt-0.5">
|
||||||
|
{expandedTraceId === trace.traceId ? (
|
||||||
|
<ChevronDown className="w-5 h-5 text-muted-foreground" />
|
||||||
|
) : (
|
||||||
|
<ChevronRight className="w-5 h-5 text-muted-foreground" />
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
|
<div className="min-w-0 flex-1">
|
||||||
|
<div className="flex flex-wrap items-center gap-2 mb-2">
|
||||||
|
<span className="text-xs text-muted-foreground font-mono">
|
||||||
|
{trace.traceId}
|
||||||
|
</span>
|
||||||
|
<span
|
||||||
|
className={`text-xs px-2 py-1 rounded ${
|
||||||
|
trace.status === 'error'
|
||||||
|
? 'bg-red-100 text-red-800 dark:bg-red-900 dark:text-red-200'
|
||||||
|
: trace.status === 'running'
|
||||||
|
? 'bg-yellow-100 text-yellow-800 dark:bg-yellow-900 dark:text-yellow-200'
|
||||||
|
: 'bg-green-100 text-green-800 dark:bg-green-900 dark:text-green-200'
|
||||||
|
}`}
|
||||||
|
>
|
||||||
|
{trace.status}
|
||||||
|
</span>
|
||||||
|
</div>
|
||||||
|
<div className="font-medium text-sm text-foreground mb-1">
|
||||||
|
{trace.name}
|
||||||
|
</div>
|
||||||
|
<div className="text-xs text-muted-foreground truncate">
|
||||||
|
{trace.botName || '-'} →{' '}
|
||||||
|
{trace.pipelineName || '-'}
|
||||||
|
{trace.sessionId
|
||||||
|
? ` · ${trace.sessionId}`
|
||||||
|
: ''}
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
<div className="flex flex-col items-end gap-1 text-xs text-muted-foreground whitespace-nowrap">
|
||||||
|
<span>{trace.startedAt.toLocaleString()}</span>
|
||||||
|
<span>{trace.duration ?? 0}ms</span>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
{expandedTraceId === trace.traceId && (
|
||||||
|
<div className="border-t p-5 bg-muted">
|
||||||
|
{loadingTraceDetails[trace.traceId] && (
|
||||||
|
<div className="py-4 flex justify-center">
|
||||||
|
<LoadingSpinner size="sm" text="" />
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
{!loadingTraceDetails[trace.traceId] && (
|
||||||
|
<div className="space-y-3">
|
||||||
|
{spans.length === 0 && (
|
||||||
|
<div className="text-sm text-muted-foreground">
|
||||||
|
{t('monitoring.traces.noSpans')}
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
{spans.map((span) => {
|
||||||
|
const depth = spanDepth(span, spansById);
|
||||||
|
const width = Math.max(
|
||||||
|
6,
|
||||||
|
Math.min(
|
||||||
|
100,
|
||||||
|
((span.duration || 0) / maxDuration) *
|
||||||
|
100,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
return (
|
||||||
|
<div
|
||||||
|
key={span.spanId}
|
||||||
|
className="grid grid-cols-[minmax(180px,1fr)_minmax(140px,2fr)_80px] gap-3 items-center text-xs"
|
||||||
|
>
|
||||||
|
<div
|
||||||
|
className="min-w-0"
|
||||||
|
style={{
|
||||||
|
paddingLeft: `${depth * 16}px`,
|
||||||
|
}}
|
||||||
|
>
|
||||||
|
<div className="font-medium text-foreground truncate">
|
||||||
|
{span.name}
|
||||||
|
</div>
|
||||||
|
<div className="text-muted-foreground truncate">
|
||||||
|
{span.kind}
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
<div className="h-7 bg-background rounded border overflow-hidden">
|
||||||
|
<div
|
||||||
|
className={`h-full ${
|
||||||
|
span.status === 'error'
|
||||||
|
? 'bg-red-500/70'
|
||||||
|
: 'bg-blue-500/70'
|
||||||
|
}`}
|
||||||
|
style={{ width: `${width}%` }}
|
||||||
|
/>
|
||||||
|
</div>
|
||||||
|
<div className="text-right text-muted-foreground">
|
||||||
|
{span.duration ?? 0}ms
|
||||||
|
</div>
|
||||||
|
{span.errorMessage && (
|
||||||
|
<div className="col-span-3 text-red-600 dark:text-red-400 bg-background rounded p-2">
|
||||||
|
{span.errorMessage}
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
|
);
|
||||||
|
})}
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
|
);
|
||||||
|
})}
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
|
||||||
|
{!loading &&
|
||||||
|
(!data || !data.traces || data.traces.length === 0) && (
|
||||||
|
<div className="flex flex-col items-center justify-center text-muted-foreground py-16 gap-2">
|
||||||
|
<GitBranch className="h-[3rem] w-[3rem]" />
|
||||||
|
<div className="text-sm">
|
||||||
|
{t('monitoring.traces.noTraces')}
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
|
</TabsContent>
|
||||||
|
|
||||||
<TabsContent value="feedback" className="p-6 m-0">
|
<TabsContent value="feedback" className="p-6 m-0">
|
||||||
<div>
|
<div>
|
||||||
{loading && (
|
{loading && (
|
||||||
|
|||||||
@@ -111,6 +111,48 @@ export interface ErrorLog {
|
|||||||
messageId?: string;
|
messageId?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface MonitoringTrace {
|
||||||
|
traceId: string;
|
||||||
|
name: string;
|
||||||
|
startedAt: Date;
|
||||||
|
endedAt?: Date;
|
||||||
|
duration?: number;
|
||||||
|
status: 'running' | 'success' | 'error';
|
||||||
|
botId?: string;
|
||||||
|
botName?: string;
|
||||||
|
pipelineId?: string;
|
||||||
|
pipelineName?: string;
|
||||||
|
sessionId?: string;
|
||||||
|
messageId?: string;
|
||||||
|
queryId?: string;
|
||||||
|
attributes: Record<string, unknown>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface MonitoringSpan {
|
||||||
|
spanId: string;
|
||||||
|
traceId: string;
|
||||||
|
parentSpanId?: string;
|
||||||
|
name: string;
|
||||||
|
kind: string;
|
||||||
|
status: 'success' | 'error' | 'running';
|
||||||
|
startedAt: Date;
|
||||||
|
endedAt?: Date;
|
||||||
|
duration?: number;
|
||||||
|
messageId?: string;
|
||||||
|
sessionId?: string;
|
||||||
|
botId?: string;
|
||||||
|
pipelineId?: string;
|
||||||
|
attributes: Record<string, unknown>;
|
||||||
|
errorMessage?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface TraceDetails {
|
||||||
|
traceId: string;
|
||||||
|
found: boolean;
|
||||||
|
trace?: MonitoringTrace;
|
||||||
|
spans: MonitoringSpan[];
|
||||||
|
}
|
||||||
|
|
||||||
export interface MessageDetails {
|
export interface MessageDetails {
|
||||||
messageId: string;
|
messageId: string;
|
||||||
found: boolean;
|
found: boolean;
|
||||||
@@ -125,6 +167,7 @@ export interface MessageDetails {
|
|||||||
averageDurationMs: number;
|
averageDurationMs: number;
|
||||||
};
|
};
|
||||||
errors: ErrorLog[];
|
errors: ErrorLog[];
|
||||||
|
trace?: MonitoringTrace;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface OverviewMetrics {
|
export interface OverviewMetrics {
|
||||||
@@ -203,6 +246,7 @@ export interface MonitoringData {
|
|||||||
modelCalls: ModelCall[];
|
modelCalls: ModelCall[];
|
||||||
sessions: SessionInfo[];
|
sessions: SessionInfo[];
|
||||||
errors: ErrorLog[];
|
errors: ErrorLog[];
|
||||||
|
traces: MonitoringTrace[];
|
||||||
feedback?: FeedbackRecord[];
|
feedback?: FeedbackRecord[];
|
||||||
feedbackStats?: FeedbackStats;
|
feedbackStats?: FeedbackStats;
|
||||||
totalCount: {
|
totalCount: {
|
||||||
@@ -211,6 +255,7 @@ export interface MonitoringData {
|
|||||||
embeddingCalls: number;
|
embeddingCalls: number;
|
||||||
sessions: number;
|
sessions: number;
|
||||||
errors: number;
|
errors: number;
|
||||||
|
traces: number;
|
||||||
feedback?: number;
|
feedback?: number;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1185,12 +1185,29 @@ export class BackendClient extends BaseHttpClient {
|
|||||||
stack_trace?: string;
|
stack_trace?: string;
|
||||||
message_id?: string;
|
message_id?: string;
|
||||||
}>;
|
}>;
|
||||||
|
traces?: Array<{
|
||||||
|
trace_id: string;
|
||||||
|
started_at: string;
|
||||||
|
ended_at?: string;
|
||||||
|
duration?: number;
|
||||||
|
status: string;
|
||||||
|
name: string;
|
||||||
|
bot_id?: string;
|
||||||
|
bot_name?: string;
|
||||||
|
pipeline_id?: string;
|
||||||
|
pipeline_name?: string;
|
||||||
|
session_id?: string;
|
||||||
|
message_id?: string;
|
||||||
|
query_id?: string;
|
||||||
|
attributes?: Record<string, unknown>;
|
||||||
|
}>;
|
||||||
totalCount: {
|
totalCount: {
|
||||||
messages: number;
|
messages: number;
|
||||||
llmCalls: number;
|
llmCalls: number;
|
||||||
embeddingCalls: number;
|
embeddingCalls: number;
|
||||||
sessions: number;
|
sessions: number;
|
||||||
errors: number;
|
errors: number;
|
||||||
|
traces?: number;
|
||||||
};
|
};
|
||||||
}> {
|
}> {
|
||||||
const queryParams = new URLSearchParams();
|
const queryParams = new URLSearchParams();
|
||||||
@@ -1213,6 +1230,90 @@ export class BackendClient extends BaseHttpClient {
|
|||||||
return this.get(`/api/v1/monitoring/data?${queryParams.toString()}`);
|
return this.get(`/api/v1/monitoring/data?${queryParams.toString()}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public getMonitoringTraces(params: {
|
||||||
|
botId?: string[];
|
||||||
|
pipelineId?: string[];
|
||||||
|
startTime?: string;
|
||||||
|
endTime?: string;
|
||||||
|
limit?: number;
|
||||||
|
}): Promise<{
|
||||||
|
traces: Array<{
|
||||||
|
trace_id: string;
|
||||||
|
started_at: string;
|
||||||
|
ended_at?: string;
|
||||||
|
duration?: number;
|
||||||
|
status: string;
|
||||||
|
name: string;
|
||||||
|
bot_id?: string;
|
||||||
|
bot_name?: string;
|
||||||
|
pipeline_id?: string;
|
||||||
|
pipeline_name?: string;
|
||||||
|
session_id?: string;
|
||||||
|
message_id?: string;
|
||||||
|
query_id?: string;
|
||||||
|
attributes?: Record<string, unknown>;
|
||||||
|
}>;
|
||||||
|
total: number;
|
||||||
|
}> {
|
||||||
|
const queryParams = new URLSearchParams();
|
||||||
|
if (params.botId) {
|
||||||
|
params.botId.forEach((id) => queryParams.append('botId', id));
|
||||||
|
}
|
||||||
|
if (params.pipelineId) {
|
||||||
|
params.pipelineId.forEach((id) => queryParams.append('pipelineId', id));
|
||||||
|
}
|
||||||
|
if (params.startTime) {
|
||||||
|
queryParams.append('startTime', params.startTime);
|
||||||
|
}
|
||||||
|
if (params.endTime) {
|
||||||
|
queryParams.append('endTime', params.endTime);
|
||||||
|
}
|
||||||
|
if (params.limit) {
|
||||||
|
queryParams.append('limit', params.limit.toString());
|
||||||
|
}
|
||||||
|
return this.get(`/api/v1/monitoring/traces?${queryParams.toString()}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
public getMonitoringTraceDetails(traceId: string): Promise<{
|
||||||
|
trace_id: string;
|
||||||
|
found: boolean;
|
||||||
|
trace: {
|
||||||
|
trace_id: string;
|
||||||
|
started_at: string;
|
||||||
|
ended_at?: string;
|
||||||
|
duration?: number;
|
||||||
|
status: string;
|
||||||
|
name: string;
|
||||||
|
bot_id?: string;
|
||||||
|
bot_name?: string;
|
||||||
|
pipeline_id?: string;
|
||||||
|
pipeline_name?: string;
|
||||||
|
session_id?: string;
|
||||||
|
message_id?: string;
|
||||||
|
query_id?: string;
|
||||||
|
attributes?: Record<string, unknown>;
|
||||||
|
};
|
||||||
|
spans: Array<{
|
||||||
|
span_id: string;
|
||||||
|
trace_id: string;
|
||||||
|
parent_span_id?: string;
|
||||||
|
name: string;
|
||||||
|
kind: string;
|
||||||
|
status: string;
|
||||||
|
started_at: string;
|
||||||
|
ended_at?: string;
|
||||||
|
duration?: number;
|
||||||
|
message_id?: string;
|
||||||
|
session_id?: string;
|
||||||
|
bot_id?: string;
|
||||||
|
pipeline_id?: string;
|
||||||
|
attributes?: Record<string, unknown>;
|
||||||
|
error_message?: string;
|
||||||
|
}>;
|
||||||
|
}> {
|
||||||
|
return this.get(`/api/v1/monitoring/traces/${traceId}`);
|
||||||
|
}
|
||||||
|
|
||||||
public getMonitoringOverview(params: {
|
public getMonitoringOverview(params: {
|
||||||
botId?: string[];
|
botId?: string[];
|
||||||
pipelineId?: string[];
|
pipelineId?: string[];
|
||||||
|
|||||||
@@ -1217,6 +1217,7 @@ const enUS = {
|
|||||||
embeddingCalls: 'Embedding Calls',
|
embeddingCalls: 'Embedding Calls',
|
||||||
modelCalls: 'Model Calls',
|
modelCalls: 'Model Calls',
|
||||||
tokens: 'Token Monitoring',
|
tokens: 'Token Monitoring',
|
||||||
|
traces: 'Traces',
|
||||||
feedback: 'User Feedback',
|
feedback: 'User Feedback',
|
||||||
sessions: 'Session Analysis',
|
sessions: 'Session Analysis',
|
||||||
errors: 'Error Logs',
|
errors: 'Error Logs',
|
||||||
@@ -1321,6 +1322,11 @@ const enUS = {
|
|||||||
noErrors: 'No errors found',
|
noErrors: 'No errors found',
|
||||||
stackTrace: 'Stack Trace',
|
stackTrace: 'Stack Trace',
|
||||||
},
|
},
|
||||||
|
traces: {
|
||||||
|
title: 'Traces',
|
||||||
|
noTraces: 'No traces found',
|
||||||
|
noSpans: 'No spans recorded for this trace',
|
||||||
|
},
|
||||||
feedback: {
|
feedback: {
|
||||||
title: 'User Feedback',
|
title: 'User Feedback',
|
||||||
totalFeedback: 'Total Feedback',
|
totalFeedback: 'Total Feedback',
|
||||||
|
|||||||
@@ -1158,6 +1158,7 @@ const zhHans = {
|
|||||||
embeddingCalls: 'Embedding调用',
|
embeddingCalls: 'Embedding调用',
|
||||||
modelCalls: '模型调用',
|
modelCalls: '模型调用',
|
||||||
tokens: 'Token 监控',
|
tokens: 'Token 监控',
|
||||||
|
traces: '链路追踪',
|
||||||
feedback: '用户反馈',
|
feedback: '用户反馈',
|
||||||
sessions: '会话分析',
|
sessions: '会话分析',
|
||||||
errors: '错误日志',
|
errors: '错误日志',
|
||||||
@@ -1262,6 +1263,11 @@ const zhHans = {
|
|||||||
noErrors: '未找到错误',
|
noErrors: '未找到错误',
|
||||||
stackTrace: '堆栈追踪',
|
stackTrace: '堆栈追踪',
|
||||||
},
|
},
|
||||||
|
traces: {
|
||||||
|
title: '链路追踪',
|
||||||
|
noTraces: '未找到链路记录',
|
||||||
|
noSpans: '此链路暂无 Span 记录',
|
||||||
|
},
|
||||||
feedback: {
|
feedback: {
|
||||||
title: '用户反馈',
|
title: '用户反馈',
|
||||||
totalFeedback: '总反馈数',
|
totalFeedback: '总反馈数',
|
||||||
|
|||||||
Reference in New Issue
Block a user