Feat/monitor (#1928)

* feat: add monitor

* feat: fix tab

* feat: work

* feat: not reliable monitor

* feat: enhance monitoring page layout with integrated filters and refresh button

* feat: add support for runner recording

* feat: add jump button & alignment

* feat: new

* fix: not show query variables in local agent

* fix: pnpm lint and python ruff check

* fix: ruff fromat

* chore: remove unnecessary migration

* style: optimize monitoring page layout and fix sticky filter issues

- Enhanced metric cards with gradient backgrounds and hover effects
- Increased traffic chart height from 200px to 300px
- Adjusted grid layout and spacing for better visual appeal
- Fixed sticky filter area to properly cover parent padding without transparent gaps
- Used negative margins and positioning to eliminate scrolling artifacts
- Matched padding/margins with other pages (pipelines, bots) for consistency
- Removed duplicate title/subtitle from page content
- Added cursor-pointer styling to tab triggers
- Removed border between tab list and tab content

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* fix: apply prettier formatting to monitoring components

- Fixed indentation and spacing in MetricCard.tsx
- Fixed formatting in TrafficChart.tsx
- Applied prettier formatting to page.tsx

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* feat: update HomeSidebar to trigger action on child selection and localize monitoring titles

* refactor: streamline LLM and embedding invocation methods

* feat: add embedding model monitor

* fix: database version

* chore: simplify pnpm-lock.yaml formatting

---------

Co-authored-by: Junyan Qin <rockchinq@gmail.com>
Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Guanchao Wang
2026-01-26 21:08:23 +08:00
committed by GitHub
parent b73847f1a6
commit 5d9f6ec763
37 changed files with 6706 additions and 3182 deletions

View File

@@ -0,0 +1,325 @@
from __future__ import annotations
import datetime
import quart
from .. import group
def parse_iso_datetime(datetime_str: str | None) -> datetime.datetime | None:
"""Parse ISO 8601 datetime string, handling 'Z' suffix for UTC timezone"""
if not datetime_str:
return None
# Replace 'Z' with '+00:00' for Python 3.10 compatibility
if datetime_str.endswith('Z'):
datetime_str = datetime_str[:-1] + '+00:00'
dt = datetime.datetime.fromisoformat(datetime_str)
# Convert to UTC and remove timezone info to match database storage (which stores UTC as naive datetime)
if dt.tzinfo is not None:
# Convert to UTC and remove timezone info
dt = dt.astimezone(datetime.timezone.utc).replace(tzinfo=None)
return dt
@group.group_class('monitoring', '/api/v1/monitoring')
class MonitoringRouterGroup(group.RouterGroup):
async def initialize(self) -> None:
@self.route('/overview', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_overview() -> str:
"""Get overview metrics"""
# Parse query parameters
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
# Parse datetime
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)
metrics = await self.ap.monitoring_service.get_overview_metrics(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
)
return self.success(data=metrics)
@self.route('/messages', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_messages() -> str:
"""Get message logs"""
# Parse query parameters
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
limit = int(quart.request.args.get('limit', 100))
offset = int(quart.request.args.get('offset', 0))
# Parse datetime
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)
messages, total = await self.ap.monitoring_service.get_messages(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=offset,
)
return self.success(
data={
'messages': messages,
'total': total,
'limit': limit,
'offset': offset,
}
)
@self.route('/llm-calls', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_llm_calls() -> str:
"""Get LLM call records"""
# Parse query parameters
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
limit = int(quart.request.args.get('limit', 100))
offset = int(quart.request.args.get('offset', 0))
# Parse datetime
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)
llm_calls, total = await self.ap.monitoring_service.get_llm_calls(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=offset,
)
return self.success(
data={
'llm_calls': llm_calls,
'total': total,
'limit': limit,
'offset': offset,
}
)
@self.route('/embedding-calls', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_embedding_calls() -> str:
"""Get embedding call records"""
# Parse query parameters
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
knowledge_base_id = quart.request.args.get('knowledgeBaseId')
limit = int(quart.request.args.get('limit', 100))
offset = int(quart.request.args.get('offset', 0))
# Parse datetime
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)
embedding_calls, total = await self.ap.monitoring_service.get_embedding_calls(
start_time=start_time,
end_time=end_time,
knowledge_base_id=knowledge_base_id if knowledge_base_id else None,
limit=limit,
offset=offset,
)
return self.success(
data={
'embedding_calls': embedding_calls,
'total': total,
'limit': limit,
'offset': offset,
}
)
@self.route('/sessions', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_sessions() -> str:
"""Get session information"""
# Parse query parameters
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
is_active_str = quart.request.args.get('isActive')
limit = int(quart.request.args.get('limit', 100))
offset = int(quart.request.args.get('offset', 0))
# Parse datetime
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)
# Parse is_active
is_active = None
if is_active_str:
is_active = is_active_str.lower() == 'true'
sessions, total = await self.ap.monitoring_service.get_sessions(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
is_active=is_active,
limit=limit,
offset=offset,
)
return self.success(
data={
'sessions': sessions,
'total': total,
'limit': limit,
'offset': offset,
}
)
@self.route('/errors', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_errors() -> str:
"""Get error logs"""
# Parse query parameters
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
limit = int(quart.request.args.get('limit', 100))
offset = int(quart.request.args.get('offset', 0))
# Parse datetime
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)
errors, total = await self.ap.monitoring_service.get_errors(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=offset,
)
return self.success(
data={
'errors': errors,
'total': total,
'limit': limit,
'offset': offset,
}
)
@self.route('/data', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_all_data() -> str:
"""Get all monitoring data in a single request"""
# Parse query parameters
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
limit = int(quart.request.args.get('limit', 50))
# Parse datetime
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)
# Get overview metrics
overview = await self.ap.monitoring_service.get_overview_metrics(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
)
# Get messages
messages, messages_total = await self.ap.monitoring_service.get_messages(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=0,
)
# Get LLM calls
llm_calls, llm_calls_total = await self.ap.monitoring_service.get_llm_calls(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=0,
)
# Get sessions
sessions, sessions_total = await self.ap.monitoring_service.get_sessions(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
is_active=None,
limit=limit,
offset=0,
)
# Get errors
errors, errors_total = await self.ap.monitoring_service.get_errors(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=0,
)
# Get embedding calls
embedding_calls, embedding_calls_total = await self.ap.monitoring_service.get_embedding_calls(
start_time=start_time,
end_time=end_time,
limit=limit,
offset=0,
)
return self.success(
data={
'overview': overview,
'messages': messages,
'llmCalls': llm_calls,
'embeddingCalls': embedding_calls,
'sessions': sessions,
'errors': errors,
'totalCount': {
'messages': messages_total,
'llmCalls': llm_calls_total,
'embeddingCalls': embedding_calls_total,
'sessions': sessions_total,
'errors': errors_total,
},
}
)
@self.route('/sessions/<session_id>/analysis', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_session_analysis(session_id: str) -> str:
"""Get detailed analysis for a specific session"""
analysis = await self.ap.monitoring_service.get_session_analysis(session_id)
# Always return success with the analysis data
# The frontend will handle the 'found: false' case
return self.success(data=analysis)
@self.route('/messages/<message_id>/details', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_message_details(message_id: str) -> str:
"""Get detailed information for a specific message"""
details = await self.ap.monitoring_service.get_message_details(message_id)
if not details.get('found'):
return self.error(message=f'Message {message_id} not found', code=404)
return self.success(data=details)

View File

@@ -192,7 +192,7 @@ class LLMModelsService:
runtime_llm_model = await self.ap.model_mgr.init_temporary_runtime_llm_model(model_data)
extra_args = model_data.get('extra_args', {})
await runtime_llm_model.provider.requester.invoke_llm(
await runtime_llm_model.provider.invoke_llm(
query=None,
model=runtime_llm_model,
messages=[provider_message.Message(role='user', content='Hello, world! Please just reply a "Hello".')],
@@ -354,7 +354,7 @@ class EmbeddingModelsService:
else:
runtime_embedding_model = await self.ap.model_mgr.init_temporary_runtime_embedding_model(model_data)
await runtime_embedding_model.provider.requester.invoke_embedding(
await runtime_embedding_model.provider.invoke_embedding(
model=runtime_embedding_model,
input_text=['Hello, world!'],
extra_args={},

View File

@@ -0,0 +1,796 @@
from __future__ import annotations
import uuid
import datetime
import sqlalchemy
from ....core import app
from ....entity.persistence import monitoring as persistence_monitoring
class MonitoringService:
"""Monitoring service"""
ap: app.Application
def __init__(self, ap: app.Application) -> None:
self.ap = ap
# ========== Recording Methods ==========
async def record_message(
self,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
message_content: str,
session_id: str,
status: str = 'success',
level: str = 'info',
platform: str | None = None,
user_id: str | None = None,
runner_name: str | None = None,
variables: str | None = None,
) -> str:
"""Record a message"""
message_id = str(uuid.uuid4())
message_data = {
'id': message_id,
'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'bot_id': bot_id,
'bot_name': bot_name,
'pipeline_id': pipeline_id,
'pipeline_name': pipeline_name,
'message_content': message_content,
'session_id': session_id,
'status': status,
'level': level,
'platform': platform,
'user_id': user_id,
'runner_name': runner_name,
'variables': variables,
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringMessage).values(message_data)
)
return message_id
async def record_llm_call(
self,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
session_id: str,
model_name: str,
input_tokens: int,
output_tokens: int,
duration: int,
status: str = 'success',
cost: float | None = None,
error_message: str | None = None,
message_id: str | None = None,
) -> str:
"""Record an LLM call"""
call_id = str(uuid.uuid4())
call_data = {
'id': call_id,
'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'model_name': model_name,
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'total_tokens': input_tokens + output_tokens,
'duration': duration,
'cost': cost,
'status': status,
'bot_id': bot_id,
'bot_name': bot_name,
'pipeline_id': pipeline_id,
'pipeline_name': pipeline_name,
'session_id': session_id,
'error_message': error_message,
'message_id': message_id,
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringLLMCall).values(call_data)
)
return call_id
async def record_embedding_call(
self,
model_name: str,
prompt_tokens: int,
total_tokens: int,
duration: int,
input_count: int,
status: str = 'success',
error_message: str | None = None,
knowledge_base_id: str | None = None,
query_text: str | None = None,
session_id: str | None = None,
message_id: str | None = None,
call_type: str | None = None,
) -> str:
"""Record an embedding call"""
call_id = str(uuid.uuid4())
call_data = {
'id': call_id,
'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'model_name': model_name,
'prompt_tokens': prompt_tokens,
'total_tokens': total_tokens,
'duration': duration,
'input_count': input_count,
'status': status,
'error_message': error_message,
'knowledge_base_id': knowledge_base_id,
'query_text': query_text,
'session_id': session_id,
'message_id': message_id,
'call_type': call_type,
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringEmbeddingCall).values(call_data)
)
return call_id
async def record_session_start(
self,
session_id: str,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
platform: str | None = None,
user_id: str | None = None,
) -> None:
"""Record a new session"""
session_data = {
'session_id': session_id,
'bot_id': bot_id,
'bot_name': bot_name,
'pipeline_id': pipeline_id,
'pipeline_name': pipeline_name,
'message_count': 0,
'start_time': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'last_activity': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'is_active': True,
'platform': platform,
'user_id': user_id,
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringSession).values(session_data)
)
async def update_session_activity(
self,
session_id: str,
pipeline_id: str | None = None,
pipeline_name: str | None = None,
) -> bool:
"""Update session last activity time and increment message count.
Also updates pipeline info if the bot's pipeline has changed.
Returns:
True if session was found and updated, False if session doesn't exist.
"""
update_values = {
'last_activity': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'message_count': persistence_monitoring.MonitoringSession.message_count + 1,
}
# Update pipeline info if provided (handles pipeline switch)
if pipeline_id is not None:
update_values['pipeline_id'] = pipeline_id
if pipeline_name is not None:
update_values['pipeline_name'] = pipeline_name
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_monitoring.MonitoringSession)
.where(persistence_monitoring.MonitoringSession.session_id == session_id)
.values(update_values)
)
# Check if any rows were updated
return result.rowcount > 0
async def record_error(
self,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
error_type: str,
error_message: str,
session_id: str | None = None,
stack_trace: str | None = None,
message_id: str | None = None,
) -> str:
"""Record an error"""
error_id = str(uuid.uuid4())
error_data = {
'id': error_id,
'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'error_type': error_type,
'error_message': error_message,
'bot_id': bot_id,
'bot_name': bot_name,
'pipeline_id': pipeline_id,
'pipeline_name': pipeline_name,
'session_id': session_id,
'stack_trace': stack_trace,
'message_id': message_id,
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringError).values(error_data)
)
return error_id
async def update_message_status(
self,
message_id: str,
status: str,
level: str | None = None,
variables: str | None = None,
) -> None:
"""Update message status and optionally variables"""
update_values = {'status': status}
if level is not None:
update_values['level'] = level
if variables is not None:
update_values['variables'] = variables
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_monitoring.MonitoringMessage)
.where(persistence_monitoring.MonitoringMessage.id == message_id)
.values(update_values)
)
# ========== Query Methods ==========
async def get_overview_metrics(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
) -> dict:
"""Get overview metrics"""
# Build base query conditions
message_conditions = []
llm_conditions = []
embedding_conditions = []
session_conditions = []
if bot_ids:
message_conditions.append(persistence_monitoring.MonitoringMessage.bot_id.in_(bot_ids))
llm_conditions.append(persistence_monitoring.MonitoringLLMCall.bot_id.in_(bot_ids))
session_conditions.append(persistence_monitoring.MonitoringSession.bot_id.in_(bot_ids))
if pipeline_ids:
message_conditions.append(persistence_monitoring.MonitoringMessage.pipeline_id.in_(pipeline_ids))
llm_conditions.append(persistence_monitoring.MonitoringLLMCall.pipeline_id.in_(pipeline_ids))
session_conditions.append(persistence_monitoring.MonitoringSession.pipeline_id.in_(pipeline_ids))
if start_time:
message_conditions.append(persistence_monitoring.MonitoringMessage.timestamp >= start_time)
llm_conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp >= start_time)
embedding_conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp >= start_time)
session_conditions.append(persistence_monitoring.MonitoringSession.start_time >= start_time)
if end_time:
message_conditions.append(persistence_monitoring.MonitoringMessage.timestamp <= end_time)
llm_conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp <= end_time)
embedding_conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp <= end_time)
session_conditions.append(persistence_monitoring.MonitoringSession.start_time <= end_time)
# Total messages
message_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringMessage.id))
if message_conditions:
message_query = message_query.where(sqlalchemy.and_(*message_conditions))
total_messages_result = await self.ap.persistence_mgr.execute_async(message_query)
total_messages = total_messages_result.scalar() or 0
# Total LLM calls
llm_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringLLMCall.id))
if llm_conditions:
llm_query = llm_query.where(sqlalchemy.and_(*llm_conditions))
llm_calls_result = await self.ap.persistence_mgr.execute_async(llm_query)
llm_calls = llm_calls_result.scalar() or 0
# Total Embedding calls
embedding_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringEmbeddingCall.id))
if embedding_conditions:
embedding_query = embedding_query.where(sqlalchemy.and_(*embedding_conditions))
embedding_calls_result = await self.ap.persistence_mgr.execute_async(embedding_query)
embedding_calls = embedding_calls_result.scalar() or 0
# Total model calls (LLM + Embedding)
model_calls = llm_calls + embedding_calls
# Success rate (based on messages)
success_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringMessage.id)).where(
persistence_monitoring.MonitoringMessage.status == 'success'
)
if message_conditions:
success_query = success_query.where(sqlalchemy.and_(*message_conditions))
success_result = await self.ap.persistence_mgr.execute_async(success_query)
success_count = success_result.scalar() or 0
success_rate = (success_count / total_messages * 100) if total_messages > 0 else 100
# Active sessions
active_session_query = sqlalchemy.select(
sqlalchemy.func.count(persistence_monitoring.MonitoringSession.session_id)
).where(persistence_monitoring.MonitoringSession.is_active == True)
if session_conditions:
active_session_query = active_session_query.where(sqlalchemy.and_(*session_conditions))
active_sessions_result = await self.ap.persistence_mgr.execute_async(active_session_query)
active_sessions = active_sessions_result.scalar() or 0
return {
'total_messages': total_messages,
'llm_calls': llm_calls,
'embedding_calls': embedding_calls,
'model_calls': model_calls,
'success_rate': round(success_rate, 2),
'active_sessions': active_sessions,
}
async def get_messages(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get messages with filters"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringMessage.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringMessage.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringMessage.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringMessage.timestamp <= end_time)
# Get total count
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringMessage.id))
if conditions:
count_query = count_query.where(sqlalchemy.and_(*conditions))
count_result = await self.ap.persistence_mgr.execute_async(count_query)
total = count_result.scalar() or 0
# Get messages
query = sqlalchemy.select(persistence_monitoring.MonitoringMessage).order_by(
persistence_monitoring.MonitoringMessage.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
messages_rows = result.all()
serialized = []
for row in messages_rows:
# Extract model instance from Row (SQLAlchemy returns Row objects)
msg = row[0] if isinstance(row, tuple) else row
serialized_msg = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringMessage, msg)
serialized.append(serialized_msg)
return (serialized, total)
async def get_llm_calls(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get LLM calls with filters"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringLLMCall.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringLLMCall.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp <= end_time)
# Get total count
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringLLMCall.id))
if conditions:
count_query = count_query.where(sqlalchemy.and_(*conditions))
count_result = await self.ap.persistence_mgr.execute_async(count_query)
total = count_result.scalar() or 0
# Get LLM calls
query = sqlalchemy.select(persistence_monitoring.MonitoringLLMCall).order_by(
persistence_monitoring.MonitoringLLMCall.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
llm_calls_rows = result.all()
return (
[
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringLLMCall, row[0] if isinstance(row, tuple) else row
)
for row in llm_calls_rows
],
total,
)
async def get_embedding_calls(
self,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
knowledge_base_id: str | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get embedding calls with filters"""
conditions = []
if start_time:
conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp <= end_time)
if knowledge_base_id:
conditions.append(persistence_monitoring.MonitoringEmbeddingCall.knowledge_base_id == knowledge_base_id)
# Get total count
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringEmbeddingCall.id))
if conditions:
count_query = count_query.where(sqlalchemy.and_(*conditions))
count_result = await self.ap.persistence_mgr.execute_async(count_query)
total = count_result.scalar() or 0
# Get embedding calls
query = sqlalchemy.select(persistence_monitoring.MonitoringEmbeddingCall).order_by(
persistence_monitoring.MonitoringEmbeddingCall.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
embedding_calls_rows = result.all()
return (
[
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringEmbeddingCall, row[0] if isinstance(row, tuple) else row
)
for row in embedding_calls_rows
],
total,
)
async def get_sessions(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
is_active: bool | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get sessions with filters"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringSession.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringSession.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringSession.start_time >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringSession.start_time <= end_time)
if is_active is not None:
conditions.append(persistence_monitoring.MonitoringSession.is_active == is_active)
# Get total count
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringSession.session_id))
if conditions:
count_query = count_query.where(sqlalchemy.and_(*conditions))
count_result = await self.ap.persistence_mgr.execute_async(count_query)
total = count_result.scalar() or 0
# Get sessions
query = sqlalchemy.select(persistence_monitoring.MonitoringSession).order_by(
persistence_monitoring.MonitoringSession.last_activity.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
sessions_rows = result.all()
return (
[
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringSession, row[0] if isinstance(row, tuple) else row
)
for row in sessions_rows
],
total,
)
async def get_errors(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get errors with filters"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringError.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringError.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringError.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringError.timestamp <= end_time)
# Get total count
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringError.id))
if conditions:
count_query = count_query.where(sqlalchemy.and_(*conditions))
count_result = await self.ap.persistence_mgr.execute_async(count_query)
total = count_result.scalar() or 0
# Get errors
query = sqlalchemy.select(persistence_monitoring.MonitoringError).order_by(
persistence_monitoring.MonitoringError.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
errors_rows = result.all()
return (
[
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row
)
for row in errors_rows
],
total,
)
async def get_session_analysis(
self,
session_id: str,
) -> dict:
"""Get detailed analysis for a specific session"""
# Get session info
session_query = sqlalchemy.select(persistence_monitoring.MonitoringSession).where(
persistence_monitoring.MonitoringSession.session_id == session_id
)
session_result = await self.ap.persistence_mgr.execute_async(session_query)
session_row = session_result.first()
if not session_row:
return {
'session_id': session_id,
'found': False,
}
session = session_row[0] if isinstance(session_row, tuple) else session_row
# Get messages for this session
messages_query = (
sqlalchemy.select(persistence_monitoring.MonitoringMessage)
.where(persistence_monitoring.MonitoringMessage.session_id == session_id)
.order_by(persistence_monitoring.MonitoringMessage.timestamp.asc())
)
messages_result = await self.ap.persistence_mgr.execute_async(messages_query)
messages_rows = messages_result.all()
# Count messages by status
success_messages = 0
error_messages = 0
pending_messages = 0
for row in messages_rows:
msg = row[0] if isinstance(row, tuple) else row
if msg.status == 'success':
success_messages += 1
elif msg.status == 'error':
error_messages += 1
elif msg.status == 'pending':
pending_messages += 1
# Get LLM calls for this session
llm_query = sqlalchemy.select(persistence_monitoring.MonitoringLLMCall).where(
persistence_monitoring.MonitoringLLMCall.session_id == session_id
)
llm_result = await self.ap.persistence_mgr.execute_async(llm_query)
llm_rows = llm_result.all()
# Calculate LLM statistics
total_llm_calls = len(llm_rows)
total_input_tokens = 0
total_output_tokens = 0
total_tokens = 0
total_duration = 0
success_llm_calls = 0
error_llm_calls = 0
for row in llm_rows:
llm_call = row[0] if isinstance(row, tuple) else row
total_input_tokens += llm_call.input_tokens
total_output_tokens += llm_call.output_tokens
total_tokens += llm_call.total_tokens
total_duration += llm_call.duration
if llm_call.status == 'success':
success_llm_calls += 1
else:
error_llm_calls += 1
# Get errors for this session
error_query = (
sqlalchemy.select(persistence_monitoring.MonitoringError)
.where(persistence_monitoring.MonitoringError.session_id == session_id)
.order_by(persistence_monitoring.MonitoringError.timestamp.desc())
)
error_result = await self.ap.persistence_mgr.execute_async(error_query)
error_rows = error_result.all()
errors = [
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row
)
for row in error_rows
]
# Calculate session duration
if messages_rows:
first_msg = messages_rows[0][0] if isinstance(messages_rows[0], tuple) else messages_rows[0]
last_msg = messages_rows[-1][0] if isinstance(messages_rows[-1], tuple) else messages_rows[-1]
session_duration_seconds = int((last_msg.timestamp - first_msg.timestamp).total_seconds())
else:
session_duration_seconds = 0
return {
'session_id': session_id,
'found': True,
'session': self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringSession, session),
'message_stats': {
'total': len(messages_rows),
'success': success_messages,
'error': error_messages,
'pending': pending_messages,
},
'llm_stats': {
'total_calls': total_llm_calls,
'success_calls': success_llm_calls,
'error_calls': error_llm_calls,
'total_input_tokens': total_input_tokens,
'total_output_tokens': total_output_tokens,
'total_tokens': total_tokens,
'average_duration_ms': int(total_duration / total_llm_calls) if total_llm_calls > 0 else 0,
},
'errors': errors,
'session_duration_seconds': session_duration_seconds,
}
async def get_message_details(
self,
message_id: str,
) -> dict:
"""Get detailed information for a specific message including associated LLM calls and errors"""
# Get message info
message_query = sqlalchemy.select(persistence_monitoring.MonitoringMessage).where(
persistence_monitoring.MonitoringMessage.id == message_id
)
message_result = await self.ap.persistence_mgr.execute_async(message_query)
message_row = message_result.first()
if not message_row:
return {
'message_id': message_id,
'found': False,
}
message = message_row[0] if isinstance(message_row, tuple) else message_row
# Get LLM calls for this message
llm_query = (
sqlalchemy.select(persistence_monitoring.MonitoringLLMCall)
.where(persistence_monitoring.MonitoringLLMCall.message_id == message_id)
.order_by(persistence_monitoring.MonitoringLLMCall.timestamp.asc())
)
llm_result = await self.ap.persistence_mgr.execute_async(llm_query)
llm_rows = llm_result.all()
llm_calls = [
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringLLMCall, row[0] if isinstance(row, tuple) else row
)
for row in llm_rows
]
# Calculate LLM statistics
total_input_tokens = sum(call.input_tokens for call in llm_rows)
total_output_tokens = sum(call.output_tokens for call in llm_rows)
total_tokens = sum(call.total_tokens for call in llm_rows)
total_duration = sum(call.duration for call in llm_rows)
# Get errors for this message
error_query = (
sqlalchemy.select(persistence_monitoring.MonitoringError)
.where(persistence_monitoring.MonitoringError.message_id == message_id)
.order_by(persistence_monitoring.MonitoringError.timestamp.asc())
)
error_result = await self.ap.persistence_mgr.execute_async(error_query)
error_rows = error_result.all()
errors = [
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row
)
for row in error_rows
]
return {
'message_id': message_id,
'found': True,
'message': self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringMessage, message),
'llm_calls': llm_calls,
'llm_stats': {
'total_calls': len(llm_rows),
'total_input_tokens': total_input_tokens,
'total_output_tokens': total_output_tokens,
'total_tokens': total_tokens,
'total_duration_ms': total_duration,
'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0,
},
'errors': errors,
}

View File

@@ -29,6 +29,7 @@ from ..api.http.service import mcp as mcp_service
from ..api.http.service import apikey as apikey_service
from ..api.http.service import webhook as webhook_service
from ..api.http.service import external_kb as external_kb_service
from ..api.http.service import monitoring as monitoring_service
from ..discover import engine as discover_engine
from ..storage import mgr as storagemgr
from ..utils import logcache
@@ -143,6 +144,8 @@ class Application:
telemetry: telemetry_module.TelemetryManager = None
monitoring_service: monitoring_service.MonitoringService = None
def __init__(self):
pass

View File

@@ -26,6 +26,7 @@ from ...api.http.service import mcp as mcp_service
from ...api.http.service import apikey as apikey_service
from ...api.http.service import webhook as webhook_service
from ...api.http.service import external_kb as external_kb_service
from ...api.http.service import monitoring as monitoring_service
from ...discover import engine as discover_engine
from ...storage import mgr as storagemgr
from ...utils import logcache
@@ -149,6 +150,9 @@ class BuildAppStage(stage.BootingStage):
await http_ctrl.initialize()
ap.http_ctrl = http_ctrl
monitoring_service_inst = monitoring_service.MonitoringService(ap)
ap.monitoring_service = monitoring_service_inst
async def runtime_disconnect_callback(connector: plugin_connector.PluginRuntimeConnector) -> None:
await asyncio.sleep(3)
await plugin_connector_inst.initialize()

View File

@@ -0,0 +1,105 @@
import sqlalchemy
from .base import Base
class MonitoringMessage(Base):
"""Monitoring message records"""
__tablename__ = 'monitoring_messages'
id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
timestamp = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
bot_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
message_content = sqlalchemy.Column(sqlalchemy.Text, nullable=False)
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False) # success, error, pending
level = sqlalchemy.Column(sqlalchemy.String(50), nullable=False) # info, warning, error, debug
platform = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
runner_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) # Runner name for this query
variables = sqlalchemy.Column(sqlalchemy.Text, nullable=True) # Query variables as JSON string
class MonitoringLLMCall(Base):
"""LLM call records"""
__tablename__ = 'monitoring_llm_calls'
id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
timestamp = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
model_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
input_tokens = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
output_tokens = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
total_tokens = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=False) # milliseconds
cost = sqlalchemy.Column(sqlalchemy.Float, nullable=True)
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False) # success, error
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
bot_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
error_message = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) # Associated message ID
class MonitoringSession(Base):
"""Session tracking records"""
__tablename__ = 'monitoring_sessions'
session_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
bot_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
message_count = sqlalchemy.Column(sqlalchemy.Integer, nullable=False, default=0)
start_time = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
last_activity = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
is_active = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=True, index=True)
platform = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
class MonitoringError(Base):
"""Error log records"""
__tablename__ = 'monitoring_errors'
id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
timestamp = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
error_type = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
error_message = sqlalchemy.Column(sqlalchemy.Text, nullable=False)
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
bot_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
stack_trace = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) # Associated message ID
class MonitoringEmbeddingCall(Base):
"""Embedding call records"""
__tablename__ = 'monitoring_embedding_calls'
id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
timestamp = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
model_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
prompt_tokens = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
total_tokens = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=False) # milliseconds
input_count = sqlalchemy.Column(sqlalchemy.Integer, nullable=False) # Number of input texts
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False) # success, error
error_message = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
# Optional context fields
knowledge_base_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
query_text = sqlalchemy.Column(sqlalchemy.Text, nullable=True) # For retrieval calls
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
call_type = sqlalchemy.Column(sqlalchemy.String(50), nullable=True) # embedding, retrieve

View File

@@ -0,0 +1,270 @@
"""
Monitoring helper for recording events during pipeline execution.
This module provides convenient methods to record monitoring data
without cluttering the main pipeline code.
"""
from __future__ import annotations
import traceback
import typing
import time
import json
if typing.TYPE_CHECKING:
from ..core import app
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
class MonitoringHelper:
"""Helper class for monitoring operations"""
@staticmethod
async def record_query_start(
ap: app.Application,
query: pipeline_query.Query,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
runner_name: str | None = None,
) -> str:
"""Record the start of query processing, returns message_id"""
try:
# Check if session exists, if not, record session start
session_id = f'{query.launcher_type}_{query.launcher_id}'
# Try to record message
# Use JSON serialization to preserve message chain structure (including image URLs, etc.)
if hasattr(query, 'message_chain') and hasattr(query.message_chain, 'model_dump'):
message_content = json.dumps(query.message_chain.model_dump(), ensure_ascii=False)
else:
message_content = str(query)
# Variables will be updated in record_query_success after preproc stage sets them
# Here we just record None, the full variables will be set when query completes
message_id = await ap.monitoring_service.record_message(
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
message_content=message_content,
session_id=session_id,
status='pending',
level='info',
platform=query.launcher_type.value
if hasattr(query.launcher_type, 'value')
else str(query.launcher_type),
user_id=query.sender_id,
runner_name=runner_name,
variables=None, # Will be updated in record_query_success
)
# Update session activity or create new session if it doesn't exist
# Always pass pipeline info to handle pipeline switches
session_updated = await ap.monitoring_service.update_session_activity(
session_id,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
)
if not session_updated:
# Session doesn't exist, create it
await ap.monitoring_service.record_session_start(
session_id=session_id,
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
platform=query.launcher_type.value
if hasattr(query.launcher_type, 'value')
else str(query.launcher_type),
user_id=query.sender_id,
)
return message_id
except Exception as e:
ap.logger.error(f'Failed to record query start: {e}')
return ''
@staticmethod
async def record_query_success(
ap: app.Application,
message_id: str,
query: pipeline_query.Query | None = None,
):
"""Record successful query processing by updating message status and variables"""
try:
if message_id:
# Serialize query.variables (filtering out internal variables)
query_variables_str = None
if query and hasattr(query, 'variables') and query.variables:
filtered_vars = {k: v for k, v in query.variables.items() if not k.startswith('_')}
if filtered_vars:
try:
query_variables_str = json.dumps(filtered_vars, ensure_ascii=False, default=str)
except Exception:
pass
await ap.monitoring_service.update_message_status(
message_id=message_id,
status='success',
variables=query_variables_str,
)
except Exception as e:
ap.logger.error(f'Failed to record query success: {e}')
@staticmethod
async def record_query_error(
ap: app.Application,
query: pipeline_query.Query,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
error: Exception,
runner_name: str | None = None,
) -> str:
"""Record query processing error, returns message_id"""
try:
session_id = f'{query.launcher_type}_{query.launcher_id}'
# Record error message
message_id = await ap.monitoring_service.record_message(
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
message_content=f'Error: {str(error)}',
session_id=session_id,
status='error',
level='error',
platform=query.launcher_type.value
if hasattr(query.launcher_type, 'value')
else str(query.launcher_type),
user_id=query.sender_id,
runner_name=runner_name,
)
# Record error log
await ap.monitoring_service.record_error(
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
error_type=type(error).__name__,
error_message=str(error),
session_id=session_id,
stack_trace=traceback.format_exc(),
message_id=message_id,
)
return message_id
except Exception as e:
ap.logger.error(f'Failed to record query error: {e}')
return ''
@staticmethod
async def record_llm_call(
ap: app.Application,
query: pipeline_query.Query,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
model_name: str,
input_tokens: int,
output_tokens: int,
duration_ms: int,
status: str = 'success',
cost: float | None = None,
error_message: str | None = None,
message_id: str | None = None,
):
"""Record LLM call"""
try:
session_id = f'{query.launcher_type}_{query.launcher_id}'
await ap.monitoring_service.record_llm_call(
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
session_id=session_id,
model_name=model_name,
input_tokens=input_tokens,
output_tokens=output_tokens,
duration=duration_ms,
status=status,
cost=cost,
error_message=error_message,
message_id=message_id,
)
except Exception as e:
ap.logger.error(f'Failed to record LLM call: {e}')
class LLMCallMonitor:
"""Context manager for monitoring LLM calls"""
def __init__(
self,
ap: app.Application,
query: pipeline_query.Query,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
model_name: str,
):
self.ap = ap
self.query = query
self.bot_id = bot_id
self.bot_name = bot_name
self.pipeline_id = pipeline_id
self.pipeline_name = pipeline_name
self.model_name = model_name
self.start_time = None
self.input_tokens = 0
self.output_tokens = 0
async def __aenter__(self):
self.start_time = time.time()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
duration_ms = int((time.time() - self.start_time) * 1000)
if exc_type is not None:
# Error occurred
await MonitoringHelper.record_llm_call(
ap=self.ap,
query=self.query,
bot_id=self.bot_id,
bot_name=self.bot_name,
pipeline_id=self.pipeline_id,
pipeline_name=self.pipeline_name,
model_name=self.model_name,
input_tokens=self.input_tokens,
output_tokens=self.output_tokens,
duration_ms=duration_ms,
status='error',
error_message=str(exc_val) if exc_val else None,
)
else:
# Success
await MonitoringHelper.record_llm_call(
ap=self.ap,
query=self.query,
bot_id=self.bot_id,
bot_name=self.bot_name,
pipeline_id=self.pipeline_id,
pipeline_name=self.pipeline_name,
model_name=self.model_name,
input_tokens=self.input_tokens,
output_tokens=self.output_tokens,
duration_ms=duration_ms,
status='success',
)
return False # Don't suppress exceptions

View File

@@ -115,6 +115,25 @@ class RuntimePipeline:
# Store bound plugins and MCP servers in query for filtering
query.variables['_pipeline_bound_plugins'] = self.bound_plugins
query.variables['_pipeline_bound_mcp_servers'] = self.bound_mcp_servers
# Record query start for monitoring
try:
# Get bot name from bot_uuid
bot_name = 'WebChat'
if query.bot_uuid:
try:
bot = await self.ap.bot_service.get_bot(query.bot_uuid, include_secret=False)
if bot:
bot_name = bot.get('name', 'Unknown')
except Exception:
pass
# Store for later use in process_query
query.variables['_monitoring_bot_name'] = bot_name
query.variables['_monitoring_pipeline_name'] = self.pipeline_entity.name
except Exception as e:
self.ap.logger.error(f'Failed to prepare monitoring data: {e}')
await self.process_query(query)
async def _check_output(self, query: pipeline_query.Query, result: pipeline_entities.StageProcessResult):
@@ -131,7 +150,7 @@ class RuntimePipeline:
query.message_event, platform_events.GroupMessage
):
result.user_notice.insert(0, platform_message.At(target=query.message_event.sender.id))
if await query.adapter.is_stream_output_supported():
if await query.adapter.is_stream_output_supported() and query.resp_messages:
await query.adapter.reply_message_chunk(
message_source=query.message_event,
bot_message=query.resp_messages[-1],
@@ -151,6 +170,37 @@ class RuntimePipeline:
self.ap.logger.info(result.console_notice)
if result.error_notice:
self.ap.logger.error(result.error_notice)
# Mark query as having error
query.variables['_monitoring_has_error'] = True
# Record error to monitoring system
try:
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
message_id = query.variables.get('_monitoring_message_id', '')
session_id = f'{query.launcher_type}_{query.launcher_id}'
# Update message status to error
if message_id:
await self.ap.monitoring_service.update_message_status(
message_id=message_id,
status='error',
level='error',
)
# Record error log
await self.ap.monitoring_service.record_error(
bot_id=query.bot_uuid or 'unknown',
bot_name=bot_name,
pipeline_id=self.pipeline_entity.uuid,
pipeline_name=pipeline_name,
error_type='PipelineError',
error_message=result.error_notice,
session_id=session_id,
stack_trace=result.debug_notice if result.debug_notice else None,
message_id=message_id,
)
except Exception as e:
self.ap.logger.error(f'Failed to record error to monitoring: {e}')
async def _execute_from_stage(
self,
@@ -221,6 +271,34 @@ class RuntimePipeline:
async def process_query(self, query: pipeline_query.Query):
"""处理请求"""
# Get monitoring metadata
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
# Get runner name from pipeline config
runner_name = None
if query.pipeline_config and 'ai' in query.pipeline_config and 'runner' in query.pipeline_config['ai']:
runner_name = query.pipeline_config['ai']['runner'].get('runner')
# Record query start and store message_id
message_id = ''
try:
from . import monitoring_helper
message_id = await monitoring_helper.MonitoringHelper.record_query_start(
ap=self.ap,
query=query,
bot_id=query.bot_uuid or 'unknown',
bot_name=bot_name,
pipeline_id=self.pipeline_entity.uuid,
pipeline_name=pipeline_name,
runner_name=runner_name,
)
# Store message_id in query variables for LLM call monitoring
query.variables['_monitoring_message_id'] = message_id
except Exception as e:
self.ap.logger.error(f'Failed to record query start: {e}')
try:
# Get bound plugins for this pipeline
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
@@ -249,10 +327,40 @@ class RuntimePipeline:
self.ap.logger.debug(f'Processing query {query.query_id}')
await self._execute_from_stage(0, query)
# Record query success only if no error occurred during processing
if not query.variables.get('_monitoring_has_error', False):
try:
await monitoring_helper.MonitoringHelper.record_query_success(
ap=self.ap,
message_id=message_id,
query=query,
)
except Exception as e:
self.ap.logger.error(f'Failed to record query success: {e}')
except Exception as e:
inst_name = query.current_stage_name if query.current_stage_name else 'unknown'
self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}')
self.ap.logger.error(f'Traceback: {traceback.format_exc()}')
# Record query error
try:
from . import monitoring_helper
await monitoring_helper.MonitoringHelper.record_query_error(
ap=self.ap,
query=query,
bot_id=query.bot_uuid or 'unknown',
bot_name=bot_name,
pipeline_id=self.pipeline_entity.uuid,
pipeline_name=pipeline_name,
error=e,
runner_name=runner_name,
)
except Exception as me:
self.ap.logger.error(f'Failed to record query error: {me}')
finally:
self.ap.logger.debug(f'Query {query.query_id} processed')
del self.ap.query_pool.cached_queries[query.query_id]

View File

@@ -324,7 +324,7 @@ class RuntimeConnectionHandler(handler.Handler):
messages_obj = [provider_message.Message.model_validate(message) for message in messages]
funcs_obj = [resource_tool.LLMTool.model_validate(func) for func in funcs]
result = await llm_model.provider.requester.invoke_llm(
result = await llm_model.provider.invoke_llm(
query=None,
model=llm_model,
messages=messages_obj,

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import abc
import typing
import time
from ...core import app
from ...entity.persistence import model as persistence_model
@@ -33,6 +34,219 @@ class RuntimeProvider:
self.token_mgr = token_mgr
self.requester = requester
async def invoke_llm(
self,
query: pipeline_query.Query,
model: RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message:
"""Bridge method for invoking LLM with monitoring"""
# Start timing for monitoring
start_time = time.time()
input_tokens = 0
output_tokens = 0
status = 'success'
error_message = None
try:
# Call the underlying requester
result = await self.requester.invoke_llm(
query=query,
model=model,
messages=messages,
funcs=funcs,
extra_args=extra_args,
remove_think=remove_think,
)
# Try to extract token usage if the requester returns it
# For requesters that return tuple (message, usage_info)
if isinstance(result, tuple):
msg, usage_info = result
if usage_info:
input_tokens = usage_info.get('input_tokens', 0)
output_tokens = usage_info.get('output_tokens', 0)
return msg
else:
return result
except Exception as e:
status = 'error'
error_message = str(e)
raise
finally:
# Record LLM call monitoring data (only if query is provided)
if query is not None:
duration_ms = int((time.time() - start_time) * 1000)
# Import monitoring helper
try:
from ...pipeline import monitoring_helper
# Get monitoring metadata from query variables
if query.variables:
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
message_id = query.variables.get('_monitoring_message_id')
else:
bot_name = 'Unknown'
pipeline_name = 'Unknown'
message_id = None
await monitoring_helper.MonitoringHelper.record_llm_call(
ap=self.requester.ap,
query=query,
bot_id=query.bot_uuid or 'unknown',
bot_name=bot_name,
pipeline_id=query.pipeline_uuid or 'unknown',
pipeline_name=pipeline_name,
model_name=model.model_entity.name,
input_tokens=input_tokens,
output_tokens=output_tokens,
duration_ms=duration_ms,
status=status,
error_message=error_message,
message_id=message_id,
)
except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM call: {monitor_err}')
async def invoke_llm_stream(
self,
query: pipeline_query.Query,
model: RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.MessageChunk:
"""Bridge method for invoking LLM stream with monitoring"""
# Start timing for monitoring
start_time = time.time()
status = 'success'
error_message = None
# Note: Stream doesn't easily provide token counts, set to 0
input_tokens = 0
output_tokens = 0
try:
# Stream the response
async for chunk in self.requester.invoke_llm_stream(
query=query,
model=model,
messages=messages,
funcs=funcs,
extra_args=extra_args,
remove_think=remove_think,
):
yield chunk
except Exception as e:
status = 'error'
error_message = str(e)
raise
finally:
# Record LLM call monitoring data (only if query is provided)
if query is not None:
duration_ms = int((time.time() - start_time) * 1000)
# Import monitoring helper
try:
from ...pipeline import monitoring_helper
# Get monitoring metadata from query variables
if query.variables:
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
message_id = query.variables.get('_monitoring_message_id')
else:
bot_name = 'Unknown'
pipeline_name = 'Unknown'
message_id = None
await monitoring_helper.MonitoringHelper.record_llm_call(
ap=self.requester.ap,
query=query,
bot_id=query.bot_uuid or 'unknown',
bot_name=bot_name,
pipeline_id=query.pipeline_uuid or 'unknown',
pipeline_name=pipeline_name,
model_name=model.model_entity.name,
input_tokens=input_tokens,
output_tokens=output_tokens,
duration_ms=duration_ms,
status=status,
error_message=error_message,
message_id=message_id,
)
except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM stream call: {monitor_err}')
async def invoke_embedding(
self,
model: RuntimeEmbeddingModel,
input_text: typing.List[str],
extra_args: dict[str, typing.Any] = {},
knowledge_base_id: str | None = None,
query_text: str | None = None,
session_id: str | None = None,
message_id: str | None = None,
call_type: str | None = None,
) -> typing.List[typing.List[float]]:
"""Bridge method for invoking embedding with monitoring"""
# Start timing for monitoring
start_time = time.time()
prompt_tokens = 0
total_tokens = 0
status = 'success'
error_message = None
try:
# Call the underlying requester
result = await self.requester.invoke_embedding(
model=model,
input_text=input_text,
extra_args=extra_args,
)
# Handle both old format (list only) and new format (tuple with usage)
if isinstance(result, tuple):
embeddings, usage_info = result
if usage_info:
prompt_tokens = usage_info.get('prompt_tokens', 0)
total_tokens = usage_info.get('total_tokens', 0)
return embeddings
else:
return result
except Exception as e:
status = 'error'
error_message = str(e)
raise
finally:
# Record embedding call monitoring data
duration_ms = int((time.time() - start_time) * 1000)
try:
await self.requester.ap.monitoring_service.record_embedding_call(
model_name=model.model_entity.name,
prompt_tokens=prompt_tokens,
total_tokens=total_tokens,
duration=duration_ms,
input_count=len(input_text),
status=status,
error_message=error_message,
knowledge_base_id=knowledge_base_id,
query_text=query_text,
session_id=session_id,
message_id=message_id,
call_type=call_type,
)
except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record embedding call: {monitor_err}')
class RuntimeLLMModel:
"""运行时模型"""
@@ -141,7 +355,7 @@ class ProviderAPIRequester(metaclass=abc.ABCMeta):
model: RuntimeEmbeddingModel,
input_text: typing.List[str],
extra_args: dict[str, typing.Any] = {},
) -> typing.List[typing.List[float]]:
) -> typing.Union[typing.List[typing.List[float]], tuple[typing.List[typing.List[float]], dict]]:
"""调用 Embedding API
Args:
@@ -151,5 +365,6 @@ class ProviderAPIRequester(metaclass=abc.ABCMeta):
Returns:
typing.List[typing.List[float]]: 返回的 embedding 向量
或者 tuple[typing.List[typing.List[float]], dict]: 返回 (embedding 向量, usage_info)
"""
pass

View File

@@ -253,7 +253,7 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message:
) -> tuple[provider_message.Message, dict]:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
@@ -285,7 +285,14 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
# 处理请求结果
message = await self._make_msg(resp, remove_think)
return message
# Extract token usage from response
usage_info = {}
if hasattr(resp, 'usage') and resp.usage:
usage_info['input_tokens'] = resp.usage.prompt_tokens or 0
usage_info['output_tokens'] = resp.usage.completion_tokens or 0
usage_info['total_tokens'] = resp.usage.total_tokens or 0
return message, usage_info
async def invoke_llm(
self,
@@ -295,7 +302,8 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message:
) -> tuple[provider_message.Message, dict]:
"""Invoke LLM and return message with usage info"""
req_messages = [] # req_messages 仅用于类内,外部同步由 query.messages 进行
for m in messages:
msg_dict = m.dict(exclude_none=True)
@@ -308,7 +316,7 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
req_messages.append(msg_dict)
try:
msg = await self._closure(
msg, usage_info = await self._closure(
query=query,
req_messages=req_messages,
use_model=model,
@@ -316,30 +324,38 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
extra_args=extra_args,
remove_think=remove_think,
)
return msg
return msg, usage_info
except asyncio.TimeoutError:
raise errors.RequesterError('请求超时')
except openai.BadRequestError as e:
if 'context_length_exceeded' in e.message:
raise errors.RequesterError(f'上文过长,请重置会话: {e.message}')
error_message = str(e.message) if hasattr(e, 'message') else str(e)
if 'context_length_exceeded' in str(e):
raise errors.RequesterError(f'上文过长,请重置会话: {error_message}')
else:
raise errors.RequesterError(f'请求参数错误: {e.message}')
raise errors.RequesterError(f'请求参数错误: {error_message}')
except openai.AuthenticationError as e:
raise errors.RequesterError(f'无效的 api-key: {e.message}')
error_message = str(e.message) if hasattr(e, 'message') else str(e)
raise errors.RequesterError(f'无效的 api-key: {error_message}')
except openai.NotFoundError as e:
raise errors.RequesterError(f'请求路径错误: {e.message}')
error_message = str(e.message) if hasattr(e, 'message') else str(e)
raise errors.RequesterError(f'请求路径错误: {error_message}')
except openai.RateLimitError as e:
raise errors.RequesterError(f'请求过于频繁或余额不足: {e.message}')
error_message = str(e.message) if hasattr(e, 'message') else str(e)
raise errors.RequesterError(f'请求过于频繁或余额不足: {error_message}')
except openai.APIConnectionError as e:
error_message = f'连接错误: {str(e)}'
raise errors.RequesterError(error_message)
except openai.APIError as e:
raise errors.RequesterError(f'请求错误: {e.message}')
error_message = str(e.message) if hasattr(e, 'message') else str(e)
raise errors.RequesterError(f'请求错误: {error_message}')
async def invoke_embedding(
self,
model: requester.RuntimeEmbeddingModel,
input_text: list[str],
extra_args: dict[str, typing.Any] = {},
) -> list[list[float]]:
"""调用 Embedding API"""
) -> tuple[list[list[float]], dict]:
"""调用 Embedding API, returns (embeddings, usage_info)"""
self.client.api_key = model.provider.token_mgr.get_token()
args = {
@@ -355,7 +371,13 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
try:
resp = await self.client.embeddings.create(**args)
return [d.embedding for d in resp.data]
# Extract usage info
usage_info = {}
if hasattr(resp, 'usage') and resp.usage:
usage_info['prompt_tokens'] = resp.usage.prompt_tokens or 0
usage_info['total_tokens'] = resp.usage.total_tokens or 0
return [d.embedding for d in resp.data], usage_info
except asyncio.TimeoutError:
raise errors.RequesterError('请求超时')
except openai.BadRequestError as e:

View File

@@ -130,7 +130,7 @@ class LocalAgentRunner(runner.RequestRunner):
if not is_stream:
# 非流式输出,直接请求
msg = await use_llm_model.provider.requester.invoke_llm(
msg = await use_llm_model.provider.invoke_llm(
query,
use_llm_model,
req_messages,
@@ -147,7 +147,7 @@ class LocalAgentRunner(runner.RequestRunner):
accumulated_content = '' # 从开始累积的所有内容
last_role = 'assistant'
msg_sequence = 1
async for msg in use_llm_model.provider.requester.invoke_llm_stream(
async for msg in use_llm_model.provider.invoke_llm_stream(
query,
use_llm_model,
req_messages,
@@ -265,7 +265,7 @@ class LocalAgentRunner(runner.RequestRunner):
last_role = 'assistant'
msg_sequence = first_end_sequence
async for msg in use_llm_model.provider.requester.invoke_llm_stream(
async for msg in use_llm_model.provider.invoke_llm_stream(
query,
use_llm_model,
req_messages,
@@ -321,7 +321,7 @@ class LocalAgentRunner(runner.RequestRunner):
)
else:
# 处理完所有调用,再次请求
msg = await use_llm_model.provider.requester.invoke_llm(
msg = await use_llm_model.provider.invoke_llm(
query,
use_llm_model,
req_messages,

View File

@@ -38,10 +38,12 @@ class Embedder(BaseService):
for i in range(0, len(chunks), MAX_BATCH_SIZE):
batch = chunks[i : i + MAX_BATCH_SIZE]
batch_embeddings = await embedding_model.provider.requester.invoke_embedding(
batch_embeddings = await embedding_model.provider.invoke_embedding(
model=embedding_model,
input_text=batch,
extra_args={}, # TODO: add extra args
knowledge_base_id=kb_id,
call_type='embedding',
)
embeddings_list.extend(batch_embeddings)

View File

@@ -19,10 +19,13 @@ class Retriever(base_service.BaseService):
f"Retrieving for query: '{query[:10]}' with k={k} using {embedding_model.model_entity.uuid}"
)
query_embedding: list[float] = await embedding_model.provider.requester.invoke_embedding(
query_embedding: list[float] = await embedding_model.provider.invoke_embedding(
model=embedding_model,
input_text=[query],
extra_args={}, # TODO: add extra args
knowledge_base_id=kb_id,
query_text=query,
call_type='retrieve',
)
vector_results = await self.ap.vector_db_mgr.vector_db.search(kb_id, query_embedding[0], k)