mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-18 19:44:21 +00:00
feat(monitoring): add host RAG trace observability
This commit is contained in:
@@ -313,18 +313,30 @@ class MonitoringRouterGroup(group.RouterGroup):
|
||||
offset=0,
|
||||
)
|
||||
|
||||
# Get traces
|
||||
traces, traces_total = await self.ap.monitoring_service.get_traces(
|
||||
bot_ids=bot_ids if bot_ids else None,
|
||||
pipeline_ids=pipeline_ids if pipeline_ids else None,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
limit=limit,
|
||||
offset=0,
|
||||
)
|
||||
|
||||
return self.success(
|
||||
data={
|
||||
'overview': overview,
|
||||
'messages': messages,
|
||||
'llmCalls': llm_calls,
|
||||
'embeddingCalls': embedding_calls,
|
||||
'traces': traces,
|
||||
'sessions': sessions,
|
||||
'errors': errors,
|
||||
'totalCount': {
|
||||
'messages': messages_total,
|
||||
'llmCalls': llm_calls_total,
|
||||
'embeddingCalls': embedding_calls_total,
|
||||
'traces': traces_total,
|
||||
'sessions': sessions_total,
|
||||
'errors': errors_total,
|
||||
},
|
||||
@@ -350,6 +362,49 @@ class MonitoringRouterGroup(group.RouterGroup):
|
||||
|
||||
return self.success(data=details)
|
||||
|
||||
@self.route('/traces', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||
async def get_traces() -> str:
|
||||
"""Get end-to-end trace records."""
|
||||
bot_ids = quart.request.args.getlist('botId')
|
||||
pipeline_ids = quart.request.args.getlist('pipelineId')
|
||||
session_ids = quart.request.args.getlist('sessionId')
|
||||
statuses = quart.request.args.getlist('status')
|
||||
start_time_str = quart.request.args.get('startTime')
|
||||
end_time_str = quart.request.args.get('endTime')
|
||||
limit = int(quart.request.args.get('limit', 100))
|
||||
offset = int(quart.request.args.get('offset', 0))
|
||||
|
||||
start_time = parse_iso_datetime(start_time_str)
|
||||
end_time = parse_iso_datetime(end_time_str)
|
||||
|
||||
traces, total = await self.ap.monitoring_service.get_traces(
|
||||
bot_ids=bot_ids if bot_ids else None,
|
||||
pipeline_ids=pipeline_ids if pipeline_ids else None,
|
||||
session_ids=session_ids if session_ids else None,
|
||||
statuses=statuses if statuses else None,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
)
|
||||
|
||||
return self.success(
|
||||
data={
|
||||
'traces': traces,
|
||||
'total': total,
|
||||
'limit': limit,
|
||||
'offset': offset,
|
||||
}
|
||||
)
|
||||
|
||||
@self.route('/traces/<trace_id>', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||
async def get_trace_details(trace_id: str) -> str:
|
||||
"""Get one trace with all spans."""
|
||||
details = await self.ap.monitoring_service.get_trace_details(trace_id)
|
||||
if not details.get('found'):
|
||||
return self.http_status(404, -1, f'Trace {trace_id} not found')
|
||||
return self.success(data=details)
|
||||
|
||||
@self.route('/export', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||
async def export_data() -> tuple[str, int]:
|
||||
"""Export monitoring data as CSV"""
|
||||
|
||||
@@ -350,8 +350,24 @@ class PluginsRouterGroup(group.RouterGroup):
|
||||
if not endpoint.startswith('/') or '..' in endpoint:
|
||||
return self.http_status(400, -1, 'invalid endpoint')
|
||||
|
||||
caller = {
|
||||
'plugin_author': author,
|
||||
'plugin_name': plugin_name,
|
||||
'page_id': page_id,
|
||||
'origin': _get_request_origin(),
|
||||
}
|
||||
headers = {
|
||||
key: value
|
||||
for key, value in {
|
||||
'user-agent': quart.request.headers.get('User-Agent'),
|
||||
'x-request-id': quart.request.headers.get('X-Request-ID'),
|
||||
'x-forwarded-for': quart.request.headers.get('X-Forwarded-For'),
|
||||
}.items()
|
||||
if value
|
||||
}
|
||||
|
||||
result = await self.ap.plugin_connector.handle_page_api(
|
||||
author, plugin_name, page_id, endpoint, method.upper(), body
|
||||
author, plugin_name, page_id, endpoint, method.upper(), body, caller, headers
|
||||
)
|
||||
if result.get('error'):
|
||||
return self.http_status(400, -1, result['error'])
|
||||
|
||||
@@ -3,11 +3,53 @@ from __future__ import annotations
|
||||
import uuid
|
||||
import datetime
|
||||
import sqlalchemy
|
||||
import json
|
||||
|
||||
from ....core import app
|
||||
from ....entity.persistence import monitoring as persistence_monitoring
|
||||
|
||||
|
||||
def _utc_now() -> datetime.datetime:
|
||||
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||
|
||||
|
||||
def _json_dumps(value: dict | list | None) -> str | None:
|
||||
if value is None:
|
||||
return None
|
||||
try:
|
||||
return json.dumps(value, ensure_ascii=False, default=str)
|
||||
except Exception:
|
||||
return json.dumps({'serialization_error': str(value)}, ensure_ascii=False)
|
||||
|
||||
|
||||
def _json_loads(value: str | None) -> dict | list | None:
|
||||
if not value:
|
||||
return None
|
||||
try:
|
||||
return json.loads(value)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def new_trace_id() -> str:
|
||||
return f'trace-{uuid.uuid4().hex[:16]}'
|
||||
|
||||
|
||||
def new_span_id() -> str:
|
||||
return f'span-{uuid.uuid4().hex[:16]}'
|
||||
|
||||
|
||||
def normalize_trace_status(status: str | None) -> str:
|
||||
"""Normalize operation status to the monitoring UI vocabulary."""
|
||||
if status in ('completed', 'ok'):
|
||||
return 'success'
|
||||
if status in ('failed', 'failure', 'exception'):
|
||||
return 'error'
|
||||
if status in ('running', 'success', 'error'):
|
||||
return status
|
||||
return 'success'
|
||||
|
||||
|
||||
class MonitoringService:
|
||||
"""Monitoring service"""
|
||||
|
||||
@@ -74,6 +116,18 @@ class MonitoringService:
|
||||
persistence_monitoring.MonitoringFeedback.timestamp,
|
||||
persistence_monitoring.MonitoringFeedback.id,
|
||||
),
|
||||
(
|
||||
'monitoring_traces',
|
||||
persistence_monitoring.MonitoringTrace,
|
||||
persistence_monitoring.MonitoringTrace.started_at,
|
||||
persistence_monitoring.MonitoringTrace.trace_id,
|
||||
),
|
||||
(
|
||||
'monitoring_spans',
|
||||
persistence_monitoring.MonitoringSpan,
|
||||
persistence_monitoring.MonitoringSpan.started_at,
|
||||
persistence_monitoring.MonitoringSpan.span_id,
|
||||
),
|
||||
]
|
||||
|
||||
deleted_counts: dict[str, int] = {}
|
||||
@@ -133,6 +187,116 @@ class MonitoringService:
|
||||
|
||||
# ========== Recording Methods ==========
|
||||
|
||||
async def start_trace(
|
||||
self,
|
||||
trace_id: str | None = None,
|
||||
name: str = 'LangBot query',
|
||||
bot_id: str | None = None,
|
||||
bot_name: str | None = None,
|
||||
pipeline_id: str | None = None,
|
||||
pipeline_name: str | None = None,
|
||||
session_id: str | None = None,
|
||||
message_id: str | None = None,
|
||||
query_id: str | int | None = None,
|
||||
attributes: dict | None = None,
|
||||
) -> str:
|
||||
"""Create or update a trace header row."""
|
||||
trace_id = trace_id or new_trace_id()
|
||||
trace_data = {
|
||||
'trace_id': trace_id,
|
||||
'started_at': _utc_now(),
|
||||
'ended_at': None,
|
||||
'duration': None,
|
||||
'status': 'running',
|
||||
'name': name,
|
||||
'bot_id': bot_id,
|
||||
'bot_name': bot_name,
|
||||
'pipeline_id': pipeline_id,
|
||||
'pipeline_name': pipeline_name,
|
||||
'session_id': session_id,
|
||||
'message_id': message_id,
|
||||
'query_id': str(query_id) if query_id is not None else None,
|
||||
'attributes': _json_dumps(attributes),
|
||||
}
|
||||
|
||||
await self.ap.persistence_mgr.execute_async(
|
||||
sqlalchemy.insert(persistence_monitoring.MonitoringTrace).values(trace_data)
|
||||
)
|
||||
return trace_id
|
||||
|
||||
async def finish_trace(
|
||||
self,
|
||||
trace_id: str,
|
||||
status: str = 'success',
|
||||
duration: int | None = None,
|
||||
message_id: str | None = None,
|
||||
attributes: dict | None = None,
|
||||
) -> None:
|
||||
"""Mark a trace complete."""
|
||||
update_values: dict = {
|
||||
'ended_at': _utc_now(),
|
||||
'status': normalize_trace_status(status),
|
||||
}
|
||||
if duration is not None:
|
||||
update_values['duration'] = duration
|
||||
if message_id is not None:
|
||||
update_values['message_id'] = message_id
|
||||
if attributes is not None:
|
||||
update_values['attributes'] = _json_dumps(attributes)
|
||||
|
||||
await self.ap.persistence_mgr.execute_async(
|
||||
sqlalchemy.update(persistence_monitoring.MonitoringTrace)
|
||||
.where(persistence_monitoring.MonitoringTrace.trace_id == trace_id)
|
||||
.values(update_values)
|
||||
)
|
||||
|
||||
async def record_span(
|
||||
self,
|
||||
trace_id: str,
|
||||
name: str,
|
||||
kind: str,
|
||||
status: str = 'success',
|
||||
span_id: str | None = None,
|
||||
parent_span_id: str | None = None,
|
||||
started_at: datetime.datetime | None = None,
|
||||
ended_at: datetime.datetime | None = None,
|
||||
duration: int | None = None,
|
||||
message_id: str | None = None,
|
||||
session_id: str | None = None,
|
||||
bot_id: str | None = None,
|
||||
pipeline_id: str | None = None,
|
||||
attributes: dict | None = None,
|
||||
error_message: str | None = None,
|
||||
) -> str:
|
||||
"""Record a single completed span."""
|
||||
started_at = started_at or _utc_now()
|
||||
if duration is None and ended_at is not None:
|
||||
duration = int((ended_at - started_at).total_seconds() * 1000)
|
||||
elif duration is not None:
|
||||
duration = int(round(float(duration)))
|
||||
span_data = {
|
||||
'span_id': span_id or new_span_id(),
|
||||
'trace_id': trace_id,
|
||||
'parent_span_id': parent_span_id,
|
||||
'name': name,
|
||||
'kind': kind,
|
||||
'status': normalize_trace_status(status),
|
||||
'started_at': started_at,
|
||||
'ended_at': ended_at or _utc_now(),
|
||||
'duration': duration,
|
||||
'message_id': message_id,
|
||||
'session_id': session_id,
|
||||
'bot_id': bot_id,
|
||||
'pipeline_id': pipeline_id,
|
||||
'attributes': _json_dumps(attributes),
|
||||
'error_message': error_message,
|
||||
}
|
||||
|
||||
await self.ap.persistence_mgr.execute_async(
|
||||
sqlalchemy.insert(persistence_monitoring.MonitoringSpan).values(span_data)
|
||||
)
|
||||
return span_data['span_id']
|
||||
|
||||
async def record_message(
|
||||
self,
|
||||
bot_id: str,
|
||||
@@ -1076,6 +1240,19 @@ class MonitoringService:
|
||||
for row in error_rows
|
||||
]
|
||||
|
||||
trace_query = (
|
||||
sqlalchemy.select(persistence_monitoring.MonitoringTrace)
|
||||
.where(persistence_monitoring.MonitoringTrace.message_id == message_id)
|
||||
.order_by(persistence_monitoring.MonitoringTrace.started_at.desc())
|
||||
.limit(1)
|
||||
)
|
||||
trace_result = await self.ap.persistence_mgr.execute_async(trace_query)
|
||||
trace_row = trace_result.first()
|
||||
trace = None
|
||||
if trace_row:
|
||||
trace_model = trace_row[0] if isinstance(trace_row, tuple) else trace_row
|
||||
trace = self._serialize_trace(trace_model)
|
||||
|
||||
return {
|
||||
'message_id': message_id,
|
||||
'found': True,
|
||||
@@ -1090,6 +1267,90 @@ class MonitoringService:
|
||||
'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0,
|
||||
},
|
||||
'errors': errors,
|
||||
'trace': trace,
|
||||
}
|
||||
|
||||
def _serialize_trace(self, trace: persistence_monitoring.MonitoringTrace) -> dict:
|
||||
data = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringTrace, trace)
|
||||
data['attributes'] = _json_loads(data.get('attributes')) or {}
|
||||
return data
|
||||
|
||||
def _serialize_span(self, span: persistence_monitoring.MonitoringSpan) -> dict:
|
||||
data = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringSpan, span)
|
||||
data['attributes'] = _json_loads(data.get('attributes')) or {}
|
||||
return data
|
||||
|
||||
async def get_traces(
|
||||
self,
|
||||
bot_ids: list[str] | None = None,
|
||||
pipeline_ids: list[str] | None = None,
|
||||
session_ids: list[str] | None = None,
|
||||
statuses: list[str] | None = None,
|
||||
start_time: datetime.datetime | None = None,
|
||||
end_time: datetime.datetime | None = None,
|
||||
limit: int = 100,
|
||||
offset: int = 0,
|
||||
) -> tuple[list[dict], int]:
|
||||
"""Get trace headers with filters."""
|
||||
conditions = []
|
||||
if bot_ids:
|
||||
conditions.append(persistence_monitoring.MonitoringTrace.bot_id.in_(bot_ids))
|
||||
if pipeline_ids:
|
||||
conditions.append(persistence_monitoring.MonitoringTrace.pipeline_id.in_(pipeline_ids))
|
||||
if session_ids:
|
||||
conditions.append(persistence_monitoring.MonitoringTrace.session_id.in_(session_ids))
|
||||
if statuses:
|
||||
conditions.append(persistence_monitoring.MonitoringTrace.status.in_(statuses))
|
||||
if start_time:
|
||||
conditions.append(persistence_monitoring.MonitoringTrace.started_at >= start_time)
|
||||
if end_time:
|
||||
conditions.append(persistence_monitoring.MonitoringTrace.started_at <= end_time)
|
||||
|
||||
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringTrace.trace_id))
|
||||
query = sqlalchemy.select(persistence_monitoring.MonitoringTrace)
|
||||
if conditions:
|
||||
clause = sqlalchemy.and_(*conditions)
|
||||
count_query = count_query.where(clause)
|
||||
query = query.where(clause)
|
||||
|
||||
total_result = await self.ap.persistence_mgr.execute_async(count_query)
|
||||
total = total_result.scalar() or 0
|
||||
|
||||
query = query.order_by(persistence_monitoring.MonitoringTrace.started_at.desc()).limit(limit).offset(offset)
|
||||
result = await self.ap.persistence_mgr.execute_async(query)
|
||||
traces = [
|
||||
self._serialize_trace(row[0] if isinstance(row, tuple) else row)
|
||||
for row in result.all()
|
||||
]
|
||||
return traces, total
|
||||
|
||||
async def get_trace_details(self, trace_id: str) -> dict:
|
||||
"""Get a single trace and all spans in chronological order."""
|
||||
trace_query = sqlalchemy.select(persistence_monitoring.MonitoringTrace).where(
|
||||
persistence_monitoring.MonitoringTrace.trace_id == trace_id
|
||||
)
|
||||
trace_result = await self.ap.persistence_mgr.execute_async(trace_query)
|
||||
trace_row = trace_result.first()
|
||||
if not trace_row:
|
||||
return {'trace_id': trace_id, 'found': False}
|
||||
|
||||
trace = trace_row[0] if isinstance(trace_row, tuple) else trace_row
|
||||
span_query = (
|
||||
sqlalchemy.select(persistence_monitoring.MonitoringSpan)
|
||||
.where(persistence_monitoring.MonitoringSpan.trace_id == trace_id)
|
||||
.order_by(persistence_monitoring.MonitoringSpan.started_at.asc())
|
||||
)
|
||||
span_result = await self.ap.persistence_mgr.execute_async(span_query)
|
||||
spans = [
|
||||
self._serialize_span(row[0] if isinstance(row, tuple) else row)
|
||||
for row in span_result.all()
|
||||
]
|
||||
|
||||
return {
|
||||
'trace_id': trace_id,
|
||||
'found': True,
|
||||
'trace': self._serialize_trace(trace),
|
||||
'spans': spans,
|
||||
}
|
||||
|
||||
# ========== Export Methods ==========
|
||||
|
||||
@@ -3,6 +3,49 @@ import sqlalchemy
|
||||
from .base import Base
|
||||
|
||||
|
||||
class MonitoringTrace(Base):
|
||||
"""End-to-end monitoring trace records"""
|
||||
|
||||
__tablename__ = 'monitoring_traces'
|
||||
|
||||
trace_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
|
||||
started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
|
||||
ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True, index=True)
|
||||
duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) # milliseconds
|
||||
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True) # running, success, error
|
||||
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
|
||||
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
bot_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
|
||||
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
|
||||
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
query_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
|
||||
|
||||
|
||||
class MonitoringSpan(Base):
|
||||
"""Trace span records for pipeline, RAG, model, plugin and tool operations"""
|
||||
|
||||
__tablename__ = 'monitoring_spans'
|
||||
|
||||
span_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
|
||||
trace_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
|
||||
parent_span_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
|
||||
kind = sqlalchemy.Column(sqlalchemy.String(80), nullable=False, index=True)
|
||||
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True)
|
||||
started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
|
||||
ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True)
|
||||
duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) # milliseconds
|
||||
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
|
||||
error_message = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
|
||||
|
||||
|
||||
class MonitoringMessage(Base):
|
||||
"""Monitoring message records"""
|
||||
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
"""add monitoring traces and spans
|
||||
|
||||
Revision ID: 0006_monitoring_traces
|
||||
Revises: 0005_add_llm_context_length
|
||||
Create Date: 2026-06-16
|
||||
"""
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
revision = '0006_monitoring_traces'
|
||||
down_revision = '0005_add_llm_context_length'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
inspector = sa.inspect(conn)
|
||||
tables = set(inspector.get_table_names())
|
||||
|
||||
if 'monitoring_traces' not in tables:
|
||||
op.create_table(
|
||||
'monitoring_traces',
|
||||
sa.Column('trace_id', sa.String(length=255), nullable=False),
|
||||
sa.Column('started_at', sa.DateTime(), nullable=False),
|
||||
sa.Column('ended_at', sa.DateTime(), nullable=True),
|
||||
sa.Column('duration', sa.Integer(), nullable=True),
|
||||
sa.Column('status', sa.String(length=50), nullable=False),
|
||||
sa.Column('name', sa.String(length=255), nullable=False),
|
||||
sa.Column('bot_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('bot_name', sa.String(length=255), nullable=True),
|
||||
sa.Column('pipeline_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('pipeline_name', sa.String(length=255), nullable=True),
|
||||
sa.Column('session_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('message_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('query_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('attributes', sa.Text(), nullable=True),
|
||||
sa.PrimaryKeyConstraint('trace_id'),
|
||||
)
|
||||
op.create_index('ix_monitoring_traces_started_at', 'monitoring_traces', ['started_at'])
|
||||
op.create_index('ix_monitoring_traces_ended_at', 'monitoring_traces', ['ended_at'])
|
||||
op.create_index('ix_monitoring_traces_status', 'monitoring_traces', ['status'])
|
||||
op.create_index('ix_monitoring_traces_bot_id', 'monitoring_traces', ['bot_id'])
|
||||
op.create_index('ix_monitoring_traces_pipeline_id', 'monitoring_traces', ['pipeline_id'])
|
||||
op.create_index('ix_monitoring_traces_session_id', 'monitoring_traces', ['session_id'])
|
||||
op.create_index('ix_monitoring_traces_message_id', 'monitoring_traces', ['message_id'])
|
||||
op.create_index('ix_monitoring_traces_query_id', 'monitoring_traces', ['query_id'])
|
||||
|
||||
if 'monitoring_spans' not in tables:
|
||||
op.create_table(
|
||||
'monitoring_spans',
|
||||
sa.Column('span_id', sa.String(length=255), nullable=False),
|
||||
sa.Column('trace_id', sa.String(length=255), nullable=False),
|
||||
sa.Column('parent_span_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('name', sa.String(length=255), nullable=False),
|
||||
sa.Column('kind', sa.String(length=80), nullable=False),
|
||||
sa.Column('status', sa.String(length=50), nullable=False),
|
||||
sa.Column('started_at', sa.DateTime(), nullable=False),
|
||||
sa.Column('ended_at', sa.DateTime(), nullable=True),
|
||||
sa.Column('duration', sa.Integer(), nullable=True),
|
||||
sa.Column('message_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('session_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('bot_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('pipeline_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('attributes', sa.Text(), nullable=True),
|
||||
sa.Column('error_message', sa.Text(), nullable=True),
|
||||
sa.PrimaryKeyConstraint('span_id'),
|
||||
)
|
||||
op.create_index('ix_monitoring_spans_trace_id', 'monitoring_spans', ['trace_id'])
|
||||
op.create_index('ix_monitoring_spans_parent_span_id', 'monitoring_spans', ['parent_span_id'])
|
||||
op.create_index('ix_monitoring_spans_kind', 'monitoring_spans', ['kind'])
|
||||
op.create_index('ix_monitoring_spans_status', 'monitoring_spans', ['status'])
|
||||
op.create_index('ix_monitoring_spans_started_at', 'monitoring_spans', ['started_at'])
|
||||
op.create_index('ix_monitoring_spans_message_id', 'monitoring_spans', ['message_id'])
|
||||
op.create_index('ix_monitoring_spans_session_id', 'monitoring_spans', ['session_id'])
|
||||
op.create_index('ix_monitoring_spans_bot_id', 'monitoring_spans', ['bot_id'])
|
||||
op.create_index('ix_monitoring_spans_pipeline_id', 'monitoring_spans', ['pipeline_id'])
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
inspector = sa.inspect(conn)
|
||||
tables = set(inspector.get_table_names())
|
||||
if 'monitoring_spans' in tables:
|
||||
op.drop_table('monitoring_spans')
|
||||
if 'monitoring_traces' in tables:
|
||||
op.drop_table('monitoring_traces')
|
||||
@@ -2,6 +2,9 @@ from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import traceback
|
||||
import time
|
||||
import uuid
|
||||
import datetime
|
||||
|
||||
import sqlalchemy
|
||||
|
||||
@@ -79,6 +82,19 @@ class RuntimePipeline:
|
||||
enable_all_plugins: bool
|
||||
"""是否启用所有插件"""
|
||||
|
||||
@staticmethod
|
||||
def _new_span_id() -> str:
|
||||
return f'span-{uuid.uuid4().hex[:16]}'
|
||||
|
||||
@staticmethod
|
||||
def _utc_now() -> datetime.datetime:
|
||||
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||
|
||||
@staticmethod
|
||||
def _query_session_id(query: pipeline_query.Query) -> str:
|
||||
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
|
||||
return f'{launcher_type}_{query.launcher_id}'
|
||||
|
||||
enable_all_mcp_servers: bool
|
||||
"""是否启用所有MCP服务器"""
|
||||
|
||||
@@ -234,44 +250,92 @@ class RuntimePipeline:
|
||||
stage_container = self.stage_containers[i]
|
||||
|
||||
query.current_stage_name = stage_container.inst_name # 标记到 Query 对象里
|
||||
span_started_at = self._utc_now()
|
||||
span_started = time.perf_counter()
|
||||
span_status = 'success'
|
||||
span_error = None
|
||||
span_result_type = None
|
||||
|
||||
result = stage_container.inst.process(query, stage_container.inst_name)
|
||||
try:
|
||||
result = stage_container.inst.process(query, stage_container.inst_name)
|
||||
|
||||
if isinstance(result, typing.Coroutine):
|
||||
result = await result
|
||||
if isinstance(result, typing.Coroutine):
|
||||
result = await result
|
||||
|
||||
if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果
|
||||
self.ap.logger.debug(
|
||||
f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}'
|
||||
)
|
||||
await self._check_output(query, result)
|
||||
|
||||
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
|
||||
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
|
||||
break
|
||||
elif result.result_type == pipeline_entities.ResultType.CONTINUE:
|
||||
query = result.new_query
|
||||
elif isinstance(result, typing.AsyncGenerator): # 生成器
|
||||
self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen')
|
||||
|
||||
async for sub_result in result:
|
||||
if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果
|
||||
span_result_type = str(result.result_type.value if hasattr(result.result_type, 'value') else result.result_type)
|
||||
self.ap.logger.debug(
|
||||
f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
|
||||
f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}'
|
||||
)
|
||||
await self._check_output(query, sub_result)
|
||||
await self._check_output(query, result)
|
||||
|
||||
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
|
||||
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
|
||||
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
|
||||
break
|
||||
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
|
||||
query = sub_result.new_query
|
||||
await self._execute_from_stage(i + 1, query)
|
||||
break
|
||||
elif result.result_type == pipeline_entities.ResultType.CONTINUE:
|
||||
query = result.new_query
|
||||
elif isinstance(result, typing.AsyncGenerator): # 生成器
|
||||
span_result_type = 'generator'
|
||||
self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen')
|
||||
|
||||
async for sub_result in result:
|
||||
span_result_type = str(
|
||||
sub_result.result_type.value
|
||||
if hasattr(sub_result.result_type, 'value')
|
||||
else sub_result.result_type
|
||||
)
|
||||
self.ap.logger.debug(
|
||||
f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
|
||||
)
|
||||
await self._check_output(query, sub_result)
|
||||
|
||||
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
|
||||
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
|
||||
break
|
||||
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
|
||||
query = sub_result.new_query
|
||||
await self._execute_from_stage(i + 1, query)
|
||||
break
|
||||
except Exception as e:
|
||||
span_status = 'error'
|
||||
span_error = str(e)
|
||||
raise
|
||||
finally:
|
||||
trace_id = (query.variables or {}).get('_monitoring_trace_id')
|
||||
root_span_id = (query.variables or {}).get('_monitoring_root_span_id')
|
||||
if trace_id:
|
||||
try:
|
||||
await self.ap.monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=root_span_id,
|
||||
name=stage_container.inst_name,
|
||||
kind='pipeline.stage',
|
||||
status=span_status,
|
||||
started_at=span_started_at,
|
||||
duration=int((time.perf_counter() - span_started) * 1000),
|
||||
message_id=(query.variables or {}).get('_monitoring_message_id'),
|
||||
session_id=self._query_session_id(query),
|
||||
bot_id=query.bot_uuid,
|
||||
pipeline_id=self.pipeline_entity.uuid,
|
||||
attributes={
|
||||
'stage_class': stage_container.inst.__class__.__name__,
|
||||
'result_type': span_result_type,
|
||||
'query_id': query.query_id,
|
||||
},
|
||||
error_message=span_error,
|
||||
)
|
||||
except Exception as monitor_err:
|
||||
self.ap.logger.error(f'Failed to record stage span: {monitor_err}')
|
||||
|
||||
i += 1
|
||||
|
||||
async def process_query(self, query: pipeline_query.Query):
|
||||
"""处理请求"""
|
||||
trace_started_at = self._utc_now()
|
||||
trace_started = time.perf_counter()
|
||||
root_span_id = self._new_span_id()
|
||||
trace_id = None
|
||||
trace_status = 'success'
|
||||
# Get monitoring metadata
|
||||
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
|
||||
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
|
||||
@@ -303,6 +367,28 @@ class RuntimePipeline:
|
||||
except Exception as e:
|
||||
self.ap.logger.error(f'Failed to record query start: {e}')
|
||||
|
||||
try:
|
||||
trace_id = await self.ap.monitoring_service.start_trace(
|
||||
name='LangBot query',
|
||||
bot_id=query.bot_uuid or 'unknown',
|
||||
bot_name=bot_name,
|
||||
pipeline_id=self.pipeline_entity.uuid,
|
||||
pipeline_name=pipeline_name,
|
||||
session_id=self._query_session_id(query),
|
||||
message_id=message_id or None,
|
||||
query_id=query.query_id,
|
||||
attributes={
|
||||
'launcher_type': query.launcher_type.value
|
||||
if hasattr(query.launcher_type, 'value')
|
||||
else str(query.launcher_type),
|
||||
'runner_name': runner_name,
|
||||
},
|
||||
)
|
||||
query.variables['_monitoring_trace_id'] = trace_id
|
||||
query.variables['_monitoring_root_span_id'] = root_span_id
|
||||
except Exception as e:
|
||||
self.ap.logger.error(f'Failed to start query trace: {e}')
|
||||
|
||||
try:
|
||||
# Get bound plugins for this pipeline
|
||||
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
|
||||
@@ -361,6 +447,7 @@ class RuntimePipeline:
|
||||
self.ap.logger.error(f'Failed to record query response: {e}')
|
||||
|
||||
except Exception as e:
|
||||
trace_status = 'error'
|
||||
inst_name = query.current_stage_name if query.current_stage_name else 'unknown'
|
||||
self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}')
|
||||
self.ap.logger.error(f'Traceback: {traceback.format_exc()}')
|
||||
@@ -383,6 +470,35 @@ class RuntimePipeline:
|
||||
self.ap.logger.error(f'Failed to record query error: {me}')
|
||||
|
||||
finally:
|
||||
if trace_id:
|
||||
try:
|
||||
duration_ms = int((time.perf_counter() - trace_started) * 1000)
|
||||
await self.ap.monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
span_id=root_span_id,
|
||||
name='LangBot query',
|
||||
kind='pipeline.query',
|
||||
status=trace_status,
|
||||
started_at=trace_started_at,
|
||||
duration=duration_ms,
|
||||
message_id=message_id or None,
|
||||
session_id=self._query_session_id(query),
|
||||
bot_id=query.bot_uuid,
|
||||
pipeline_id=self.pipeline_entity.uuid,
|
||||
attributes={
|
||||
'query_id': query.query_id,
|
||||
'pipeline_name': pipeline_name,
|
||||
'runner_name': runner_name,
|
||||
},
|
||||
)
|
||||
await self.ap.monitoring_service.finish_trace(
|
||||
trace_id=trace_id,
|
||||
status=trace_status,
|
||||
duration=duration_ms,
|
||||
message_id=message_id or None,
|
||||
)
|
||||
except Exception as monitor_err:
|
||||
self.ap.logger.error(f'Failed to finish query trace: {monitor_err}')
|
||||
self.ap.logger.debug(f'Query {query.query_id} processed')
|
||||
del self.ap.query_pool.cached_queries[query.query_id]
|
||||
|
||||
|
||||
@@ -711,8 +711,19 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
|
||||
endpoint: str,
|
||||
method: str,
|
||||
body: Any = None,
|
||||
caller: dict[str, Any] | None = None,
|
||||
headers: dict[str, str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
return await self.handler.handle_page_api(plugin_author, plugin_name, page_id, endpoint, method, body)
|
||||
return await self.handler.handle_page_api(
|
||||
plugin_author,
|
||||
plugin_name,
|
||||
page_id,
|
||||
endpoint,
|
||||
method,
|
||||
body,
|
||||
caller,
|
||||
headers or {},
|
||||
)
|
||||
|
||||
async def get_debug_info(self) -> dict[str, Any]:
|
||||
"""Get debug information including debug key and WS URL"""
|
||||
|
||||
@@ -755,6 +755,19 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
'session_name': session_name,
|
||||
'bot_uuid': query.bot_uuid or '',
|
||||
'sender_id': str(query.sender_id),
|
||||
'_trace_context': {
|
||||
'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None,
|
||||
'parent_span_id': query.variables.get('_monitoring_root_span_id') if query.variables else None,
|
||||
'message_id': query.variables.get('_monitoring_message_id') if query.variables else None,
|
||||
'query_id': query.query_id,
|
||||
'session_id': session_name,
|
||||
'bot_id': query.bot_uuid or '',
|
||||
'pipeline_id': query.pipeline_uuid or '',
|
||||
'knowledge_base_id': kb_id,
|
||||
'attributes': {
|
||||
'source': 'plugin-api',
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
results = [entry.model_dump(mode='json') for entry in entries]
|
||||
@@ -1011,6 +1024,8 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
endpoint: str,
|
||||
method: str,
|
||||
body: Any = None,
|
||||
caller: dict[str, Any] | None = None,
|
||||
headers: dict[str, str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Forward a page API call to the plugin via runtime."""
|
||||
result = await self.call_action(
|
||||
@@ -1022,6 +1037,8 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
'endpoint': endpoint,
|
||||
'method': method,
|
||||
'body': body,
|
||||
'caller': caller,
|
||||
'headers': headers or {},
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
import abc
|
||||
import typing
|
||||
import time
|
||||
import datetime
|
||||
|
||||
from ...core import app
|
||||
from ...entity.persistence import model as persistence_model
|
||||
@@ -16,6 +17,15 @@ LLM_USAGE_QUERY_VARIABLE = '_llm_usage'
|
||||
STREAM_USAGE_QUERY_VARIABLE = '_stream_usage'
|
||||
|
||||
|
||||
def _utc_now() -> datetime.datetime:
|
||||
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||
|
||||
|
||||
def _query_session_id(query: pipeline_query.Query) -> str:
|
||||
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
|
||||
return f'{launcher_type}_{query.launcher_id}'
|
||||
|
||||
|
||||
def _store_llm_usage(query: pipeline_query.Query | None, usage_info: dict | None) -> None:
|
||||
"""Store the latest provider usage on the query for upstream action handlers."""
|
||||
if query is None or not usage_info:
|
||||
@@ -59,6 +69,7 @@ class RuntimeProvider:
|
||||
"""Bridge method for invoking LLM with monitoring"""
|
||||
# Start timing for monitoring
|
||||
start_time = time.time()
|
||||
span_started_at = _utc_now()
|
||||
input_tokens = 0
|
||||
output_tokens = 0
|
||||
status = 'success'
|
||||
@@ -125,6 +136,30 @@ class RuntimeProvider:
|
||||
error_message=error_message,
|
||||
message_id=message_id,
|
||||
)
|
||||
trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None
|
||||
parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None
|
||||
if trace_id:
|
||||
await self.requester.ap.monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=parent_span_id,
|
||||
name=f'LLM {model.model_entity.name}',
|
||||
kind='model.llm',
|
||||
status=status,
|
||||
started_at=span_started_at,
|
||||
duration=duration_ms,
|
||||
message_id=message_id,
|
||||
session_id=_query_session_id(query),
|
||||
bot_id=query.bot_uuid,
|
||||
pipeline_id=query.pipeline_uuid,
|
||||
attributes={
|
||||
'model_name': model.model_entity.name,
|
||||
'input_tokens': input_tokens,
|
||||
'output_tokens': output_tokens,
|
||||
'total_tokens': input_tokens + output_tokens,
|
||||
'stream': False,
|
||||
},
|
||||
error_message=error_message,
|
||||
)
|
||||
except Exception as monitor_err:
|
||||
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM call: {monitor_err}')
|
||||
|
||||
@@ -140,6 +175,7 @@ class RuntimeProvider:
|
||||
"""Bridge method for invoking LLM stream with monitoring"""
|
||||
# Start timing for monitoring
|
||||
start_time = time.time()
|
||||
span_started_at = _utc_now()
|
||||
status = 'success'
|
||||
error_message = None
|
||||
input_tokens = 0
|
||||
@@ -204,6 +240,30 @@ class RuntimeProvider:
|
||||
error_message=error_message,
|
||||
message_id=message_id,
|
||||
)
|
||||
trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None
|
||||
parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None
|
||||
if trace_id:
|
||||
await self.requester.ap.monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=parent_span_id,
|
||||
name=f'LLM stream {model.model_entity.name}',
|
||||
kind='model.llm',
|
||||
status=status,
|
||||
started_at=span_started_at,
|
||||
duration=duration_ms,
|
||||
message_id=message_id,
|
||||
session_id=_query_session_id(query),
|
||||
bot_id=query.bot_uuid,
|
||||
pipeline_id=query.pipeline_uuid,
|
||||
attributes={
|
||||
'model_name': model.model_entity.name,
|
||||
'input_tokens': input_tokens,
|
||||
'output_tokens': output_tokens,
|
||||
'total_tokens': input_tokens + output_tokens,
|
||||
'stream': True,
|
||||
},
|
||||
error_message=error_message,
|
||||
)
|
||||
except Exception as monitor_err:
|
||||
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM stream call: {monitor_err}')
|
||||
|
||||
|
||||
@@ -268,6 +268,19 @@ class LocalAgentRunner(runner.RequestRunner):
|
||||
'bot_uuid': query.bot_uuid or '',
|
||||
'sender_id': str(query.sender_id),
|
||||
'session_name': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
|
||||
'_trace_context': {
|
||||
'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None,
|
||||
'parent_span_id': query.variables.get('_monitoring_root_span_id') if query.variables else None,
|
||||
'message_id': query.variables.get('_monitoring_message_id') if query.variables else None,
|
||||
'query_id': query.query_id,
|
||||
'session_id': f'{query.launcher_type.value}_{query.launcher_id}',
|
||||
'bot_id': query.bot_uuid or '',
|
||||
'pipeline_id': query.pipeline_uuid or '',
|
||||
'knowledge_base_id': kb_uuid,
|
||||
'attributes': {
|
||||
'source': 'local-agent',
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import traceback
|
||||
import uuid
|
||||
import zipfile
|
||||
import io
|
||||
import datetime
|
||||
from typing import Any
|
||||
from langbot.pkg.core import app
|
||||
import sqlalchemy
|
||||
@@ -25,6 +26,10 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
|
||||
super().__init__(ap)
|
||||
self.knowledge_base_entity = knowledge_base_entity
|
||||
|
||||
@staticmethod
|
||||
def _utc_now() -> datetime.datetime:
|
||||
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||
|
||||
async def initialize(self):
|
||||
pass
|
||||
|
||||
@@ -334,6 +339,24 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
|
||||
# are passed directly to vector_search by some plugins (e.g. LangRAG)
|
||||
# and would cause empty results when the metadata field doesn't exist.
|
||||
filters = settings.pop('filters', {})
|
||||
trace_context = settings.pop('_trace_context', None)
|
||||
host_span_started_at = self._utc_now()
|
||||
host_span_id = None
|
||||
if trace_context and trace_context.get('trace_id'):
|
||||
host_parent_span_id = trace_context.get('parent_span_id')
|
||||
host_span_id = trace_context.get('rag_span_id') or f'span-{uuid.uuid4().hex[:16]}'
|
||||
trace_context = {
|
||||
'trace_id': trace_context.get('trace_id'),
|
||||
'parent_span_id': host_span_id,
|
||||
'host_parent_span_id': host_parent_span_id,
|
||||
'message_id': trace_context.get('message_id'),
|
||||
'query_id': trace_context.get('query_id'),
|
||||
'session_id': trace_context.get('session_id'),
|
||||
'bot_id': trace_context.get('bot_id'),
|
||||
'pipeline_id': trace_context.get('pipeline_id'),
|
||||
'knowledge_base_id': kb.uuid,
|
||||
'attributes': trace_context.get('attributes') or {},
|
||||
}
|
||||
|
||||
retrieval_context = {
|
||||
'query': query,
|
||||
@@ -343,13 +366,104 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
|
||||
'creation_settings': kb.creation_settings or {},
|
||||
'filters': filters,
|
||||
}
|
||||
if trace_context:
|
||||
retrieval_context['trace_context'] = trace_context
|
||||
|
||||
result = await self.ap.plugin_connector.call_rag_retrieve(
|
||||
plugin_id,
|
||||
retrieval_context,
|
||||
)
|
||||
try:
|
||||
result = await self.ap.plugin_connector.call_rag_retrieve(
|
||||
plugin_id,
|
||||
retrieval_context,
|
||||
)
|
||||
except Exception as e:
|
||||
if trace_context:
|
||||
await self._record_rag_trace_result(
|
||||
trace_context=trace_context,
|
||||
host_span_id=host_span_id,
|
||||
started_at=host_span_started_at,
|
||||
plugin_id=plugin_id,
|
||||
result={
|
||||
'results': [],
|
||||
'metadata': {
|
||||
'status': 'error',
|
||||
'error_message': str(e),
|
||||
},
|
||||
},
|
||||
)
|
||||
raise
|
||||
if trace_context:
|
||||
await self._record_rag_trace_result(
|
||||
trace_context=trace_context,
|
||||
host_span_id=host_span_id,
|
||||
started_at=host_span_started_at,
|
||||
plugin_id=plugin_id,
|
||||
result=result,
|
||||
)
|
||||
return result
|
||||
|
||||
async def _record_rag_trace_result(
|
||||
self,
|
||||
trace_context: dict[str, Any],
|
||||
host_span_id: str | None,
|
||||
started_at: datetime.datetime,
|
||||
plugin_id: str,
|
||||
result: dict[str, Any],
|
||||
) -> None:
|
||||
"""Persist host RAG span and plugin-provided child spans."""
|
||||
trace_id = trace_context.get('trace_id')
|
||||
if not trace_id:
|
||||
return
|
||||
|
||||
metadata = result.get('metadata') if isinstance(result, dict) else {}
|
||||
metadata = metadata if isinstance(metadata, dict) else {}
|
||||
plugin_spans = metadata.get('trace_spans') if isinstance(metadata.get('trace_spans'), list) else []
|
||||
parent_span_id = trace_context.get('parent_span_id')
|
||||
host_parent_span_id = trace_context.get('host_parent_span_id')
|
||||
|
||||
try:
|
||||
await self.ap.monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
span_id=host_span_id,
|
||||
parent_span_id=host_parent_span_id,
|
||||
name=f'Knowledge retrieval {self.knowledge_base_entity.name}',
|
||||
kind='rag.retrieval',
|
||||
status=metadata.get('status', 'success'),
|
||||
started_at=started_at,
|
||||
duration=metadata.get('duration_ms'),
|
||||
message_id=trace_context.get('message_id'),
|
||||
session_id=trace_context.get('session_id'),
|
||||
bot_id=trace_context.get('bot_id'),
|
||||
pipeline_id=trace_context.get('pipeline_id'),
|
||||
attributes={
|
||||
'knowledge_base_id': self.knowledge_base_entity.uuid,
|
||||
'knowledge_base_name': self.knowledge_base_entity.name,
|
||||
'plugin_id': plugin_id,
|
||||
'returned_count': len(result.get('results', []) if isinstance(result, dict) else []),
|
||||
'total_found': result.get('total_found') if isinstance(result, dict) else None,
|
||||
},
|
||||
error_message=metadata.get('error_message'),
|
||||
)
|
||||
for span in plugin_spans:
|
||||
if not isinstance(span, dict):
|
||||
continue
|
||||
await self.ap.monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
span_id=span.get('span_id'),
|
||||
parent_span_id=span.get('parent_span_id') or host_span_id or parent_span_id,
|
||||
name=span.get('name') or 'RAG plugin stage',
|
||||
kind=span.get('kind') or 'rag.stage',
|
||||
status=span.get('status') or 'success',
|
||||
started_at=started_at,
|
||||
duration=span.get('duration_ms'),
|
||||
message_id=trace_context.get('message_id'),
|
||||
session_id=trace_context.get('session_id'),
|
||||
bot_id=trace_context.get('bot_id'),
|
||||
pipeline_id=trace_context.get('pipeline_id'),
|
||||
attributes=span.get('attributes') if isinstance(span.get('attributes'), dict) else {},
|
||||
error_message=span.get('error_message'),
|
||||
)
|
||||
except Exception as e:
|
||||
self.ap.logger.error(f'Failed to record RAG trace spans: {e}')
|
||||
|
||||
async def _delete_document(self, document_id: str) -> bool:
|
||||
"""Call plugin to delete document."""
|
||||
kb = self.knowledge_base_entity
|
||||
|
||||
Reference in New Issue
Block a user