Mirror of https://github.com/langbot-app/LangBot.git (synced 2026-06-13 17:26:04 +00:00)
* refactor(provider): use LiteLLM as unified LLM requester backend
- Replace 23+ individual requester implementations with unified litellmchat.py
- Add litellm_provider field to 27 YAML manifests for provider routing
- Delete redundant requester subclasses
- Add unit tests for LiteLLMRequester (29 tests)
- Fix num_retries parameter name (was max_retries)
- Fix exception handling order for subclass exceptions
LiteLLM provides a unified API for 100+ providers, eliminating the need for
provider-specific requesters.
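The exception-ordering fix listed above rests on a general Python rule: `except` clauses are tried top to bottom, so a handler for a base class placed first also catches every subclass and leaves the more specific handler unreachable. A minimal sketch, using hypothetical exception classes (`ProviderError` and `RateLimitError` are illustrative names, not LiteLLM's actual hierarchy):

```python
class ProviderError(Exception):
    """Hypothetical base class for provider failures."""


class RateLimitError(ProviderError):
    """Hypothetical subclass; it must be handled before its base class."""


def classify(exc: Exception) -> str:
    try:
        raise exc
    except RateLimitError:  # most specific handler first
        return 'rate_limited'
    except ProviderError:
        return 'provider_error'
    except Exception:
        return 'unknown'


def classify_wrong_order(exc: Exception) -> str:
    try:
        raise exc
    except ProviderError:  # also matches RateLimitError
        return 'provider_error'
    except RateLimitError:  # unreachable: the clause above already caught it
        return 'rate_limited'
```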
* fix: ruff format provider.py
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* refactor(provider): simplify LiteLLM requester usage handling
- Remove unused Anthropic-specific tool schema generation
- Share completion argument construction between normal and streaming calls
- Use LiteLLM/OpenAI native usage fields for monitoring
- Collect stream token usage from LiteLLM stream_options
- Update LiteLLM requester tests for unified usage fields
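One way to share argument construction between the normal and streaming calls is a single builder that adds the streaming flags only when needed. This is a sketch, not the actual litellmchat.py code: `build_completion_kwargs` is a hypothetical helper, and it assumes LiteLLM accepts the OpenAI-style `stream_options={'include_usage': True}` parameter, which asks the provider to append a usage payload to the stream.

```python
from __future__ import annotations

from typing import Any


def build_completion_kwargs(
    model: str,
    messages: list[dict[str, Any]],
    *,
    stream: bool,
    extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build one kwargs dict for both the normal and the streaming completion call."""
    kwargs: dict[str, Any] = {'model': model, 'messages': messages}
    if extra:
        kwargs.update(extra)
    if stream:
        kwargs['stream'] = True
        # Ask for a final usage chunk so streamed calls can be metered too.
        kwargs['stream_options'] = {'include_usage': True}
    return kwargs
```

Both paths would then pass the same dict, for example `await litellm.acompletion(**build_completion_kwargs('openai/gpt-4o', messages, stream=True))`, where the `provider/model` string follows LiteLLM's prefix-based routing. A parameter fix such as the `num_retries` rename then lands in both paths at once.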
* restore: restore deleted provider requester files
Restore individual provider requester implementations that were
removed in de61b5d3. These files coexist with the unified
litellmchat.py backend.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat: update requesters and improve provider selection UI
- Added `litellm_provider` field to various requesters' YAML configurations.
- Removed obsolete Python requester files for OpenRouter, PPIO, QHAIGC, ShengSuanYun, SiliconFlow, Space, TokenPony, VolcArk, and Xai.
- Introduced new requesters for Tencent and Together AI with corresponding YAML configurations and SVG icons.
- Enhanced the ProviderForm component to include a searchable dropdown for selecting providers, improving user experience.
- Updated localization files to include search provider text for both English and Chinese.
* fix(provider): align litellm rebase with master
* fix(provider): capture streaming token usage; add token observability
The LiteLLM streaming requester only captured usage when a chunk had an
empty `choices` list. Many OpenAI-compatible gateways (e.g. new-api) and
providers send the final usage payload in a chunk that still carries an
empty-delta choice, so streamed calls always recorded 0 tokens in the
monitoring logs/dashboard (non-streaming worked).
- Capture stream usage whenever a chunk carries it, regardless of choices
- Add robust _normalize_usage (dict/obj shapes, derive missing total_tokens)
- Register litellm in bootutils/deps.py (was in pyproject only)
- Add MonitoringService.get_token_statistics + /monitoring/token-statistics
endpoint: summary, per-model breakdown, token timeseries, and a
zero-token-success data-quality signal
- Add TokenMonitoring dashboard tab (summary tiles, stacked token chart,
per-model table) + i18n (en/zh)
- Regression tests for stream usage capture and usage normalization
Verified end-to-end against a real OpenAI-compatible endpoint with
gpt-5.5 and claude-opus-4-8: tokens now recorded non-zero for both
streaming and non-streaming paths.
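The stream-usage fix and the usage normalization can be sketched as follows. The function names are hypothetical, and the sketch assumes usage arrives either as a dict or as an object carrying the OpenAI field names `prompt_tokens`, `completion_tokens` and `total_tokens`. The key point is that the loop keeps the last usage payload it sees, whatever the chunk's `choices` contain.

```python
from collections.abc import Iterable
from typing import Any

_USAGE_FIELDS = ('prompt_tokens', 'completion_tokens', 'total_tokens')


def _read(obj: Any, name: str) -> Any:
    # Accept both dict-shaped and attribute-shaped payloads.
    if isinstance(obj, dict):
        return obj.get(name)
    return getattr(obj, name, None)


def normalize_usage(usage: Any) -> dict[str, int]:
    """Return prompt/completion/total token counts as ints, deriving a missing total."""
    if usage is None:
        return {name: 0 for name in _USAGE_FIELDS}
    prompt = int(_read(usage, 'prompt_tokens') or 0)
    completion = int(_read(usage, 'completion_tokens') or 0)
    total = int(_read(usage, 'total_tokens') or 0) or prompt + completion
    return {'prompt_tokens': prompt, 'completion_tokens': completion, 'total_tokens': total}


def collect_stream_usage(chunks: Iterable[Any]) -> dict[str, int]:
    """Keep the last usage payload seen in a stream."""
    usage = None
    for chunk in chunks:
        chunk_usage = _read(chunk, 'usage')
        # No check on `choices`: some gateways send usage alongside an empty-delta choice.
        if chunk_usage:
            usage = chunk_usage
    return normalize_usage(usage)
```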
* refactor(provider): simplify litellm capabilities
* style: simplify wrapped expressions
* feat(models): persist context metadata
* fix(provider): handle dict embeddings and openai-compatible rerank in LiteLLMRequester
- invoke_embedding: support both object- and dict-shaped response.data
entries (OpenAI-compatible gateways like new-api return dicts)
- invoke_rerank: litellm.arerank rejects the 'openai' provider, so for
openai-compatible (or unspecified) providers call the standard
Jina/Cohere-style POST /v1/rerank endpoint directly over HTTP
- accept both 'relevance_score' and 'score' fields in rerank results
- add unit tests for the openai-compatible HTTP rerank path
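A sketch of the shape-tolerant parsing described above, assuming embedding entries expose an `embedding` field and rerank results carry an `index` plus either `relevance_score` (the Jina/Cohere-style field) or `score`. The helper names are hypothetical, and the HTTP call to `/v1/rerank` itself is omitted.

```python
from typing import Any


def _field(item: Any, name: str, default: Any = None) -> Any:
    # OpenAI-compatible gateways may return plain dicts instead of SDK objects.
    if isinstance(item, dict):
        return item.get(name, default)
    return getattr(item, name, default)


def extract_embeddings(data: list[Any]) -> list[list[float]]:
    """Pull the vector out of each response.data entry, dict- or object-shaped."""
    return [list(_field(entry, 'embedding') or []) for entry in data]


def parse_rerank_results(results: list[Any]) -> list[tuple[int, float]]:
    """Return (document index, score) pairs, accepting 'relevance_score' or 'score'."""
    parsed = []
    for result in results:
        score = _field(result, 'relevance_score')
        if score is None:  # 0.0 is a valid score, so test for None rather than falsiness
            score = _field(result, 'score', 0.0)
        parsed.append((int(_field(result, 'index', 0)), float(score)))
    return parsed
```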
* feat(provider): enforce requester support_type when adding models
- frontend: AddModelPopover only shows model-type tabs (llm/embedding/
rerank) that the provider's requester declares in its manifest
support_type; ModelsDialog fetches requester manifests and maps
requester -> support_type, passed down through ProviderCard
- backend: add _validate_provider_supports guard in create_llm_model /
create_embedding_model / create_rerank_model so a model cannot be
attached to a provider whose requester does not support that type,
even if the frontend restriction is bypassed (manifests without
support_type are allowed for backward compatibility)
- manifests: correct support_type for providers that do not offer all
three model types:
- llm only: anthropic, deepseek, groq, moonshot, openrouter, xai
- llm + text-embedding: openai, gemini, mistral
- add rerank to new-api (verified working via /v1/rerank)
- set llm + text-embedding + rerank for aggregator/unknown gateways
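The frontend rule amounts to a small filter over the requester's declared support_type. The sketch is written in Python for consistency with the other examples (AddModelPopover itself is a frontend component). The mapping from the `embedding` tab to the `text-embedding` support_type value, and showing every tab when support_type is missing, are assumptions that mirror the backend's backward-compatibility rule.

```python
from __future__ import annotations

MODEL_TABS = ('llm', 'embedding', 'rerank')

# Assumed mapping from UI tab to the manifest's support_type value.
TAB_TO_SUPPORT_TYPE = {'llm': 'llm', 'embedding': 'text-embedding', 'rerank': 'rerank'}


def visible_model_tabs(support_type: list[str] | None) -> list[str]:
    """Tabs to show in the add-model popover for a requester's declared support_type."""
    if not support_type:
        # No support_type declared: show every tab (backward-compatibility assumption).
        return list(MODEL_TABS)
    return [tab for tab in MODEL_TABS if TAB_TO_SUPPORT_TYPE[tab] in support_type]
```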
* feat(provider): add searchable alias to requester manifests
- add a free-text 'alias' field to every requester manifest spec,
containing the vendor's English/Chinese names, pinyin, common
nicknames and flagship model-series names (e.g. moonshot -> kimi,
月之暗面; zhipu -> glm, 智谱清言)
- frontend: ProviderForm requester search now also matches against
alias (substring/contains), so searching 'kimi' surfaces Moonshot,
'硅基' surfaces SiliconFlow, etc.
- also fix support_type: openrouter (relay) supports embedding+rerank;
LangBot Space gains rerank (coming soon)
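Alias matching reduces to a case-insensitive substring test over each requester's name, label and alias. The records below are illustrative sample data, not the real manifests, and the field names `name`, `label` and `alias` are assumptions.

```python
def matches_requester(query: str, *fields: str) -> bool:
    """Case-insensitive substring match against any of the given fields."""
    needle = query.strip().lower()
    if not needle:
        return True  # an empty search shows every requester
    return any(needle in (field or '').lower() for field in fields)


def search_requesters(query: str, requesters: list[dict[str, str]]) -> list[str]:
    return [
        r['name']
        for r in requesters
        if matches_requester(query, r['name'], r.get('label', ''), r.get('alias', ''))
    ]


# Illustrative sample data; the real aliases live in each requester manifest.
SAMPLE_REQUESTERS = [
    {'name': 'moonshot', 'label': 'Moonshot', 'alias': 'kimi 月之暗面'},
    {'name': 'siliconflow', 'label': 'SiliconFlow', 'alias': '硅基流动'},
    {'name': 'openai', 'label': 'OpenAI'},
]
```

Testing each field separately, rather than one joined string, avoids accidental matches that span the boundary between two fields.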
* fix(provider): make support_type guard defensive against incomplete model_mgr
- _validate_provider_supports now uses getattr to gracefully skip when
model_mgr / provider_dict / manifest lookup is unavailable, instead of
raising AttributeError (fixes unit tests that mock ap.model_mgr as a
bare SimpleNamespace)
- add TestValidateProviderSupports covering: allow supported type,
reject unsupported type, allow when support_type missing, allow when
provider unknown, degrade safely when model_mgr is incomplete
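A defensive guard in the spirit of `_validate_provider_supports` might look like this. The attribute layout (`ap.model_mgr.provider_dict` mapping a provider id to an object with a `support_type` list) and the exception class are hypothetical simplifications; the commit describes a separate manifest lookup step, which is folded into one attribute here. Every lookup uses `getattr` with a default, so a bare `SimpleNamespace` stand-in falls through instead of raising `AttributeError`.

```python
from types import SimpleNamespace
from typing import Any


class UnsupportedModelTypeError(ValueError):
    """Raised when a provider's requester does not declare the requested model type."""


def validate_provider_supports(ap: Any, provider_uuid: str, model_type: str) -> None:
    # getattr with defaults lets a partially mocked app object pass through untouched.
    model_mgr = getattr(ap, 'model_mgr', None)
    provider_dict = getattr(model_mgr, 'provider_dict', None)
    if not isinstance(provider_dict, dict):
        return
    provider = provider_dict.get(provider_uuid)
    if provider is None:
        return  # unknown provider: allow
    support_type = getattr(provider, 'support_type', None)
    if not support_type:
        return  # manifest predates support_type: allow for backward compatibility
    if model_type not in support_type:
        raise UnsupportedModelTypeError(
            f'provider {provider_uuid!r} does not support model type {model_type!r}'
        )


# Usage with lightweight stand-ins for the (hypothetical) app structure:
demo_ap = SimpleNamespace(
    model_mgr=SimpleNamespace(provider_dict={'p1': SimpleNamespace(support_type=['llm', 'text-embedding'])})
)
validate_provider_supports(demo_ap, 'p1', 'llm')  # supported: returns None
```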
* fix(persistence): guard 0004 migration against missing llm_models table
The 0004_add_llm_model_context_length migration called
inspector.get_columns('llm_models') unconditionally, raising
NoSuchTableError when the table does not exist (e.g. migrating a
fresh/empty DB, as exercised by the integration tests where
create_all() registers no tables because the ORM models are not
imported). Every other migration guards with a table-existence check
first; add the same guard here for both upgrade and downgrade.
Also restore the test head assertion to 0004 (it had been lowered to
0003 to mask this failure).
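The guard pattern is to check that the table exists before inspecting its columns. The sketch below uses the standard-library `sqlite3` module instead of Alembic so that it runs on its own; in SQLAlchemy the equivalent existence check is `Inspector.has_table()`. The `context_length` column name and its INTEGER type are assumptions inferred from the migration's name.

```python
import sqlite3


def table_exists(conn: sqlite3.Connection, table: str) -> bool:
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (table,)
    ).fetchone()
    return row is not None


def column_names(conn: sqlite3.Connection, table: str) -> set[str]:
    # Each PRAGMA table_info row is (cid, name, type, notnull, dflt_value, pk).
    # The table name is interpolated, so only pass trusted, internal names.
    return {row[1] for row in conn.execute(f'PRAGMA table_info({table})')}


def upgrade(conn: sqlite3.Connection) -> None:
    # Guard first: on a fresh/empty database the table may not exist yet.
    if not table_exists(conn, 'llm_models'):
        return
    if 'context_length' not in column_names(conn, 'llm_models'):
        conn.execute('ALTER TABLE llm_models ADD COLUMN context_length INTEGER')
```

The downgrade gets the same existence check before it touches the column.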
* Merge branch 'master' into feat/litellm
Resolve conflicts:
- uv.lock: regenerated via 'uv lock' to reconcile litellm/fastuuid
(ours) with openai bump (master).
- Alembic migrations: master added 0004_add_mcp_readme while this
branch added 0004_add_llm_model_context_length, both as children of
0003 (would create multiple heads). Re-chain the litellm migration as
0005_add_llm_model_context_length with down_revision=0004_add_mcp_readme
for a single linear head. Update test head assertion accordingly.
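Why two `0004` children of `0003` are a problem can be shown with a head check: a head is any revision that no other revision names as its `down_revision`. `find_heads` is a hypothetical helper, not Alembic's API (the `alembic heads` command reports the same information), and `0003_placeholder` stands in for the real 0003 revision id, with earlier history omitted.

```python
from __future__ import annotations


def find_heads(revisions: dict[str, str | tuple[str, ...] | None]) -> set[str]:
    """Return revisions that no other revision lists as its down_revision."""
    parents: set[str] = set()
    for down in revisions.values():
        if down is None:
            continue
        if isinstance(down, tuple):  # merge revisions point at several parents
            parents.update(down)
        else:
            parents.add(down)
    return {rev for rev in revisions if rev not in parents}


BEFORE = {
    '0003_placeholder': None,
    '0004_add_mcp_readme': '0003_placeholder',
    '0004_add_llm_model_context_length': '0003_placeholder',
}
AFTER = {
    '0003_placeholder': None,
    '0004_add_mcp_readme': '0003_placeholder',
    '0005_add_llm_model_context_length': '0004_add_mcp_readme',
}
```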
* fix(persistence): shorten migration revision id to fit varchar(32)
PostgreSQL stores alembic_version.version_num as varchar(32).
'0005_add_llm_model_context_length' (33 chars) overflowed it, raising
StringDataRightTruncationError in the PG migration tests. Rename the
revision (and file) to '0005_add_llm_context_length' (27 chars) and
update the head assertions in both SQLite and PostgreSQL migration
tests.
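A simple length check of this kind would catch an overlong revision id before it reaches PostgreSQL. `VERSION_NUM_MAX_LEN` and `overlong_revision_ids` are illustrative names.

```python
# alembic_version.version_num is varchar(32) on PostgreSQL.
VERSION_NUM_MAX_LEN = 32


def overlong_revision_ids(revision_ids: list[str]) -> list[str]:
    """Return revision ids that would not fit in alembic_version.version_num."""
    return [rev for rev in revision_ids if len(rev) > VERSION_NUM_MAX_LEN]
```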
---------
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: fdc310 <2213070223@qq.com>
Co-authored-by: RockChinQ <rockchinq@gmail.com>
1734 lines
72 KiB
Python
from __future__ import annotations

import uuid
import datetime
import sqlalchemy

from ....core import app
from ....entity.persistence import monitoring as persistence_monitoring


class MonitoringService:
    """Monitoring service"""

    ap: app.Application

    def __init__(self, ap: app.Application) -> None:
        self.ap = ap

    # ========== Cleanup Methods ==========

    async def cleanup_expired_records(self, retention_days: int, batch_size: int = 1000) -> dict[str, int]:
        """Delete monitoring records older than the specified retention period.

        Args:
            retention_days: Number of days to retain records.
            batch_size: Maximum rows to delete per table batch.

        Returns:
            A dict mapping table name to the number of deleted rows.
        """
        if retention_days < 1:
            raise ValueError('retention_days must be >= 1')
        if batch_size < 1:
            raise ValueError('batch_size must be >= 1')

        cutoff = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - datetime.timedelta(
            days=retention_days
        )

        tables_and_columns: list[tuple[str, type, sqlalchemy.Column, sqlalchemy.Column]] = [
            (
                'monitoring_messages',
                persistence_monitoring.MonitoringMessage,
                persistence_monitoring.MonitoringMessage.timestamp,
                persistence_monitoring.MonitoringMessage.id,
            ),
            (
                'monitoring_llm_calls',
                persistence_monitoring.MonitoringLLMCall,
                persistence_monitoring.MonitoringLLMCall.timestamp,
                persistence_monitoring.MonitoringLLMCall.id,
            ),
            (
                'monitoring_embedding_calls',
                persistence_monitoring.MonitoringEmbeddingCall,
                persistence_monitoring.MonitoringEmbeddingCall.timestamp,
                persistence_monitoring.MonitoringEmbeddingCall.id,
            ),
            (
                'monitoring_errors',
                persistence_monitoring.MonitoringError,
                persistence_monitoring.MonitoringError.timestamp,
                persistence_monitoring.MonitoringError.id,
            ),
            (
                'monitoring_sessions',
                persistence_monitoring.MonitoringSession,
                persistence_monitoring.MonitoringSession.last_activity,
                persistence_monitoring.MonitoringSession.session_id,
            ),
            (
                'monitoring_feedback',
                persistence_monitoring.MonitoringFeedback,
                persistence_monitoring.MonitoringFeedback.timestamp,
                persistence_monitoring.MonitoringFeedback.id,
            ),
        ]

        deleted_counts: dict[str, int] = {}

        for table_name, model_cls, ts_column, pk_column in tables_and_columns:
            deleted_counts[table_name] = await self._delete_expired_in_batches(
                model_cls=model_cls,
                ts_column=ts_column,
                pk_column=pk_column,
                cutoff=cutoff,
                batch_size=batch_size,
            )

        if sum(deleted_counts.values()) > 0:
            await self._release_sqlite_space()

        return deleted_counts

    async def _delete_expired_in_batches(
        self,
        model_cls: type,
        ts_column: sqlalchemy.Column,
        pk_column: sqlalchemy.Column,
        cutoff: datetime.datetime,
        batch_size: int,
    ) -> int:
        deleted_total = 0

        while True:
            select_result = await self.ap.persistence_mgr.execute_async(
                sqlalchemy.select(pk_column).where(ts_column < cutoff).limit(batch_size)
            )
            pk_values = list(select_result.scalars().all())
            if not pk_values:
                break

            delete_result = await self.ap.persistence_mgr.execute_async(
                sqlalchemy.delete(model_cls).where(pk_column.in_(pk_values))
            )
            deleted = delete_result.rowcount or 0
            deleted_total += deleted

            if len(pk_values) < batch_size:
                break

        return deleted_total

    async def _release_sqlite_space(self) -> None:
        database_type = self.ap.instance_config.data.get('database', {}).get('use', 'sqlite')
        if database_type != 'sqlite':
            return

        async with self.ap.persistence_mgr.get_db_engine().connect() as conn:
            autocommit_conn = await conn.execution_options(isolation_level='AUTOCOMMIT')
            await autocommit_conn.execute(sqlalchemy.text('PRAGMA wal_checkpoint(TRUNCATE)'))
            await autocommit_conn.execute(sqlalchemy.text('VACUUM'))

    # ========== Recording Methods ==========

    async def record_message(
        self,
        bot_id: str,
        bot_name: str,
        pipeline_id: str,
        pipeline_name: str,
        message_content: str,
        session_id: str,
        status: str = 'success',
        level: str = 'info',
        platform: str | None = None,
        user_id: str | None = None,
        user_name: str | None = None,
        runner_name: str | None = None,
        variables: str | None = None,
        role: str = 'user',
    ) -> str:
        """Record a message"""
        message_id = str(uuid.uuid4())
        message_data = {
            'id': message_id,
            'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
            'bot_id': bot_id,
            'bot_name': bot_name,
            'pipeline_id': pipeline_id,
            'pipeline_name': pipeline_name,
            'message_content': message_content,
            'session_id': session_id,
            'status': status,
            'level': level,
            'platform': platform,
            'user_id': user_id,
            'user_name': user_name,
            'runner_name': runner_name,
            'variables': variables,
            'role': role,
        }

        await self.ap.persistence_mgr.execute_async(
            sqlalchemy.insert(persistence_monitoring.MonitoringMessage).values(message_data)
        )

        return message_id

    async def record_llm_call(
        self,
        bot_id: str,
        bot_name: str,
        pipeline_id: str,
        pipeline_name: str,
        session_id: str,
        model_name: str,
        input_tokens: int,
        output_tokens: int,
        duration: int,
        status: str = 'success',
        cost: float | None = None,
        error_message: str | None = None,
        message_id: str | None = None,
    ) -> str:
        """Record an LLM call"""
        call_id = str(uuid.uuid4())
        call_data = {
            'id': call_id,
            'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
            'model_name': model_name,
            'input_tokens': input_tokens,
            'output_tokens': output_tokens,
            'total_tokens': input_tokens + output_tokens,
            'duration': duration,
            'cost': cost,
            'status': status,
            'bot_id': bot_id,
            'bot_name': bot_name,
            'pipeline_id': pipeline_id,
            'pipeline_name': pipeline_name,
            'session_id': session_id,
            'error_message': error_message,
            'message_id': message_id,
        }

        await self.ap.persistence_mgr.execute_async(
            sqlalchemy.insert(persistence_monitoring.MonitoringLLMCall).values(call_data)
        )

        return call_id

    async def record_embedding_call(
        self,
        model_name: str,
        prompt_tokens: int,
        total_tokens: int,
        duration: int,
        input_count: int,
        status: str = 'success',
        error_message: str | None = None,
        knowledge_base_id: str | None = None,
        query_text: str | None = None,
        session_id: str | None = None,
        message_id: str | None = None,
        call_type: str | None = None,
    ) -> str:
        """Record an embedding call"""
        call_id = str(uuid.uuid4())
        call_data = {
            'id': call_id,
            'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
            'model_name': model_name,
            'prompt_tokens': prompt_tokens,
            'total_tokens': total_tokens,
            'duration': duration,
            'input_count': input_count,
            'status': status,
            'error_message': error_message,
            'knowledge_base_id': knowledge_base_id,
            'query_text': query_text,
            'session_id': session_id,
            'message_id': message_id,
            'call_type': call_type,
        }

        await self.ap.persistence_mgr.execute_async(
            sqlalchemy.insert(persistence_monitoring.MonitoringEmbeddingCall).values(call_data)
        )

        return call_id

    async def record_session_start(
        self,
        session_id: str,
        bot_id: str,
        bot_name: str,
        pipeline_id: str,
        pipeline_name: str,
        platform: str | None = None,
        user_id: str | None = None,
        user_name: str | None = None,
    ) -> None:
        """Record a new session"""
        session_data = {
            'session_id': session_id,
            'bot_id': bot_id,
            'bot_name': bot_name,
            'pipeline_id': pipeline_id,
            'pipeline_name': pipeline_name,
            'message_count': 0,
            'start_time': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
            'last_activity': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
            'is_active': True,
            'platform': platform,
            'user_id': user_id,
            'user_name': user_name,
        }

        await self.ap.persistence_mgr.execute_async(
            sqlalchemy.insert(persistence_monitoring.MonitoringSession).values(session_data)
        )

    async def update_session_activity(
        self,
        session_id: str,
        pipeline_id: str | None = None,
        pipeline_name: str | None = None,
    ) -> bool:
        """Update session last activity time and increment message count.

        Also updates pipeline info if the bot's pipeline has changed.

        Returns:
            True if session was found and updated, False if session doesn't exist.
        """
        update_values = {
            'last_activity': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
            'message_count': persistence_monitoring.MonitoringSession.message_count + 1,
        }

        # Update pipeline info if provided (handles pipeline switch)
        if pipeline_id is not None:
            update_values['pipeline_id'] = pipeline_id
        if pipeline_name is not None:
            update_values['pipeline_name'] = pipeline_name

        result = await self.ap.persistence_mgr.execute_async(
            sqlalchemy.update(persistence_monitoring.MonitoringSession)
            .where(persistence_monitoring.MonitoringSession.session_id == session_id)
            .values(update_values)
        )
        # Check if any rows were updated
        return result.rowcount > 0

    async def record_error(
        self,
        bot_id: str,
        bot_name: str,
        pipeline_id: str,
        pipeline_name: str,
        error_type: str,
        error_message: str,
        session_id: str | None = None,
        stack_trace: str | None = None,
        message_id: str | None = None,
    ) -> str:
        """Record an error"""
        error_id = str(uuid.uuid4())
        error_data = {
            'id': error_id,
            'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
            'error_type': error_type,
            'error_message': error_message,
            'bot_id': bot_id,
            'bot_name': bot_name,
            'pipeline_id': pipeline_id,
            'pipeline_name': pipeline_name,
            'session_id': session_id,
            'stack_trace': stack_trace,
            'message_id': message_id,
        }

        await self.ap.persistence_mgr.execute_async(
            sqlalchemy.insert(persistence_monitoring.MonitoringError).values(error_data)
        )

        return error_id

    async def update_message_status(
        self,
        message_id: str,
        status: str,
        level: str | None = None,
        variables: str | None = None,
    ) -> None:
        """Update message status and optionally variables"""
        update_values = {'status': status}
        if level is not None:
            update_values['level'] = level
        if variables is not None:
            update_values['variables'] = variables

        await self.ap.persistence_mgr.execute_async(
            sqlalchemy.update(persistence_monitoring.MonitoringMessage)
            .where(persistence_monitoring.MonitoringMessage.id == message_id)
            .values(update_values)
        )

    # ========== Query Methods ==========

    async def get_overview_metrics(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
    ) -> dict:
        """Get overview metrics"""
        # Build base query conditions
        message_conditions = []
        llm_conditions = []
        embedding_conditions = []
        session_conditions = []

        if bot_ids:
            message_conditions.append(persistence_monitoring.MonitoringMessage.bot_id.in_(bot_ids))
            llm_conditions.append(persistence_monitoring.MonitoringLLMCall.bot_id.in_(bot_ids))
            session_conditions.append(persistence_monitoring.MonitoringSession.bot_id.in_(bot_ids))

        if pipeline_ids:
            message_conditions.append(persistence_monitoring.MonitoringMessage.pipeline_id.in_(pipeline_ids))
            llm_conditions.append(persistence_monitoring.MonitoringLLMCall.pipeline_id.in_(pipeline_ids))
            session_conditions.append(persistence_monitoring.MonitoringSession.pipeline_id.in_(pipeline_ids))

        if start_time:
            message_conditions.append(persistence_monitoring.MonitoringMessage.timestamp >= start_time)
            llm_conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp >= start_time)
            embedding_conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp >= start_time)
            session_conditions.append(persistence_monitoring.MonitoringSession.start_time >= start_time)

        if end_time:
            message_conditions.append(persistence_monitoring.MonitoringMessage.timestamp <= end_time)
            llm_conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp <= end_time)
            embedding_conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp <= end_time)
            session_conditions.append(persistence_monitoring.MonitoringSession.start_time <= end_time)

        # Total messages
        message_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringMessage.id))
        if message_conditions:
            message_query = message_query.where(sqlalchemy.and_(*message_conditions))

        total_messages_result = await self.ap.persistence_mgr.execute_async(message_query)
        total_messages = total_messages_result.scalar() or 0

        # Total LLM calls
        llm_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringLLMCall.id))
        if llm_conditions:
            llm_query = llm_query.where(sqlalchemy.and_(*llm_conditions))

        llm_calls_result = await self.ap.persistence_mgr.execute_async(llm_query)
        llm_calls = llm_calls_result.scalar() or 0

        # Total Embedding calls
        embedding_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringEmbeddingCall.id))
        if embedding_conditions:
            embedding_query = embedding_query.where(sqlalchemy.and_(*embedding_conditions))

        embedding_calls_result = await self.ap.persistence_mgr.execute_async(embedding_query)
        embedding_calls = embedding_calls_result.scalar() or 0

        # Total model calls (LLM + Embedding)
        model_calls = llm_calls + embedding_calls

        # Success rate (based on messages)
        success_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringMessage.id)).where(
            persistence_monitoring.MonitoringMessage.status == 'success'
        )
        if message_conditions:
            success_query = success_query.where(sqlalchemy.and_(*message_conditions))

        success_result = await self.ap.persistence_mgr.execute_async(success_query)
        success_count = success_result.scalar() or 0
        success_rate = (success_count / total_messages * 100) if total_messages > 0 else 100

        # Active sessions
        active_session_query = sqlalchemy.select(
            sqlalchemy.func.count(persistence_monitoring.MonitoringSession.session_id)
        ).where(persistence_monitoring.MonitoringSession.is_active == True)
        if session_conditions:
            active_session_query = active_session_query.where(sqlalchemy.and_(*session_conditions))

        active_sessions_result = await self.ap.persistence_mgr.execute_async(active_session_query)
        active_sessions = active_sessions_result.scalar() or 0

        return {
            'total_messages': total_messages,
            'llm_calls': llm_calls,
            'embedding_calls': embedding_calls,
            'model_calls': model_calls,
            'success_rate': round(success_rate, 2),
            'active_sessions': active_sessions,
        }

    async def get_token_statistics(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        bucket: str = 'hour',
    ) -> dict:
        """Get detailed token usage statistics for production observability.

        Returns:
            - summary: aggregate token counters and call/latency stats over the window
            - by_model: per-model token + call breakdown (sorted by total tokens desc)
            - timeseries: token usage bucketed by `bucket` ('hour' or 'day')

        Token totals are summed over every LLM call that matches the filters.
        Success and error calls are also counted separately, so a spike in
        failures stays visible alongside the token accounting.
        """
        LLMCall = persistence_monitoring.MonitoringLLMCall

        conditions = []
        if bot_ids:
            conditions.append(LLMCall.bot_id.in_(bot_ids))
        if pipeline_ids:
            conditions.append(LLMCall.pipeline_id.in_(pipeline_ids))
        if start_time:
            conditions.append(LLMCall.timestamp >= start_time)
        if end_time:
            conditions.append(LLMCall.timestamp <= end_time)

        def _apply(query):
            if conditions:
                query = query.where(sqlalchemy.and_(*conditions))
            return query

        # ---- Summary aggregates ----
        summary_query = _apply(
            sqlalchemy.select(
                sqlalchemy.func.count(LLMCall.id),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.input_tokens), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.output_tokens), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.total_tokens), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.duration), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.cost), 0.0),
                sqlalchemy.func.sum(sqlalchemy.case((LLMCall.status == 'success', 1), else_=0)),
                sqlalchemy.func.sum(sqlalchemy.case((LLMCall.status == 'error', 1), else_=0)),
                # Count of successful calls that nonetheless recorded zero tokens:
                # a data-quality signal that usage reporting may be broken upstream.
                sqlalchemy.func.sum(
                    sqlalchemy.case(
                        (sqlalchemy.and_(LLMCall.status == 'success', LLMCall.total_tokens == 0), 1),
                        else_=0,
                    )
                ),
            )
        )
        summary_result = await self.ap.persistence_mgr.execute_async(summary_query)
        row = summary_result.first()
        (
            total_calls,
            total_input_tokens,
            total_output_tokens,
            total_tokens,
            total_duration,
            total_cost,
            success_calls,
            error_calls,
            zero_token_success_calls,
        ) = row if row else (0, 0, 0, 0, 0, 0.0, 0, 0, 0)

        total_calls = total_calls or 0
        success_calls = success_calls or 0
        error_calls = error_calls or 0
        zero_token_success_calls = zero_token_success_calls or 0

        summary = {
            'total_calls': total_calls,
            'success_calls': success_calls,
            'error_calls': error_calls,
            'total_input_tokens': int(total_input_tokens or 0),
            'total_output_tokens': int(total_output_tokens or 0),
            'total_tokens': int(total_tokens or 0),
            'total_cost': round(float(total_cost or 0.0), 6),
            'avg_tokens_per_call': int((total_tokens or 0) / total_calls) if total_calls > 0 else 0,
            'avg_duration_ms': int((total_duration or 0) / total_calls) if total_calls > 0 else 0,
            'avg_tokens_per_second': round((total_output_tokens or 0) / (total_duration / 1000), 2)
            if total_duration and total_duration > 0
            else 0,
            'zero_token_success_calls': zero_token_success_calls,
        }

        # ---- Per-model breakdown ----
        by_model_query = _apply(
            sqlalchemy.select(
                LLMCall.model_name,
                sqlalchemy.func.count(LLMCall.id),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.input_tokens), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.output_tokens), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.total_tokens), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.duration), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.cost), 0.0),
                sqlalchemy.func.sum(sqlalchemy.case((LLMCall.status == 'error', 1), else_=0)),
            ).group_by(LLMCall.model_name)
        )
        by_model_result = await self.ap.persistence_mgr.execute_async(by_model_query)
        by_model = []
        for mrow in by_model_result.all():
            (
                model_name,
                m_calls,
                m_in,
                m_out,
                m_total,
                m_duration,
                m_cost,
                m_errors,
            ) = mrow
            m_calls = m_calls or 0
            by_model.append(
                {
                    'model_name': model_name,
                    'calls': m_calls,
                    'error_calls': m_errors or 0,
                    'input_tokens': int(m_in or 0),
                    'output_tokens': int(m_out or 0),
                    'total_tokens': int(m_total or 0),
                    'cost': round(float(m_cost or 0.0), 6),
                    'avg_tokens_per_call': int((m_total or 0) / m_calls) if m_calls > 0 else 0,
                    'avg_duration_ms': int((m_duration or 0) / m_calls) if m_calls > 0 else 0,
                }
            )
        by_model.sort(key=lambda x: x['total_tokens'], reverse=True)

        # ---- Time-bucketed series ----
        # Use a DB-agnostic bucketing approach: fetch (timestamp, tokens) rows and
        # aggregate in Python. The window is bounded by the time filter, so this is
        # cheap for typical dashboard ranges (hours/days).
        series_query = _apply(
            sqlalchemy.select(
                LLMCall.timestamp,
                LLMCall.input_tokens,
                LLMCall.output_tokens,
                LLMCall.total_tokens,
            ).order_by(LLMCall.timestamp.asc())
        )
        series_result = await self.ap.persistence_mgr.execute_async(series_query)

        bucket_fmt = '%Y-%m-%d %H:00' if bucket == 'hour' else '%Y-%m-%d'
        buckets: dict[str, dict] = {}
        for srow in series_result.all():
            ts, s_in, s_out, s_total = srow
            if ts is None:
                continue
            key = ts.strftime(bucket_fmt)
            b = buckets.setdefault(
                key,
                {'bucket': key, 'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0, 'calls': 0},
            )
            b['input_tokens'] += int(s_in or 0)
            b['output_tokens'] += int(s_out or 0)
            b['total_tokens'] += int(s_total or 0)
            b['calls'] += 1

        timeseries = [buckets[k] for k in sorted(buckets.keys())]

        return {
            'summary': summary,
            'by_model': by_model,
            'timeseries': timeseries,
            'bucket': bucket,
        }

    async def get_messages(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        session_ids: list[str] | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> tuple[list[dict], int]:
|
|
"""Get messages with filters"""
|
|
conditions = []
|
|
|
|
if bot_ids:
|
|
conditions.append(persistence_monitoring.MonitoringMessage.bot_id.in_(bot_ids))
|
|
if pipeline_ids:
|
|
conditions.append(persistence_monitoring.MonitoringMessage.pipeline_id.in_(pipeline_ids))
|
|
if session_ids:
|
|
conditions.append(persistence_monitoring.MonitoringMessage.session_id.in_(session_ids))
|
|
if start_time:
|
|
conditions.append(persistence_monitoring.MonitoringMessage.timestamp >= start_time)
|
|
if end_time:
|
|
conditions.append(persistence_monitoring.MonitoringMessage.timestamp <= end_time)
|
|
|
|
# Get total count
|
|
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringMessage.id))
|
|
if conditions:
|
|
count_query = count_query.where(sqlalchemy.and_(*conditions))
|
|
|
|
count_result = await self.ap.persistence_mgr.execute_async(count_query)
|
|
total = count_result.scalar() or 0
|
|
|
|
# Get messages
|
|
query = sqlalchemy.select(persistence_monitoring.MonitoringMessage).order_by(
|
|
persistence_monitoring.MonitoringMessage.timestamp.desc()
|
|
)
|
|
if conditions:
|
|
query = query.where(sqlalchemy.and_(*conditions))
|
|
|
|
query = query.limit(limit).offset(offset)
|
|
|
|
result = await self.ap.persistence_mgr.execute_async(query)
|
|
messages_rows = result.all()
|
|
|
|
serialized = []
|
|
for row in messages_rows:
|
|
# Extract model instance from Row (SQLAlchemy returns Row objects)
|
|
msg = row[0] if isinstance(row, tuple) else row
|
|
serialized_msg = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringMessage, msg)
|
|
serialized.append(serialized_msg)
|
|
|
|
return (serialized, total)

    async def get_llm_calls(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> tuple[list[dict], int]:
        """Get LLM calls with filters"""
        conditions = []

        if bot_ids:
            conditions.append(persistence_monitoring.MonitoringLLMCall.bot_id.in_(bot_ids))
        if pipeline_ids:
            conditions.append(persistence_monitoring.MonitoringLLMCall.pipeline_id.in_(pipeline_ids))
        if start_time:
            conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp >= start_time)
        if end_time:
            conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp <= end_time)

        # Get total count
        count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringLLMCall.id))
        if conditions:
            count_query = count_query.where(sqlalchemy.and_(*conditions))

        count_result = await self.ap.persistence_mgr.execute_async(count_query)
        total = count_result.scalar() or 0

        # Get LLM calls
        query = sqlalchemy.select(persistence_monitoring.MonitoringLLMCall).order_by(
            persistence_monitoring.MonitoringLLMCall.timestamp.desc()
        )
        if conditions:
            query = query.where(sqlalchemy.and_(*conditions))

        query = query.limit(limit).offset(offset)

        result = await self.ap.persistence_mgr.execute_async(query)
        llm_calls_rows = result.all()

        return (
            [
                self.ap.persistence_mgr.serialize_model(
                    persistence_monitoring.MonitoringLLMCall, row[0] if isinstance(row, tuple) else row
                )
                for row in llm_calls_rows
            ],
            total,
        )

    async def get_embedding_calls(
        self,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        knowledge_base_id: str | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> tuple[list[dict], int]:
        """Get embedding calls with filters"""
        conditions = []

        if start_time:
            conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp >= start_time)
        if end_time:
            conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp <= end_time)
        if knowledge_base_id:
            conditions.append(persistence_monitoring.MonitoringEmbeddingCall.knowledge_base_id == knowledge_base_id)

        # Get total count
        count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringEmbeddingCall.id))
        if conditions:
            count_query = count_query.where(sqlalchemy.and_(*conditions))

        count_result = await self.ap.persistence_mgr.execute_async(count_query)
        total = count_result.scalar() or 0

        # Get embedding calls
        query = sqlalchemy.select(persistence_monitoring.MonitoringEmbeddingCall).order_by(
            persistence_monitoring.MonitoringEmbeddingCall.timestamp.desc()
        )
        if conditions:
            query = query.where(sqlalchemy.and_(*conditions))

        query = query.limit(limit).offset(offset)

        result = await self.ap.persistence_mgr.execute_async(query)
        embedding_calls_rows = result.all()

        return (
            [
                self.ap.persistence_mgr.serialize_model(
                    persistence_monitoring.MonitoringEmbeddingCall, row[0] if isinstance(row, tuple) else row
                )
                for row in embedding_calls_rows
            ],
            total,
        )

    async def get_sessions(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        is_active: bool | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> tuple[list[dict], int]:
        """Get sessions with filters"""
        conditions = []

        if bot_ids:
            conditions.append(persistence_monitoring.MonitoringSession.bot_id.in_(bot_ids))
        if pipeline_ids:
            conditions.append(persistence_monitoring.MonitoringSession.pipeline_id.in_(pipeline_ids))
        if start_time:
            conditions.append(persistence_monitoring.MonitoringSession.start_time >= start_time)
        if end_time:
            conditions.append(persistence_monitoring.MonitoringSession.start_time <= end_time)
        if is_active is not None:
            conditions.append(persistence_monitoring.MonitoringSession.is_active == is_active)

        # Get total count
        count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringSession.session_id))
        if conditions:
            count_query = count_query.where(sqlalchemy.and_(*conditions))

        count_result = await self.ap.persistence_mgr.execute_async(count_query)
        total = count_result.scalar() or 0

        # Get sessions
        query = sqlalchemy.select(persistence_monitoring.MonitoringSession).order_by(
            persistence_monitoring.MonitoringSession.last_activity.desc()
        )
        if conditions:
            query = query.where(sqlalchemy.and_(*conditions))

        query = query.limit(limit).offset(offset)

        result = await self.ap.persistence_mgr.execute_async(query)
        sessions_rows = result.all()

        return (
            [
                self.ap.persistence_mgr.serialize_model(
                    persistence_monitoring.MonitoringSession, row[0] if isinstance(row, tuple) else row
                )
                for row in sessions_rows
            ],
            total,
        )

    async def get_errors(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> tuple[list[dict], int]:
        """Get errors with filters"""
        conditions = []

        if bot_ids:
            conditions.append(persistence_monitoring.MonitoringError.bot_id.in_(bot_ids))
        if pipeline_ids:
            conditions.append(persistence_monitoring.MonitoringError.pipeline_id.in_(pipeline_ids))
        if start_time:
            conditions.append(persistence_monitoring.MonitoringError.timestamp >= start_time)
        if end_time:
            conditions.append(persistence_monitoring.MonitoringError.timestamp <= end_time)

        # Get total count
        count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringError.id))
        if conditions:
            count_query = count_query.where(sqlalchemy.and_(*conditions))

        count_result = await self.ap.persistence_mgr.execute_async(count_query)
        total = count_result.scalar() or 0

        # Get errors
        query = sqlalchemy.select(persistence_monitoring.MonitoringError).order_by(
            persistence_monitoring.MonitoringError.timestamp.desc()
        )
        if conditions:
            query = query.where(sqlalchemy.and_(*conditions))

        query = query.limit(limit).offset(offset)

        result = await self.ap.persistence_mgr.execute_async(query)
        errors_rows = result.all()

        return (
            [
                self.ap.persistence_mgr.serialize_model(
                    persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row
                )
                for row in errors_rows
            ],
            total,
        )

    async def get_session_analysis(
        self,
        session_id: str,
    ) -> dict:
        """Get detailed analysis for a specific session"""
        # Get session info
        session_query = sqlalchemy.select(persistence_monitoring.MonitoringSession).where(
            persistence_monitoring.MonitoringSession.session_id == session_id
        )
        session_result = await self.ap.persistence_mgr.execute_async(session_query)
        session_row = session_result.first()

        if not session_row:
            return {
                'session_id': session_id,
                'found': False,
            }

        session = session_row[0] if isinstance(session_row, tuple) else session_row

        # Get messages for this session
        messages_query = (
            sqlalchemy.select(persistence_monitoring.MonitoringMessage)
            .where(persistence_monitoring.MonitoringMessage.session_id == session_id)
            .order_by(persistence_monitoring.MonitoringMessage.timestamp.asc())
        )
        messages_result = await self.ap.persistence_mgr.execute_async(messages_query)
        messages_rows = messages_result.all()

        # Count messages by status
        success_messages = 0
        error_messages = 0
        pending_messages = 0
        for row in messages_rows:
            msg = row[0] if isinstance(row, tuple) else row
            if msg.status == 'success':
                success_messages += 1
            elif msg.status == 'error':
                error_messages += 1
            elif msg.status == 'pending':
                pending_messages += 1

        # Get LLM calls for this session
        llm_query = sqlalchemy.select(persistence_monitoring.MonitoringLLMCall).where(
            persistence_monitoring.MonitoringLLMCall.session_id == session_id
        )
        llm_result = await self.ap.persistence_mgr.execute_async(llm_query)
        llm_rows = llm_result.all()

        # Calculate LLM statistics
        total_llm_calls = len(llm_rows)
        total_input_tokens = 0
        total_output_tokens = 0
        total_tokens = 0
        total_duration = 0
        success_llm_calls = 0
        error_llm_calls = 0

        for row in llm_rows:
            llm_call = row[0] if isinstance(row, tuple) else row
            total_input_tokens += llm_call.input_tokens
            total_output_tokens += llm_call.output_tokens
            total_tokens += llm_call.total_tokens
            total_duration += llm_call.duration
            if llm_call.status == 'success':
                success_llm_calls += 1
            else:
                error_llm_calls += 1

        # Get errors for this session
        error_query = (
            sqlalchemy.select(persistence_monitoring.MonitoringError)
            .where(persistence_monitoring.MonitoringError.session_id == session_id)
            .order_by(persistence_monitoring.MonitoringError.timestamp.desc())
        )
        error_result = await self.ap.persistence_mgr.execute_async(error_query)
        error_rows = error_result.all()

        errors = [
            self.ap.persistence_mgr.serialize_model(
                persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row
            )
            for row in error_rows
        ]

        # Calculate session duration
        if messages_rows:
            first_msg = messages_rows[0][0] if isinstance(messages_rows[0], tuple) else messages_rows[0]
            last_msg = messages_rows[-1][0] if isinstance(messages_rows[-1], tuple) else messages_rows[-1]
            session_duration_seconds = int((last_msg.timestamp - first_msg.timestamp).total_seconds())
        else:
            session_duration_seconds = 0

        return {
            'session_id': session_id,
            'found': True,
            'session': self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringSession, session),
            'message_stats': {
                'total': len(messages_rows),
                'success': success_messages,
                'error': error_messages,
                'pending': pending_messages,
            },
            'llm_stats': {
                'total_calls': total_llm_calls,
                'success_calls': success_llm_calls,
                'error_calls': error_llm_calls,
                'total_input_tokens': total_input_tokens,
                'total_output_tokens': total_output_tokens,
                'total_tokens': total_tokens,
                'average_duration_ms': int(total_duration / total_llm_calls) if total_llm_calls > 0 else 0,
            },
            'errors': errors,
            'session_duration_seconds': session_duration_seconds,
        }

    async def get_message_details(
        self,
        message_id: str,
    ) -> dict:
        """Get detailed information for a specific message including associated LLM calls and errors"""
        # Get message info
        message_query = sqlalchemy.select(persistence_monitoring.MonitoringMessage).where(
            persistence_monitoring.MonitoringMessage.id == message_id
        )
        message_result = await self.ap.persistence_mgr.execute_async(message_query)
        message_row = message_result.first()

        if not message_row:
            return {
                'message_id': message_id,
                'found': False,
            }

        message = message_row[0] if isinstance(message_row, tuple) else message_row

        # Get LLM calls for this message
        llm_query = (
            sqlalchemy.select(persistence_monitoring.MonitoringLLMCall)
            .where(persistence_monitoring.MonitoringLLMCall.message_id == message_id)
            .order_by(persistence_monitoring.MonitoringLLMCall.timestamp.asc())
        )
        llm_result = await self.ap.persistence_mgr.execute_async(llm_query)
        llm_rows = llm_result.all()

        # Unwrap rows to model instances so the token sums below read model attributes, not Row attributes
        llm_call_models = [row[0] if isinstance(row, tuple) else row for row in llm_rows]
        llm_calls = [
            self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringLLMCall, call)
            for call in llm_call_models
        ]

        # Calculate LLM statistics
        total_input_tokens = sum(call.input_tokens for call in llm_call_models)
        total_output_tokens = sum(call.output_tokens for call in llm_call_models)
        total_tokens = sum(call.total_tokens for call in llm_call_models)
        total_duration = sum(call.duration for call in llm_call_models)

        # Get errors for this message
        error_query = (
            sqlalchemy.select(persistence_monitoring.MonitoringError)
            .where(persistence_monitoring.MonitoringError.message_id == message_id)
            .order_by(persistence_monitoring.MonitoringError.timestamp.asc())
        )
        error_result = await self.ap.persistence_mgr.execute_async(error_query)
        error_rows = error_result.all()

        errors = [
            self.ap.persistence_mgr.serialize_model(
                persistence_monitoring.MonitoringError, row[0] if isinstance(row, tuple) else row
            )
            for row in error_rows
        ]

        return {
            'message_id': message_id,
            'found': True,
            'message': self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringMessage, message),
            'llm_calls': llm_calls,
            'llm_stats': {
                'total_calls': len(llm_rows),
                'total_input_tokens': total_input_tokens,
                'total_output_tokens': total_output_tokens,
                'total_tokens': total_tokens,
                'total_duration_ms': total_duration,
                'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0,
            },
            'errors': errors,
        }

    # ========== Export Methods ==========

    def _escape_csv_field(self, field: object) -> str:
        """Escape a field for CSV output"""
        if field is None:
            return ''
        # Convert non-string types to string first
        if not isinstance(field, str):
            field = str(field)
        # Normalize line endings (CRLF and lone CR) to LF
        field = field.replace('\r\n', '\n').replace('\r', '\n')
        # If field contains comma, double quote, or newline, wrap in quotes
        if ',' in field or '"' in field or '\n' in field:
            # Escape double quotes by doubling them
            field = '"' + field.replace('"', '""') + '"'
        return field

    def _format_timestamp(self, dt: datetime.datetime) -> str:
        """Format a datetime as a 'YYYY-MM-DD HH:MM:SS' string"""
        return dt.strftime('%Y-%m-%d %H:%M:%S')

    def _extract_message_text(self, message_content: str) -> str:
        """Extract plain text from message chain JSON"""
        if not message_content:
            return ''

        try:
            import json

            message_chain = json.loads(message_content)
            if not isinstance(message_chain, list):
                return message_content

            text_parts = []
            for component in message_chain:
                if not isinstance(component, dict):
                    continue
                component_type = component.get('type')
                if component_type == 'Plain':
                    text = component.get('text', '')
                    text_parts.append(text)
                elif component_type == 'At':
                    display = component.get('display', '')
                    target = component.get('target', '')
                    if display:
                        text_parts.append(f'@{display}')
                    elif target:
                        text_parts.append(f'@{target}')
                elif component_type == 'AtAll':
                    text_parts.append('@All')
                elif component_type == 'Image':
                    text_parts.append('[Image]')
                elif component_type == 'File':
                    name = component.get('name', 'File')
                    text_parts.append(f'[File: {name}]')
                elif component_type == 'Voice':
                    length = component.get('length', 0)
                    text_parts.append(f'[Voice {length}s]')
                elif component_type == 'Quote':
                    # Quote content is in 'origin' field
                    origin = component.get('origin', [])
                    if isinstance(origin, list):
                        for item in origin:
                            if isinstance(item, dict) and item.get('type') == 'Plain':
                                text_parts.append(f'> {item.get("text", "")}')
                elif component_type == 'Source':
                    # Skip Source component
                    continue
                else:
                    # Other unknown types
                    text_parts.append(f'[{component_type}]')

            return ''.join(text_parts)
        except (json.JSONDecodeError, TypeError, KeyError):
            # If not valid JSON, return as-is
            return message_content

    async def export_messages(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        limit: int = 100000,
    ) -> list[dict]:
        """Export messages as list of dictionaries for CSV conversion"""
        conditions = []

        if bot_ids:
            conditions.append(persistence_monitoring.MonitoringMessage.bot_id.in_(bot_ids))
        if pipeline_ids:
            conditions.append(persistence_monitoring.MonitoringMessage.pipeline_id.in_(pipeline_ids))
        if start_time:
            conditions.append(persistence_monitoring.MonitoringMessage.timestamp >= start_time)
        if end_time:
            conditions.append(persistence_monitoring.MonitoringMessage.timestamp <= end_time)

        query = sqlalchemy.select(persistence_monitoring.MonitoringMessage).order_by(
            persistence_monitoring.MonitoringMessage.timestamp.desc()
        )
        if conditions:
            query = query.where(sqlalchemy.and_(*conditions))

        query = query.limit(limit)

        result = await self.ap.persistence_mgr.execute_async(query)
        rows = result.all()

        # Unwrap each result row to its model instance
        messages = [row[0] if isinstance(row, tuple) else row for row in rows]
        return [
            {
                'id': msg.id,
                'timestamp': self._format_timestamp(msg.timestamp),
                'bot_id': msg.bot_id,
                'bot_name': msg.bot_name,
                'pipeline_id': msg.pipeline_id,
                'pipeline_name': msg.pipeline_name,
                'runner_name': msg.runner_name,
                'message_content': msg.message_content,
                'message_text': self._extract_message_text(msg.message_content),
                'session_id': msg.session_id,
                'status': msg.status,
                'level': msg.level,
                'platform': msg.platform,
                'user_id': msg.user_id,
            }
            for msg in messages
        ]

    async def export_llm_calls(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        limit: int = 100000,
    ) -> list[dict]:
        """Export LLM calls as list of dictionaries for CSV conversion"""
        conditions = []

        if bot_ids:
            conditions.append(persistence_monitoring.MonitoringLLMCall.bot_id.in_(bot_ids))
        if pipeline_ids:
            conditions.append(persistence_monitoring.MonitoringLLMCall.pipeline_id.in_(pipeline_ids))
        if start_time:
            conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp >= start_time)
        if end_time:
            conditions.append(persistence_monitoring.MonitoringLLMCall.timestamp <= end_time)

        query = sqlalchemy.select(persistence_monitoring.MonitoringLLMCall).order_by(
            persistence_monitoring.MonitoringLLMCall.timestamp.desc()
        )
        if conditions:
            query = query.where(sqlalchemy.and_(*conditions))

        query = query.limit(limit)

        result = await self.ap.persistence_mgr.execute_async(query)
        rows = result.all()

        # Unwrap each result row to its model instance
        llm_calls = [row[0] if isinstance(row, tuple) else row for row in rows]
        return [
            {
                'id': call.id,
                'timestamp': self._format_timestamp(call.timestamp),
                'model_name': call.model_name,
                'input_tokens': call.input_tokens,
                'output_tokens': call.output_tokens,
                'total_tokens': call.total_tokens,
                'duration_ms': call.duration,
                'cost': call.cost,
                'status': call.status,
                'bot_id': call.bot_id,
                'bot_name': call.bot_name,
                'pipeline_id': call.pipeline_id,
                'pipeline_name': call.pipeline_name,
                'session_id': call.session_id,
                'message_id': call.message_id,
                'error_message': call.error_message,
            }
            for call in llm_calls
        ]

    async def export_embedding_calls(
        self,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        knowledge_base_id: str | None = None,
        limit: int = 100000,
    ) -> list[dict]:
        """Export embedding calls as list of dictionaries for CSV conversion"""
        conditions = []

        if start_time:
            conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp >= start_time)
        if end_time:
            conditions.append(persistence_monitoring.MonitoringEmbeddingCall.timestamp <= end_time)
        if knowledge_base_id:
            conditions.append(persistence_monitoring.MonitoringEmbeddingCall.knowledge_base_id == knowledge_base_id)

        query = sqlalchemy.select(persistence_monitoring.MonitoringEmbeddingCall).order_by(
            persistence_monitoring.MonitoringEmbeddingCall.timestamp.desc()
        )
        if conditions:
            query = query.where(sqlalchemy.and_(*conditions))

        query = query.limit(limit)

        result = await self.ap.persistence_mgr.execute_async(query)
        rows = result.all()

        # Unwrap each result row to its model instance
        embedding_calls = [row[0] if isinstance(row, tuple) else row for row in rows]
        return [
            {
                'id': call.id,
                'timestamp': self._format_timestamp(call.timestamp),
                'model_name': call.model_name,
                'prompt_tokens': call.prompt_tokens,
                'total_tokens': call.total_tokens,
                'duration_ms': call.duration,
                'input_count': call.input_count,
                'status': call.status,
                'error_message': call.error_message,
                'knowledge_base_id': call.knowledge_base_id,
                'query_text': call.query_text,
                'session_id': call.session_id,
                'message_id': call.message_id,
                'call_type': call.call_type,
            }
            for call in embedding_calls
        ]

    async def export_errors(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        limit: int = 100000,
    ) -> list[dict]:
        """Export errors as list of dictionaries for CSV conversion"""
        conditions = []

        if bot_ids:
            conditions.append(persistence_monitoring.MonitoringError.bot_id.in_(bot_ids))
        if pipeline_ids:
            conditions.append(persistence_monitoring.MonitoringError.pipeline_id.in_(pipeline_ids))
        if start_time:
            conditions.append(persistence_monitoring.MonitoringError.timestamp >= start_time)
        if end_time:
            conditions.append(persistence_monitoring.MonitoringError.timestamp <= end_time)

        query = sqlalchemy.select(persistence_monitoring.MonitoringError).order_by(
            persistence_monitoring.MonitoringError.timestamp.desc()
        )
        if conditions:
            query = query.where(sqlalchemy.and_(*conditions))

        query = query.limit(limit)

        result = await self.ap.persistence_mgr.execute_async(query)
        rows = result.all()

        # Unwrap each result row to its model instance
        errors = [row[0] if isinstance(row, tuple) else row for row in rows]
        return [
            {
                'id': err.id,
                'timestamp': self._format_timestamp(err.timestamp),
                'error_type': err.error_type,
                'error_message': err.error_message,
                'bot_id': err.bot_id,
                'bot_name': err.bot_name,
                'pipeline_id': err.pipeline_id,
                'pipeline_name': err.pipeline_name,
                'session_id': err.session_id,
                'message_id': err.message_id,
                'stack_trace': err.stack_trace,
            }
            for err in errors
        ]

    async def export_sessions(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        limit: int = 100000,
    ) -> list[dict]:
        """Export sessions as list of dictionaries for CSV conversion"""
        conditions = []

        if bot_ids:
            conditions.append(persistence_monitoring.MonitoringSession.bot_id.in_(bot_ids))
        if pipeline_ids:
            conditions.append(persistence_monitoring.MonitoringSession.pipeline_id.in_(pipeline_ids))
        if start_time:
            conditions.append(persistence_monitoring.MonitoringSession.start_time >= start_time)
        if end_time:
            conditions.append(persistence_monitoring.MonitoringSession.start_time <= end_time)

        query = sqlalchemy.select(persistence_monitoring.MonitoringSession).order_by(
            persistence_monitoring.MonitoringSession.last_activity.desc()
        )
        if conditions:
            query = query.where(sqlalchemy.and_(*conditions))

        query = query.limit(limit)

        result = await self.ap.persistence_mgr.execute_async(query)
        rows = result.all()

        # Unwrap each result row to its model instance
        sessions = [row[0] if isinstance(row, tuple) else row for row in rows]
        return [
            {
                'session_id': session.session_id,
                'bot_id': session.bot_id,
                'bot_name': session.bot_name,
                'pipeline_id': session.pipeline_id,
                'pipeline_name': session.pipeline_name,
                'message_count': session.message_count,
                'start_time': self._format_timestamp(session.start_time),
                'last_activity': self._format_timestamp(session.last_activity),
                'is_active': str(session.is_active),
                'platform': session.platform,
                'user_id': session.user_id,
            }
            for session in sessions
        ]

    # ========== Feedback Methods ==========

    async def record_feedback(
        self,
        feedback_id: str,
        feedback_type: int,
        feedback_content: str | None = None,
        inaccurate_reasons: list[str] | None = None,
        bot_id: str | None = None,
        bot_name: str | None = None,
        pipeline_id: str | None = None,
        pipeline_name: str | None = None,
        session_id: str | None = None,
        message_id: str | None = None,
        stream_id: str | None = None,
        user_id: str | None = None,
        platform: str | None = None,
    ) -> str | None:
        """Record user feedback (like/dislike) from an AI Bot conversation.

        Args:
            feedback_id: Unique feedback identifier from the platform (e.g., WeChat Work)
            feedback_type: 1 = like (thumbs up), 2 = dislike (thumbs down), 3 = cancel (deletes the existing record)
            feedback_content: Optional user feedback text
            inaccurate_reasons: List of reasons the response was inaccurate (for dislikes)
            bot_id: Bot ID
            bot_name: Bot name
            pipeline_id: Pipeline ID
            pipeline_name: Pipeline name
            session_id: Session ID
            message_id: Message ID
            stream_id: Stream ID (for WeChat Work streaming messages)
            user_id: User ID
            platform: Platform name (e.g., 'wecom')

        Returns:
            The record ID. If a concurrent insert for the same feedback_id wins the race, the
            feedback_id is returned instead; None is returned when the feedback is cancelled (type 3).
        """
        import json

        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        reasons_json = json.dumps(inaccurate_reasons, ensure_ascii=False) if inaccurate_reasons else None

        MonitoringFeedback = persistence_monitoring.MonitoringFeedback

        # Handle cancel feedback (type=3): delete existing record
        if feedback_type == 3:
            await self.ap.persistence_mgr.execute_async(
                sqlalchemy.delete(MonitoringFeedback).where(MonitoringFeedback.feedback_id == feedback_id)
            )
            return None

        # Check if record with this feedback_id already exists
        existing_result = await self.ap.persistence_mgr.execute_async(
            sqlalchemy.select(MonitoringFeedback).where(MonitoringFeedback.feedback_id == feedback_id)
        )
        existing_row = existing_result.first()

        if existing_row:
            # UPDATE existing record
            existing = existing_row[0] if isinstance(existing_row, tuple) else existing_row
            await self.ap.persistence_mgr.execute_async(
                sqlalchemy.update(MonitoringFeedback)
                .where(MonitoringFeedback.feedback_id == feedback_id)
                .values(
                    timestamp=now,
                    feedback_type=feedback_type,
                    feedback_content=feedback_content,
                    inaccurate_reasons=reasons_json,
                    bot_id=bot_id or existing.bot_id,
                    bot_name=bot_name or existing.bot_name,
                    pipeline_id=pipeline_id or existing.pipeline_id,
                    pipeline_name=pipeline_name or existing.pipeline_name,
                    session_id=session_id or existing.session_id,
                    message_id=message_id or existing.message_id,
                    stream_id=stream_id or existing.stream_id,
                    user_id=user_id or existing.user_id,
                    platform=platform or existing.platform,
                )
            )
            return existing.id
        else:
            # INSERT new record with IntegrityError defense
            record_id = str(uuid.uuid4())
            record_data = {
                'id': record_id,
                'timestamp': now,
                'feedback_id': feedback_id,
                'feedback_type': feedback_type,
                'feedback_content': feedback_content,
                'inaccurate_reasons': reasons_json,
                'bot_id': bot_id,
                'bot_name': bot_name,
                'pipeline_id': pipeline_id,
                'pipeline_name': pipeline_name,
                'session_id': session_id,
                'message_id': message_id,
                'stream_id': stream_id,
                'user_id': user_id,
                'platform': platform,
            }
            try:
                await self.ap.persistence_mgr.execute_async(sqlalchemy.insert(MonitoringFeedback).values(record_data))
                return record_id
            except Exception:
                # UNIQUE constraint conflict (concurrent feedback for same feedback_id)
                await self.ap.persistence_mgr.execute_async(
                    sqlalchemy.update(MonitoringFeedback)
                    .where(MonitoringFeedback.feedback_id == feedback_id)
                    .values(
                        timestamp=now,
                        feedback_type=feedback_type,
                        feedback_content=feedback_content,
                        inaccurate_reasons=reasons_json,
                    )
                )
                return feedback_id
|
|
|
|
async def get_feedback_stats(
|
|
self,
|
|
bot_ids: list[str] | None = None,
|
|
pipeline_ids: list[str] | None = None,
|
|
start_time: datetime.datetime | None = None,
|
|
end_time: datetime.datetime | None = None,
|
|
) -> dict:
|
|
"""Get feedback statistics.
|
|
|
|
Returns:
|
|
Dictionary with total likes, dislikes, and breakdown by bot/pipeline
|
|
"""
|
|
        conditions = []

        if bot_ids:
            conditions.append(persistence_monitoring.MonitoringFeedback.bot_id.in_(bot_ids))
        if pipeline_ids:
            conditions.append(persistence_monitoring.MonitoringFeedback.pipeline_id.in_(pipeline_ids))
        if start_time:
            conditions.append(persistence_monitoring.MonitoringFeedback.timestamp >= start_time)
        if end_time:
            conditions.append(persistence_monitoring.MonitoringFeedback.timestamp <= end_time)

        # Get total likes (feedback_type = 1)
        likes_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id)).where(
            persistence_monitoring.MonitoringFeedback.feedback_type == 1
        )
        if conditions:
            likes_query = likes_query.where(sqlalchemy.and_(*conditions))
        likes_result = await self.ap.persistence_mgr.execute_async(likes_query)
        total_likes = likes_result.scalar() or 0

        # Get total dislikes (feedback_type = 2)
        dislikes_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id)).where(
            persistence_monitoring.MonitoringFeedback.feedback_type == 2
        )
        if conditions:
            dislikes_query = dislikes_query.where(sqlalchemy.and_(*conditions))
        dislikes_result = await self.ap.persistence_mgr.execute_async(dislikes_query)
        total_dislikes = dislikes_result.scalar() or 0

        # Get total feedback count
        total_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id))
        if conditions:
            total_query = total_query.where(sqlalchemy.and_(*conditions))
        total_result = await self.ap.persistence_mgr.execute_async(total_query)
        total_feedback = total_result.scalar() or 0

        # Calculate satisfaction rate
        satisfaction_rate = (total_likes / total_feedback * 100) if total_feedback > 0 else 0

        # Get feedback by bot
        bot_stats_query = sqlalchemy.select(
            persistence_monitoring.MonitoringFeedback.bot_id,
            persistence_monitoring.MonitoringFeedback.bot_name,
            sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id).label('total'),
            sqlalchemy.func.sum(
                sqlalchemy.case((persistence_monitoring.MonitoringFeedback.feedback_type == 1, 1), else_=0)
            ).label('likes'),
            sqlalchemy.func.sum(
                sqlalchemy.case((persistence_monitoring.MonitoringFeedback.feedback_type == 2, 1), else_=0)
            ).label('dislikes'),
        ).group_by(
            persistence_monitoring.MonitoringFeedback.bot_id,
            persistence_monitoring.MonitoringFeedback.bot_name,
        )
        if conditions:
            bot_stats_query = bot_stats_query.where(sqlalchemy.and_(*conditions))
        bot_stats_result = await self.ap.persistence_mgr.execute_async(bot_stats_query)
        bot_stats = [
            {
                'bot_id': row.bot_id,
                'bot_name': row.bot_name,
                'total': row.total,
                'likes': row.likes or 0,
                'dislikes': row.dislikes or 0,
            }
            for row in bot_stats_result.all()
        ]

        return {
            'total_feedback': total_feedback,
            'total_likes': total_likes,
            'total_dislikes': total_dislikes,
            'satisfaction_rate': round(satisfaction_rate, 2),
            'by_bot': bot_stats,
        }

    async def get_feedback_list(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        feedback_type: int | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> tuple[list[dict], int]:
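        """Get a page of feedback records matching the filters, newest first.

        Returns:
            Tuple of (serialized feedback records for the requested page, total count of matching records)
        """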
        conditions = []

        if bot_ids:
            conditions.append(persistence_monitoring.MonitoringFeedback.bot_id.in_(bot_ids))
        if pipeline_ids:
            conditions.append(persistence_monitoring.MonitoringFeedback.pipeline_id.in_(pipeline_ids))
        if feedback_type is not None:
            conditions.append(persistence_monitoring.MonitoringFeedback.feedback_type == feedback_type)
        if start_time:
            conditions.append(persistence_monitoring.MonitoringFeedback.timestamp >= start_time)
        if end_time:
            conditions.append(persistence_monitoring.MonitoringFeedback.timestamp <= end_time)

        # Get total count
        count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id))
        if conditions:
            count_query = count_query.where(sqlalchemy.and_(*conditions))
        count_result = await self.ap.persistence_mgr.execute_async(count_query)
        total = count_result.scalar() or 0

        # Get feedback list
        query = sqlalchemy.select(persistence_monitoring.MonitoringFeedback).order_by(
            persistence_monitoring.MonitoringFeedback.timestamp.desc()
        )
        if conditions:
            query = query.where(sqlalchemy.and_(*conditions))
        query = query.limit(limit).offset(offset)

        result = await self.ap.persistence_mgr.execute_async(query)
        rows = result.all()

        return (
            [
                self.ap.persistence_mgr.serialize_model(
                    persistence_monitoring.MonitoringFeedback, row[0] if isinstance(row, tuple) else row
                )
                for row in rows
            ],
            total,
        )

    async def export_feedback(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        limit: int = 100000,
    ) -> list[dict]:
        """Export feedback as list of dictionaries for CSV conversion."""
        conditions = []

        if bot_ids:
            conditions.append(persistence_monitoring.MonitoringFeedback.bot_id.in_(bot_ids))
        if pipeline_ids:
            conditions.append(persistence_monitoring.MonitoringFeedback.pipeline_id.in_(pipeline_ids))
        if start_time:
            conditions.append(persistence_monitoring.MonitoringFeedback.timestamp >= start_time)
        if end_time:
            conditions.append(persistence_monitoring.MonitoringFeedback.timestamp <= end_time)

        query = sqlalchemy.select(persistence_monitoring.MonitoringFeedback).order_by(
            persistence_monitoring.MonitoringFeedback.timestamp.desc()
        )
        if conditions:
            query = query.where(sqlalchemy.and_(*conditions))
        query = query.limit(limit)

        result = await self.ap.persistence_mgr.execute_async(query)
        rows = result.all()

        # Unwrap each row once: a row may be a tuple wrapping the model instance or the record itself
        items = [row[0] if isinstance(row, tuple) else row for row in rows]

        return [
            {
                'id': item.id,
                'timestamp': self._format_timestamp(item.timestamp),
                'feedback_id': item.feedback_id,
                'feedback_type': 'like' if item.feedback_type == 1 else 'dislike',
                'feedback_content': item.feedback_content,
                'inaccurate_reasons': item.inaccurate_reasons,
                'bot_id': item.bot_id,
                'bot_name': item.bot_name,
                'pipeline_id': item.pipeline_id,
                'pipeline_name': item.pipeline_name,
                'session_id': item.session_id,
                'message_id': item.message_id,
                'stream_id': item.stream_id,
                'user_id': item.user_id,
                'platform': item.platform,
            }
            for item in items
        ]