Files
LangBot/src/langbot/pkg/api/http/service/monitoring.py
6mvp6 83ccb33fd3 fix(monitoring): WeChat Work feedback recording bugs (#2108)
* fix(monitoring): fix WeChat Work feedback recording bugs

- Fix feedback events silently dropped when stream session expires:
  dispatch feedback handlers regardless of session availability
- Fix IntegrityError on repeated feedback (like→dislike) for same
  message: implement UPSERT logic in record_feedback()
- Fix cancel feedback (type=3) not removing records: add delete logic
- Fix inaccurate_reasons validation error: convert int reason codes
  to strings before creating FeedbackEvent (Pydantic expects List[str])
- Fix feedback timestamps 8 hours off in frontend: use parseUTCTimestamp
  instead of new Date() for UTC timestamp parsing
- Fix StreamSessionManager.cleanup missing _feedback_index cleanup

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(monitoring): apply ruff format to wecom feedback files

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: 6mvp6 <13727783693@163.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 17:12:43 +08:00

1497 lines
62 KiB
Python

from __future__ import annotations
import uuid
import datetime
import sqlalchemy
from ....core import app
from ....entity.persistence import monitoring as persistence_monitoring
class MonitoringService:
"""Monitoring service"""
ap: app.Application
def __init__(self, ap: app.Application) -> None:
self.ap = ap
# ========== Cleanup Methods ==========
async def cleanup_expired_records(self, retention_days: int) -> dict[str, int]:
"""Delete monitoring records older than the specified retention period.
Args:
retention_days: Number of days to retain records.
Returns:
A dict mapping table name to the number of deleted rows.
"""
cutoff = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - datetime.timedelta(
days=retention_days
)
tables_and_columns: list[tuple[str, type, sqlalchemy.Column]] = [
(
'monitoring_messages',
persistence_monitoring.MonitoringMessage,
persistence_monitoring.MonitoringMessage.timestamp,
),
(
'monitoring_llm_calls',
persistence_monitoring.MonitoringLLMCall,
persistence_monitoring.MonitoringLLMCall.timestamp,
),
(
'monitoring_embedding_calls',
persistence_monitoring.MonitoringEmbeddingCall,
persistence_monitoring.MonitoringEmbeddingCall.timestamp,
),
(
'monitoring_errors',
persistence_monitoring.MonitoringError,
persistence_monitoring.MonitoringError.timestamp,
),
(
'monitoring_sessions',
persistence_monitoring.MonitoringSession,
persistence_monitoring.MonitoringSession.last_activity,
),
]
deleted_counts: dict[str, int] = {}
for table_name, model_cls, ts_column in tables_and_columns:
result = await self.ap.persistence_mgr.execute_async(sqlalchemy.delete(model_cls).where(ts_column < cutoff))
deleted_counts[table_name] = result.rowcount
return deleted_counts
# ========== Recording Methods ==========
async def record_message(
self,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
message_content: str,
session_id: str,
status: str = 'success',
level: str = 'info',
platform: str | None = None,
user_id: str | None = None,
user_name: str | None = None,
runner_name: str | None = None,
variables: str | None = None,
role: str = 'user',
) -> str:
"""Record a message"""
message_id = str(uuid.uuid4())
message_data = {
'id': message_id,
'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'bot_id': bot_id,
'bot_name': bot_name,
'pipeline_id': pipeline_id,
'pipeline_name': pipeline_name,
'message_content': message_content,
'session_id': session_id,
'status': status,
'level': level,
'platform': platform,
'user_id': user_id,
'user_name': user_name,
'runner_name': runner_name,
'variables': variables,
'role': role,
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringMessage).values(message_data)
)
return message_id
async def record_llm_call(
self,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
session_id: str,
model_name: str,
input_tokens: int,
output_tokens: int,
duration: int,
status: str = 'success',
cost: float | None = None,
error_message: str | None = None,
message_id: str | None = None,
) -> str:
"""Record an LLM call"""
call_id = str(uuid.uuid4())
call_data = {
'id': call_id,
'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'model_name': model_name,
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'total_tokens': input_tokens + output_tokens,
'duration': duration,
'cost': cost,
'status': status,
'bot_id': bot_id,
'bot_name': bot_name,
'pipeline_id': pipeline_id,
'pipeline_name': pipeline_name,
'session_id': session_id,
'error_message': error_message,
'message_id': message_id,
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringLLMCall).values(call_data)
)
return call_id
async def record_embedding_call(
self,
model_name: str,
prompt_tokens: int,
total_tokens: int,
duration: int,
input_count: int,
status: str = 'success',
error_message: str | None = None,
knowledge_base_id: str | None = None,
query_text: str | None = None,
session_id: str | None = None,
message_id: str | None = None,
call_type: str | None = None,
) -> str:
"""Record an embedding call"""
call_id = str(uuid.uuid4())
call_data = {
'id': call_id,
'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'model_name': model_name,
'prompt_tokens': prompt_tokens,
'total_tokens': total_tokens,
'duration': duration,
'input_count': input_count,
'status': status,
'error_message': error_message,
'knowledge_base_id': knowledge_base_id,
'query_text': query_text,
'session_id': session_id,
'message_id': message_id,
'call_type': call_type,
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringEmbeddingCall).values(call_data)
)
return call_id
async def record_session_start(
self,
session_id: str,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
platform: str | None = None,
user_id: str | None = None,
user_name: str | None = None,
) -> None:
"""Record a new session"""
session_data = {
'session_id': session_id,
'bot_id': bot_id,
'bot_name': bot_name,
'pipeline_id': pipeline_id,
'pipeline_name': pipeline_name,
'message_count': 0,
'start_time': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'last_activity': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'is_active': True,
'platform': platform,
'user_id': user_id,
'user_name': user_name,
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringSession).values(session_data)
)
async def update_session_activity(
self,
session_id: str,
pipeline_id: str | None = None,
pipeline_name: str | None = None,
) -> bool:
"""Update session last activity time and increment message count.
Also updates pipeline info if the bot's pipeline has changed.
Returns:
True if session was found and updated, False if session doesn't exist.
"""
update_values = {
'last_activity': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'message_count': persistence_monitoring.MonitoringSession.message_count + 1,
}
# Update pipeline info if provided (handles pipeline switch)
if pipeline_id is not None:
update_values['pipeline_id'] = pipeline_id
if pipeline_name is not None:
update_values['pipeline_name'] = pipeline_name
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_monitoring.MonitoringSession)
.where(persistence_monitoring.MonitoringSession.session_id == session_id)
.values(update_values)
)
# Check if any rows were updated
return result.rowcount > 0
async def record_error(
self,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
error_type: str,
error_message: str,
session_id: str | None = None,
stack_trace: str | None = None,
message_id: str | None = None,
) -> str:
"""Record an error"""
error_id = str(uuid.uuid4())
error_data = {
'id': error_id,
'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
'error_type': error_type,
'error_message': error_message,
'bot_id': bot_id,
'bot_name': bot_name,
'pipeline_id': pipeline_id,
'pipeline_name': pipeline_name,
'session_id': session_id,
'stack_trace': stack_trace,
'message_id': message_id,
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringError).values(error_data)
)
return error_id
async def update_message_status(
self,
message_id: str,
status: str,
level: str | None = None,
variables: str | None = None,
) -> None:
"""Update message status and optionally variables"""
update_values = {'status': status}
if level is not None:
update_values['level'] = level
if variables is not None:
update_values['variables'] = variables
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_monitoring.MonitoringMessage)
.where(persistence_monitoring.MonitoringMessage.id == message_id)
.values(update_values)
)
# ========== Query Methods ==========
async def get_overview_metrics(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
) -> dict:
"""Get overview metrics"""
# Build base query conditions
message_conditions = []
llm_conditions = []
embedding_conditions = []
session_conditions = []
if bot_ids:
message_conditions.append(persistence_monitoring.MonitoringMessage.bot_id.in_(bot_ids))
llm_conditions.append(persistence_monitoring.MonitoringLLMCall.bot_id.in_(bot_ids))
session_conditions.append(persistence_monitoring.MonitoringSession.bot_id.in_(bot_ids))
if pipeline_ids:
message_conditions.append(persistence_monitoring.MonitoringMessage.pipeline_id.in_(pipeline_ids))
llm_conditions.append(persistence_monitoring.MonitoringLLMCall.pipeline_id.in_(pipeline_ids))
session_conditions.append(persistence_monitoring.MonitoringSession.pipeline_id.in_(pipeline_ids))
if start_time:
message_conditions.append(persistence_monitoring.MonitoringMessage.timestamp >= start_time)
llm_conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp >= start_time)
embedding_conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp >= start_time)
session_conditions.append(persistence_monitoring.MonitoringSession.start_time >= start_time)
if end_time:
message_conditions.append(persistence_monitoring.MonitoringMessage.timestamp <= end_time)
llm_conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp <= end_time)
embedding_conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp <= end_time)
session_conditions.append(persistence_monitoring.MonitoringSession.start_time <= end_time)
# Total messages
message_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringMessage.id))
if message_conditions:
message_query = message_query.where(sqlalchemy.and_(*message_conditions))
total_messages_result = await self.ap.persistence_mgr.execute_async(message_query)
total_messages = total_messages_result.scalar() or 0
# Total LLM calls
llm_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringLLMCall.id))
if llm_conditions:
llm_query = llm_query.where(sqlalchemy.and_(*llm_conditions))
llm_calls_result = await self.ap.persistence_mgr.execute_async(llm_query)
llm_calls = llm_calls_result.scalar() or 0
# Total Embedding calls
embedding_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringEmbeddingCall.id))
if embedding_conditions:
embedding_query = embedding_query.where(sqlalchemy.and_(*embedding_conditions))
embedding_calls_result = await self.ap.persistence_mgr.execute_async(embedding_query)
embedding_calls = embedding_calls_result.scalar() or 0
# Total model calls (LLM + Embedding)
model_calls = llm_calls + embedding_calls
# Success rate (based on messages)
success_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringMessage.id)).where(
persistence_monitoring.MonitoringMessage.status == 'success'
)
if message_conditions:
success_query = success_query.where(sqlalchemy.and_(*message_conditions))
success_result = await self.ap.persistence_mgr.execute_async(success_query)
success_count = success_result.scalar() or 0
success_rate = (success_count / total_messages * 100) if total_messages > 0 else 100
# Active sessions
active_session_query = sqlalchemy.select(
sqlalchemy.func.count(persistence_monitoring.MonitoringSession.session_id)
).where(persistence_monitoring.MonitoringSession.is_active == True)
if session_conditions:
active_session_query = active_session_query.where(sqlalchemy.and_(*session_conditions))
active_sessions_result = await self.ap.persistence_mgr.execute_async(active_session_query)
active_sessions = active_sessions_result.scalar() or 0
return {
'total_messages': total_messages,
'llm_calls': llm_calls,
'embedding_calls': embedding_calls,
'model_calls': model_calls,
'success_rate': round(success_rate, 2),
'active_sessions': active_sessions,
}
async def get_messages(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
session_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get messages with filters"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringMessage.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringMessage.pipeline_id.in_(pipeline_ids))
if session_ids:
conditions.append(persistence_monitoring.MonitoringMessage.session_id.in_(session_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringMessage.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringMessage.timestamp <= end_time)
# Get total count
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringMessage.id))
if conditions:
count_query = count_query.where(sqlalchemy.and_(*conditions))
count_result = await self.ap.persistence_mgr.execute_async(count_query)
total = count_result.scalar() or 0
# Get messages
query = sqlalchemy.select(persistence_monitoring.MonitoringMessage).order_by(
persistence_monitoring.MonitoringMessage.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
messages_rows = result.all()
serialized = []
for row in messages_rows:
# Extract model instance from Row (SQLAlchemy returns Row objects)
msg = row[0] if isinstance(row, tuple) else row
serialized_msg = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringMessage, msg)
serialized.append(serialized_msg)
return (serialized, total)
async def get_llm_calls(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get LLM calls with filters"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringLLMCall.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringLLMCall.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp <= end_time)
# Get total count
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringLLMCall.id))
if conditions:
count_query = count_query.where(sqlalchemy.and_(*conditions))
count_result = await self.ap.persistence_mgr.execute_async(count_query)
total = count_result.scalar() or 0
# Get LLM calls
query = sqlalchemy.select(persistence_monitoring.MonitoringLLMCall).order_by(
persistence_monitoring.MonitoringLLMCall.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
llm_calls_rows = result.all()
return (
[
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringLLMCall, row[0] if isinstance(row, tuple) else row
)
for row in llm_calls_rows
],
total,
)
async def get_embedding_calls(
self,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
knowledge_base_id: str | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get embedding calls with filters"""
conditions = []
if start_time:
conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp <= end_time)
if knowledge_base_id:
conditions.append(persistence_monitoring.MonitoringEmbeddingCall.knowledge_base_id == knowledge_base_id)
# Get total count
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringEmbeddingCall.id))
if conditions:
count_query = count_query.where(sqlalchemy.and_(*conditions))
count_result = await self.ap.persistence_mgr.execute_async(count_query)
total = count_result.scalar() or 0
# Get embedding calls
query = sqlalchemy.select(persistence_monitoring.MonitoringEmbeddingCall).order_by(
persistence_monitoring.MonitoringEmbeddingCall.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
embedding_calls_rows = result.all()
return (
[
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringEmbeddingCall, row[0] if isinstance(row, tuple) else row
)
for row in embedding_calls_rows
],
total,
)
async def get_sessions(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
is_active: bool | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get sessions with filters"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringSession.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringSession.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringSession.start_time >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringSession.start_time <= end_time)
if is_active is not None:
conditions.append(persistence_monitoring.MonitoringSession.is_active == is_active)
# Get total count
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringSession.session_id))
if conditions:
count_query = count_query.where(sqlalchemy.and_(*conditions))
count_result = await self.ap.persistence_mgr.execute_async(count_query)
total = count_result.scalar() or 0
# Get sessions
query = sqlalchemy.select(persistence_monitoring.MonitoringSession).order_by(
persistence_monitoring.MonitoringSession.last_activity.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
sessions_rows = result.all()
return (
[
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringSession, row[0] if isinstance(row, tuple) else row
)
for row in sessions_rows
],
total,
)
async def get_errors(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get errors with filters"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringError.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringError.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringError.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringError.timestamp <= end_time)
# Get total count
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringError.id))
if conditions:
count_query = count_query.where(sqlalchemy.and_(*conditions))
count_result = await self.ap.persistence_mgr.execute_async(count_query)
total = count_result.scalar() or 0
# Get errors
query = sqlalchemy.select(persistence_monitoring.MonitoringError).order_by(
persistence_monitoring.MonitoringError.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
errors_rows = result.all()
return (
[
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row
)
for row in errors_rows
],
total,
)
async def get_session_analysis(
self,
session_id: str,
) -> dict:
"""Get detailed analysis for a specific session"""
# Get session info
session_query = sqlalchemy.select(persistence_monitoring.MonitoringSession).where(
persistence_monitoring.MonitoringSession.session_id == session_id
)
session_result = await self.ap.persistence_mgr.execute_async(session_query)
session_row = session_result.first()
if not session_row:
return {
'session_id': session_id,
'found': False,
}
session = session_row[0] if isinstance(session_row, tuple) else session_row
# Get messages for this session
messages_query = (
sqlalchemy.select(persistence_monitoring.MonitoringMessage)
.where(persistence_monitoring.MonitoringMessage.session_id == session_id)
.order_by(persistence_monitoring.MonitoringMessage.timestamp.asc())
)
messages_result = await self.ap.persistence_mgr.execute_async(messages_query)
messages_rows = messages_result.all()
# Count messages by status
success_messages = 0
error_messages = 0
pending_messages = 0
for row in messages_rows:
msg = row[0] if isinstance(row, tuple) else row
if msg.status == 'success':
success_messages += 1
elif msg.status == 'error':
error_messages += 1
elif msg.status == 'pending':
pending_messages += 1
# Get LLM calls for this session
llm_query = sqlalchemy.select(persistence_monitoring.MonitoringLLMCall).where(
persistence_monitoring.MonitoringLLMCall.session_id == session_id
)
llm_result = await self.ap.persistence_mgr.execute_async(llm_query)
llm_rows = llm_result.all()
# Calculate LLM statistics
total_llm_calls = len(llm_rows)
total_input_tokens = 0
total_output_tokens = 0
total_tokens = 0
total_duration = 0
success_llm_calls = 0
error_llm_calls = 0
for row in llm_rows:
llm_call = row[0] if isinstance(row, tuple) else row
total_input_tokens += llm_call.input_tokens
total_output_tokens += llm_call.output_tokens
total_tokens += llm_call.total_tokens
total_duration += llm_call.duration
if llm_call.status == 'success':
success_llm_calls += 1
else:
error_llm_calls += 1
# Get errors for this session
error_query = (
sqlalchemy.select(persistence_monitoring.MonitoringError)
.where(persistence_monitoring.MonitoringError.session_id == session_id)
.order_by(persistence_monitoring.MonitoringError.timestamp.desc())
)
error_result = await self.ap.persistence_mgr.execute_async(error_query)
error_rows = error_result.all()
errors = [
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row
)
for row in error_rows
]
# Calculate session duration
if messages_rows:
first_msg = messages_rows[0][0] if isinstance(messages_rows[0], tuple) else messages_rows[0]
last_msg = messages_rows[-1][0] if isinstance(messages_rows[-1], tuple) else messages_rows[-1]
session_duration_seconds = int((last_msg.timestamp - first_msg.timestamp).total_seconds())
else:
session_duration_seconds = 0
return {
'session_id': session_id,
'found': True,
'session': self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringSession, session),
'message_stats': {
'total': len(messages_rows),
'success': success_messages,
'error': error_messages,
'pending': pending_messages,
},
'llm_stats': {
'total_calls': total_llm_calls,
'success_calls': success_llm_calls,
'error_calls': error_llm_calls,
'total_input_tokens': total_input_tokens,
'total_output_tokens': total_output_tokens,
'total_tokens': total_tokens,
'average_duration_ms': int(total_duration / total_llm_calls) if total_llm_calls > 0 else 0,
},
'errors': errors,
'session_duration_seconds': session_duration_seconds,
}
async def get_message_details(
self,
message_id: str,
) -> dict:
"""Get detailed information for a specific message including associated LLM calls and errors"""
# Get message info
message_query = sqlalchemy.select(persistence_monitoring.MonitoringMessage).where(
persistence_monitoring.MonitoringMessage.id == message_id
)
message_result = await self.ap.persistence_mgr.execute_async(message_query)
message_row = message_result.first()
if not message_row:
return {
'message_id': message_id,
'found': False,
}
message = message_row[0] if isinstance(message_row, tuple) else message_row
# Get LLM calls for this message
llm_query = (
sqlalchemy.select(persistence_monitoring.MonitoringLLMCall)
.where(persistence_monitoring.MonitoringLLMCall.message_id == message_id)
.order_by(persistence_monitoring.MonitoringLLMCall.timestamp.asc())
)
llm_result = await self.ap.persistence_mgr.execute_async(llm_query)
llm_rows = llm_result.all()
llm_calls = [
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringLLMCall, row[0] if isinstance(row, tuple) else row
)
for row in llm_rows
]
# Calculate LLM statistics
total_input_tokens = sum(call.input_tokens for call in llm_rows)
total_output_tokens = sum(call.output_tokens for call in llm_rows)
total_tokens = sum(call.total_tokens for call in llm_rows)
total_duration = sum(call.duration for call in llm_rows)
# Get errors for this message
error_query = (
sqlalchemy.select(persistence_monitoring.MonitoringError)
.where(persistence_monitoring.MonitoringError.message_id == message_id)
.order_by(persistence_monitoring.MonitoringError.timestamp.asc())
)
error_result = await self.ap.persistence_mgr.execute_async(error_query)
error_rows = error_result.all()
errors = [
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row
)
for row in error_rows
]
return {
'message_id': message_id,
'found': True,
'message': self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringMessage, message),
'llm_calls': llm_calls,
'llm_stats': {
'total_calls': len(llm_rows),
'total_input_tokens': total_input_tokens,
'total_output_tokens': total_output_tokens,
'total_tokens': total_tokens,
'total_duration_ms': total_duration,
'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0,
},
'errors': errors,
}
# ========== Export Methods ==========
def _escape_csv_field(self, field: str | None) -> str:
"""Escape a field for CSV output"""
if field is None:
return ''
# Convert non-string types to string first
if not isinstance(field, str):
field = str(field)
# Replace common escape sequences
field = field.replace('\r\n', '\n').replace('\r', '\n')
# If field contains comma, double quote, or newline, wrap in quotes
if ',' in field or '"' in field or '\n' in field:
# Escape double quotes by doubling them
field = '"' + field.replace('"', '""') + '"'
return field
def _format_timestamp(self, dt: datetime.datetime) -> str:
"""Format datetime to ISO format string"""
return dt.strftime('%Y-%m-%d %H:%M:%S')
def _extract_message_text(self, message_content: str) -> str:
"""Extract plain text from message chain JSON"""
if not message_content:
return ''
try:
import json
message_chain = json.loads(message_content)
if not isinstance(message_chain, list):
return message_content
text_parts = []
for component in message_chain:
if not isinstance(component, dict):
continue
component_type = component.get('type')
if component_type == 'Plain':
text = component.get('text', '')
text_parts.append(text)
elif component_type == 'At':
display = component.get('display', '')
target = component.get('target', '')
if display:
text_parts.append(f'@{display}')
elif target:
text_parts.append(f'@{target}')
elif component_type == 'AtAll':
text_parts.append('@All')
elif component_type == 'Image':
text_parts.append('[Image]')
elif component_type == 'File':
name = component.get('name', 'File')
text_parts.append(f'[File: {name}]')
elif component_type == 'Voice':
length = component.get('length', 0)
text_parts.append(f'[Voice {length}s]')
elif component_type == 'Quote':
# Quote content is in 'origin' field
origin = component.get('origin', [])
if isinstance(origin, list):
for item in origin:
if isinstance(item, dict) and item.get('type') == 'Plain':
text_parts.append(f'> {item.get("text", "")}')
elif component_type == 'Source':
# Skip Source component
continue
else:
# Other unknown types
text_parts.append(f'[{component_type}]')
return ''.join(text_parts)
except (json.JSONDecodeError, TypeError, KeyError):
# If not valid JSON, return as-is
return message_content
async def export_messages(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100000,
) -> list[dict]:
"""Export messages as list of dictionaries for CSV conversion"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringMessage.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringMessage.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringMessage.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringMessage.timestamp <= end_time)
query = sqlalchemy.select(persistence_monitoring.MonitoringMessage).order_by(
persistence_monitoring.MonitoringMessage.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit)
result = await self.ap.persistence_mgr.execute_async(query)
rows = result.all()
return [
{
'id': row[0].id if isinstance(row, tuple) else row.id,
'timestamp': self._format_timestamp(row[0].timestamp if isinstance(row, tuple) else row.timestamp),
'bot_id': row[0].bot_id if isinstance(row, tuple) else row.bot_id,
'bot_name': row[0].bot_name if isinstance(row, tuple) else row.bot_name,
'pipeline_id': row[0].pipeline_id if isinstance(row, tuple) else row.pipeline_id,
'pipeline_name': row[0].pipeline_name if isinstance(row, tuple) else row.pipeline_name,
'runner_name': row[0].runner_name if isinstance(row, tuple) else row.runner_name,
'message_content': row[0].message_content if isinstance(row, tuple) else row.message_content,
'message_text': self._extract_message_text(
row[0].message_content if isinstance(row, tuple) else row.message_content
),
'session_id': row[0].session_id if isinstance(row, tuple) else row.session_id,
'status': row[0].status if isinstance(row, tuple) else row.status,
'level': row[0].level if isinstance(row, tuple) else row.level,
'platform': row[0].platform if isinstance(row, tuple) else row.platform,
'user_id': row[0].user_id if isinstance(row, tuple) else row.user_id,
}
for row in rows
]
async def export_llm_calls(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100000,
) -> list[dict]:
"""Export LLM calls as list of dictionaries for CSV conversion"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringLLMCall.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringLLMCall.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp <= end_time)
query = sqlalchemy.select(persistence_monitoring.MonitoringLLMCall).order_by(
persistence_monitoring.MonitoringLLMCall.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit)
result = await self.ap.persistence_mgr.execute_async(query)
rows = result.all()
return [
{
'id': row[0].id if isinstance(row, tuple) else row.id,
'timestamp': self._format_timestamp(row[0].timestamp if isinstance(row, tuple) else row.timestamp),
'model_name': row[0].model_name if isinstance(row, tuple) else row.model_name,
'input_tokens': row[0].input_tokens if isinstance(row, tuple) else row.input_tokens,
'output_tokens': row[0].output_tokens if isinstance(row, tuple) else row.output_tokens,
'total_tokens': row[0].total_tokens if isinstance(row, tuple) else row.total_tokens,
'duration_ms': row[0].duration if isinstance(row, tuple) else row.duration,
'cost': row[0].cost if isinstance(row, tuple) else row.cost,
'status': row[0].status if isinstance(row, tuple) else row.status,
'bot_id': row[0].bot_id if isinstance(row, tuple) else row.bot_id,
'bot_name': row[0].bot_name if isinstance(row, tuple) else row.bot_name,
'pipeline_id': row[0].pipeline_id if isinstance(row, tuple) else row.pipeline_id,
'pipeline_name': row[0].pipeline_name if isinstance(row, tuple) else row.pipeline_name,
'session_id': row[0].session_id if isinstance(row, tuple) else row.session_id,
'message_id': row[0].message_id if isinstance(row, tuple) else row.message_id,
'error_message': row[0].error_message if isinstance(row, tuple) else row.error_message,
}
for row in rows
]
async def export_embedding_calls(
self,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
knowledge_base_id: str | None = None,
limit: int = 100000,
) -> list[dict]:
"""Export embedding calls as list of dictionaries for CSV conversion"""
conditions = []
if start_time:
conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp <= end_time)
if knowledge_base_id:
conditions.append(persistence_monitoring.MonitoringEmbeddingCall.knowledge_base_id == knowledge_base_id)
query = sqlalchemy.select(persistence_monitoring.MonitoringEmbeddingCall).order_by(
persistence_monitoring.MonitoringEmbeddingCall.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit)
result = await self.ap.persistence_mgr.execute_async(query)
rows = result.all()
return [
{
'id': row[0].id if isinstance(row, tuple) else row.id,
'timestamp': self._format_timestamp(row[0].timestamp if isinstance(row, tuple) else row.timestamp),
'model_name': row[0].model_name if isinstance(row, tuple) else row.model_name,
'prompt_tokens': row[0].prompt_tokens if isinstance(row, tuple) else row.prompt_tokens,
'total_tokens': row[0].total_tokens if isinstance(row, tuple) else row.total_tokens,
'duration_ms': row[0].duration if isinstance(row, tuple) else row.duration,
'input_count': row[0].input_count if isinstance(row, tuple) else row.input_count,
'status': row[0].status if isinstance(row, tuple) else row.status,
'error_message': row[0].error_message if isinstance(row, tuple) else row.error_message,
'knowledge_base_id': row[0].knowledge_base_id if isinstance(row, tuple) else row.knowledge_base_id,
'query_text': row[0].query_text if isinstance(row, tuple) else row.query_text,
'session_id': row[0].session_id if isinstance(row, tuple) else row.session_id,
'message_id': row[0].message_id if isinstance(row, tuple) else row.message_id,
'call_type': row[0].call_type if isinstance(row, tuple) else row.call_type,
}
for row in rows
]
async def export_errors(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100000,
) -> list[dict]:
"""Export errors as list of dictionaries for CSV conversion"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringError.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringError.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringError.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringError.timestamp <= end_time)
query = sqlalchemy.select(persistence_monitoring.MonitoringError).order_by(
persistence_monitoring.MonitoringError.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit)
result = await self.ap.persistence_mgr.execute_async(query)
rows = result.all()
return [
{
'id': row[0].id if isinstance(row, tuple) else row.id,
'timestamp': self._format_timestamp(row[0].timestamp if isinstance(row, tuple) else row.timestamp),
'error_type': row[0].error_type if isinstance(row, tuple) else row.error_type,
'error_message': row[0].error_message if isinstance(row, tuple) else row.error_message,
'bot_id': row[0].bot_id if isinstance(row, tuple) else row.bot_id,
'bot_name': row[0].bot_name if isinstance(row, tuple) else row.bot_name,
'pipeline_id': row[0].pipeline_id if isinstance(row, tuple) else row.pipeline_id,
'pipeline_name': row[0].pipeline_name if isinstance(row, tuple) else row.pipeline_name,
'session_id': row[0].session_id if isinstance(row, tuple) else row.session_id,
'message_id': row[0].message_id if isinstance(row, tuple) else row.message_id,
'stack_trace': row[0].stack_trace if isinstance(row, tuple) else row.stack_trace,
}
for row in rows
]
async def export_sessions(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100000,
) -> list[dict]:
"""Export sessions as list of dictionaries for CSV conversion"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringSession.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringSession.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringSession.start_time >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringSession.start_time <= end_time)
query = sqlalchemy.select(persistence_monitoring.MonitoringSession).order_by(
persistence_monitoring.MonitoringSession.last_activity.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit)
result = await self.ap.persistence_mgr.execute_async(query)
rows = result.all()
return [
{
'session_id': row[0].session_id if isinstance(row, tuple) else row.session_id,
'bot_id': row[0].bot_id if isinstance(row, tuple) else row.bot_id,
'bot_name': row[0].bot_name if isinstance(row, tuple) else row.bot_name,
'pipeline_id': row[0].pipeline_id if isinstance(row, tuple) else row.pipeline_id,
'pipeline_name': row[0].pipeline_name if isinstance(row, tuple) else row.pipeline_name,
'message_count': row[0].message_count if isinstance(row, tuple) else row.message_count,
'start_time': self._format_timestamp(row[0].start_time if isinstance(row, tuple) else row.start_time),
'last_activity': self._format_timestamp(
row[0].last_activity if isinstance(row, tuple) else row.last_activity
),
'is_active': str(row[0].is_active if isinstance(row, tuple) else row.is_active),
'platform': row[0].platform if isinstance(row, tuple) else row.platform,
'user_id': row[0].user_id if isinstance(row, tuple) else row.user_id,
}
for row in rows
]
# ========== Feedback Methods ==========
async def record_feedback(
self,
feedback_id: str,
feedback_type: int,
feedback_content: str | None = None,
inaccurate_reasons: list[str] | None = None,
bot_id: str | None = None,
bot_name: str | None = None,
pipeline_id: str | None = None,
pipeline_name: str | None = None,
session_id: str | None = None,
message_id: str | None = None,
stream_id: str | None = None,
user_id: str | None = None,
platform: str | None = None,
) -> str:
"""Record user feedback (like/dislike) from AI Bot conversation.
Args:
feedback_id: Unique feedback identifier from platform (e.g., WeChat Work)
feedback_type: 1 = like (thumbs up), 2 = dislike (thumbs down)
feedback_content: Optional user feedback text
inaccurate_reasons: List of reasons for inaccurate response (for dislike)
bot_id: Bot ID
bot_name: Bot name
pipeline_id: Pipeline ID
pipeline_name: Pipeline name
session_id: Session ID
message_id: Message ID
stream_id: Stream ID (for WeChat Work streaming messages)
user_id: User ID
platform: Platform name (e.g., 'wecom')
Returns:
The record ID
"""
import json
now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
reasons_json = json.dumps(inaccurate_reasons, ensure_ascii=False) if inaccurate_reasons else None
MonitoringFeedback = persistence_monitoring.MonitoringFeedback
# Handle cancel feedback (type=3): delete existing record
if feedback_type == 3:
await self.ap.persistence_mgr.execute_async(
sqlalchemy.delete(MonitoringFeedback).where(MonitoringFeedback.feedback_id == feedback_id)
)
return None
# Check if record with this feedback_id already exists
existing_result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(MonitoringFeedback).where(MonitoringFeedback.feedback_id == feedback_id)
)
existing_row = existing_result.first()
if existing_row:
# UPDATE existing record
existing = existing_row[0] if isinstance(existing_row, tuple) else existing_row
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(MonitoringFeedback)
.where(MonitoringFeedback.feedback_id == feedback_id)
.values(
timestamp=now,
feedback_type=feedback_type,
feedback_content=feedback_content,
inaccurate_reasons=reasons_json,
bot_id=bot_id or existing.bot_id,
bot_name=bot_name or existing.bot_name,
pipeline_id=pipeline_id or existing.pipeline_id,
pipeline_name=pipeline_name or existing.pipeline_name,
session_id=session_id or existing.session_id,
message_id=message_id or existing.message_id,
stream_id=stream_id or existing.stream_id,
user_id=user_id or existing.user_id,
platform=platform or existing.platform,
)
)
return existing.id
else:
# INSERT new record with IntegrityError defense
record_id = str(uuid.uuid4())
record_data = {
'id': record_id,
'timestamp': now,
'feedback_id': feedback_id,
'feedback_type': feedback_type,
'feedback_content': feedback_content,
'inaccurate_reasons': reasons_json,
'bot_id': bot_id,
'bot_name': bot_name,
'pipeline_id': pipeline_id,
'pipeline_name': pipeline_name,
'session_id': session_id,
'message_id': message_id,
'stream_id': stream_id,
'user_id': user_id,
'platform': platform,
}
try:
await self.ap.persistence_mgr.execute_async(sqlalchemy.insert(MonitoringFeedback).values(record_data))
return record_id
except Exception:
# UNIQUE constraint conflict (concurrent feedback for same feedback_id)
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(MonitoringFeedback)
.where(MonitoringFeedback.feedback_id == feedback_id)
.values(
timestamp=now,
feedback_type=feedback_type,
feedback_content=feedback_content,
inaccurate_reasons=reasons_json,
)
)
return feedback_id
async def get_feedback_stats(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
) -> dict:
"""Get feedback statistics.
Returns:
Dictionary with total likes, dislikes, and breakdown by bot/pipeline
"""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringFeedback.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringFeedback.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringFeedback.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringFeedback.timestamp <= end_time)
# Get total likes (feedback_type = 1)
likes_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id)).where(
persistence_monitoring.MonitoringFeedback.feedback_type == 1
)
if conditions:
likes_query = likes_query.where(sqlalchemy.and_(*conditions))
likes_result = await self.ap.persistence_mgr.execute_async(likes_query)
total_likes = likes_result.scalar() or 0
# Get total dislikes (feedback_type = 2)
dislikes_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id)).where(
persistence_monitoring.MonitoringFeedback.feedback_type == 2
)
if conditions:
dislikes_query = dislikes_query.where(sqlalchemy.and_(*conditions))
dislikes_result = await self.ap.persistence_mgr.execute_async(dislikes_query)
total_dislikes = dislikes_result.scalar() or 0
# Get total feedback count
total_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id))
if conditions:
total_query = total_query.where(sqlalchemy.and_(*conditions))
total_result = await self.ap.persistence_mgr.execute_async(total_query)
total_feedback = total_result.scalar() or 0
# Calculate satisfaction rate
satisfaction_rate = (total_likes / total_feedback * 100) if total_feedback > 0 else 0
# Get feedback by bot
bot_stats_query = sqlalchemy.select(
persistence_monitoring.MonitoringFeedback.bot_id,
persistence_monitoring.MonitoringFeedback.bot_name,
sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id).label('total'),
sqlalchemy.func.sum(
sqlalchemy.case((persistence_monitoring.MonitoringFeedback.feedback_type == 1, 1), else_=0)
).label('likes'),
sqlalchemy.func.sum(
sqlalchemy.case((persistence_monitoring.MonitoringFeedback.feedback_type == 2, 1), else_=0)
).label('dislikes'),
).group_by(
persistence_monitoring.MonitoringFeedback.bot_id,
persistence_monitoring.MonitoringFeedback.bot_name,
)
if conditions:
bot_stats_query = bot_stats_query.where(sqlalchemy.and_(*conditions))
bot_stats_result = await self.ap.persistence_mgr.execute_async(bot_stats_query)
bot_stats = [
{
'bot_id': row.bot_id,
'bot_name': row.bot_name,
'total': row.total,
'likes': row.likes or 0,
'dislikes': row.dislikes or 0,
}
for row in bot_stats_result.all()
]
return {
'total_feedback': total_feedback,
'total_likes': total_likes,
'total_dislikes': total_dislikes,
'satisfaction_rate': round(satisfaction_rate, 2),
'by_bot': bot_stats,
}
async def get_feedback_list(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
feedback_type: int | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get feedback list with filters."""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringFeedback.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringFeedback.pipeline_id.in_(pipeline_ids))
if feedback_type is not None:
conditions.append(persistence_monitoring.MonitoringFeedback.feedback_type == feedback_type)
if start_time:
conditions.append(persistence_monitoring.MonitoringFeedback.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringFeedback.timestamp <= end_time)
# Get total count
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id))
if conditions:
count_query = count_query.where(sqlalchemy.and_(*conditions))
count_result = await self.ap.persistence_mgr.execute_async(count_query)
total = count_result.scalar() or 0
# Get feedback list
query = sqlalchemy.select(persistence_monitoring.MonitoringFeedback).order_by(
persistence_monitoring.MonitoringFeedback.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
rows = result.all()
return (
[
self.ap.persistence_mgr.serialize_model(
persistence_monitoring.MonitoringFeedback, row[0] if isinstance(row, tuple) else row
)
for row in rows
],
total,
)
async def export_feedback(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100000,
) -> list[dict]:
"""Export feedback as list of dictionaries for CSV conversion."""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringFeedback.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringFeedback.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringFeedback.timestamp >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringFeedback.timestamp <= end_time)
query = sqlalchemy.select(persistence_monitoring.MonitoringFeedback).order_by(
persistence_monitoring.MonitoringFeedback.timestamp.desc()
)
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
query = query.limit(limit)
result = await self.ap.persistence_mgr.execute_async(query)
rows = result.all()
return [
{
'id': row[0].id if isinstance(row, tuple) else row.id,
'timestamp': self._format_timestamp(row[0].timestamp if isinstance(row, tuple) else row.timestamp),
'feedback_id': row[0].feedback_id if isinstance(row, tuple) else row.feedback_id,
'feedback_type': 'like'
if (row[0].feedback_type if isinstance(row, tuple) else row.feedback_type) == 1
else 'dislike',
'feedback_content': row[0].feedback_content if isinstance(row, tuple) else row.feedback_content,
'inaccurate_reasons': row[0].inaccurate_reasons if isinstance(row, tuple) else row.inaccurate_reasons,
'bot_id': row[0].bot_id if isinstance(row, tuple) else row.bot_id,
'bot_name': row[0].bot_name if isinstance(row, tuple) else row.bot_name,
'pipeline_id': row[0].pipeline_id if isinstance(row, tuple) else row.pipeline_id,
'pipeline_name': row[0].pipeline_name if isinstance(row, tuple) else row.pipeline_name,
'session_id': row[0].session_id if isinstance(row, tuple) else row.session_id,
'message_id': row[0].message_id if isinstance(row, tuple) else row.message_id,
'stream_id': row[0].stream_id if isinstance(row, tuple) else row.stream_id,
'user_id': row[0].user_id if isinstance(row, tuple) else row.user_id,
'platform': row[0].platform if isinstance(row, tuple) else row.platform,
}
for row in rows
]