Files
LangBot/src/langbot/pkg/pipeline/monitoring_helper.py
Junyan Chin d9a630b8c1 feat: add session message monitoring tab to bot detail dialog (#2005)
* feat: add session message monitoring tab to bot detail dialog

Add a new "Sessions" tab in the bot detail dialog that displays
sent & received messages grouped by sessions. Users can select
any session to view its messages in a chat-bubble style layout.

Backend changes:
- Add sessionId filter to monitoring messages endpoint
- Add role column to MonitoringMessage (user/assistant)
- Record bot responses in monitoring via record_query_response()
- Add DB migration (dbm019) for the new role column

Frontend changes:
- New BotSessionMonitor component with session list + message viewer
- Add Sessions sidebar tab to BotDetailDialog
- Add getBotSessions/getSessionMessages API methods to BackendClient
- Add i18n translations (en-US, zh-Hans, zh-Hant, ja-JP)

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>

* refactor: remove outdated version comment from PipelineManager class

* fix: bump required_database_version to 19 to trigger monitoring_messages.role migration

* fix: prevent session message auto-scroll from pushing dialog content out of view

Replace scrollIntoView (which scrolls all ancestor containers) with
direct scrollTop manipulation on the ScrollArea viewport. This keeps
the scroll contained within the messages panel only.

* ui: redesign BotSessionMonitor with polished chat UI

- Wider session list (w-72) with avatar circles and cleaner layout
- Richer chat header with avatar, platform info, and active indicator
- User messages now use blue-500 (solid) instead of blue-100 for
  clear visual distinction
- Metadata (time, runner) shown on hover below bubbles, not inside
- Proper empty state illustrations for both panels
- Better spacing, rounded corners, and shadow treatment
- Consistent dark mode styling

* fix: infinite re-render loop in DynamicFormComponent

The useEffect depended on onSubmit which was a new closure every
parent render. Calling onSubmit inside the effect triggered parent
state update → re-render → new onSubmit ref → effect re-runs → loop.

Fix: use useRef to hold a stable reference to onSubmit, removing it
from the useEffect dependency array.

Also add DialogDescription to BotDetailDialog to suppress Radix
aria-describedby warning.

* fix: remove .html suffix from docs.langbot.app links (Mintlify migration)

* style: fix prettier and ruff formatting

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Happy <yesreply@happy.engineering>
2026-02-25 21:56:24 +08:00

325 lines
12 KiB
Python

"""
Monitoring helper for recording events during pipeline execution.
This module provides convenient methods to record monitoring data
without cluttering the main pipeline code.
"""
from __future__ import annotations
import traceback
import typing
import time
import json
if typing.TYPE_CHECKING:
from ..core import app
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
class MonitoringHelper:
"""Helper class for monitoring operations"""
@staticmethod
async def record_query_start(
ap: app.Application,
query: pipeline_query.Query,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
runner_name: str | None = None,
) -> str:
"""Record the start of query processing, returns message_id"""
try:
# Check if session exists, if not, record session start
session_id = f'{query.launcher_type}_{query.launcher_id}'
# Try to record message
# Use JSON serialization to preserve message chain structure (including image URLs, etc.)
if hasattr(query, 'message_chain') and hasattr(query.message_chain, 'model_dump'):
message_content = json.dumps(query.message_chain.model_dump(), ensure_ascii=False)
else:
message_content = str(query)
# Variables will be updated in record_query_success after preproc stage sets them
# Here we just record None, the full variables will be set when query completes
message_id = await ap.monitoring_service.record_message(
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
message_content=message_content,
session_id=session_id,
status='pending',
level='info',
platform=query.launcher_type.value
if hasattr(query.launcher_type, 'value')
else str(query.launcher_type),
user_id=query.sender_id,
runner_name=runner_name,
variables=None, # Will be updated in record_query_success
)
# Update session activity or create new session if it doesn't exist
# Always pass pipeline info to handle pipeline switches
session_updated = await ap.monitoring_service.update_session_activity(
session_id,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
)
if not session_updated:
# Session doesn't exist, create it
await ap.monitoring_service.record_session_start(
session_id=session_id,
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
platform=query.launcher_type.value
if hasattr(query.launcher_type, 'value')
else str(query.launcher_type),
user_id=query.sender_id,
)
return message_id
except Exception as e:
ap.logger.error(f'Failed to record query start: {e}')
return ''
@staticmethod
async def record_query_success(
ap: app.Application,
message_id: str,
query: pipeline_query.Query | None = None,
):
"""Record successful query processing by updating message status and variables"""
try:
if message_id:
# Serialize query.variables (filtering out internal variables)
query_variables_str = None
if query and hasattr(query, 'variables') and query.variables:
filtered_vars = {k: v for k, v in query.variables.items() if not k.startswith('_')}
if filtered_vars:
try:
query_variables_str = json.dumps(filtered_vars, ensure_ascii=False, default=str)
except Exception:
pass
await ap.monitoring_service.update_message_status(
message_id=message_id,
status='success',
variables=query_variables_str,
)
except Exception as e:
ap.logger.error(f'Failed to record query success: {e}')
@staticmethod
async def record_query_response(
ap: app.Application,
query: pipeline_query.Query,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
runner_name: str | None = None,
):
"""Record bot response message to monitoring"""
try:
session_id = f'{query.launcher_type}_{query.launcher_id}'
# Extract response content from resp_message_chain
if hasattr(query, 'resp_message_chain') and query.resp_message_chain:
# Serialize the last response message chain
last_resp = query.resp_message_chain[-1]
if hasattr(last_resp, 'model_dump'):
message_content = json.dumps(last_resp.model_dump(), ensure_ascii=False)
else:
message_content = str(last_resp)
elif hasattr(query, 'resp_messages') and query.resp_messages:
last_resp = query.resp_messages[-1]
if hasattr(last_resp, 'get_content_platform_message_chain'):
chain = last_resp.get_content_platform_message_chain()
if hasattr(chain, 'model_dump'):
message_content = json.dumps(chain.model_dump(), ensure_ascii=False)
else:
message_content = str(chain)
else:
message_content = str(last_resp)
else:
return # No response to record
await ap.monitoring_service.record_message(
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
message_content=message_content,
session_id=session_id,
status='success',
level='info',
platform=query.launcher_type.value
if hasattr(query.launcher_type, 'value')
else str(query.launcher_type),
user_id=query.sender_id,
runner_name=runner_name,
role='assistant',
)
except Exception as e:
ap.logger.error(f'Failed to record query response: {e}')
@staticmethod
async def record_query_error(
ap: app.Application,
query: pipeline_query.Query,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
error: Exception,
runner_name: str | None = None,
) -> str:
"""Record query processing error, returns message_id"""
try:
session_id = f'{query.launcher_type}_{query.launcher_id}'
# Record error message
message_id = await ap.monitoring_service.record_message(
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
message_content=f'Error: {str(error)}',
session_id=session_id,
status='error',
level='error',
platform=query.launcher_type.value
if hasattr(query.launcher_type, 'value')
else str(query.launcher_type),
user_id=query.sender_id,
runner_name=runner_name,
)
# Record error log
await ap.monitoring_service.record_error(
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
error_type=type(error).__name__,
error_message=str(error),
session_id=session_id,
stack_trace=traceback.format_exc(),
message_id=message_id,
)
return message_id
except Exception as e:
ap.logger.error(f'Failed to record query error: {e}')
return ''
@staticmethod
async def record_llm_call(
ap: app.Application,
query: pipeline_query.Query,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
model_name: str,
input_tokens: int,
output_tokens: int,
duration_ms: int,
status: str = 'success',
cost: float | None = None,
error_message: str | None = None,
message_id: str | None = None,
):
"""Record LLM call"""
try:
session_id = f'{query.launcher_type}_{query.launcher_id}'
await ap.monitoring_service.record_llm_call(
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
session_id=session_id,
model_name=model_name,
input_tokens=input_tokens,
output_tokens=output_tokens,
duration=duration_ms,
status=status,
cost=cost,
error_message=error_message,
message_id=message_id,
)
except Exception as e:
ap.logger.error(f'Failed to record LLM call: {e}')
class LLMCallMonitor:
"""Context manager for monitoring LLM calls"""
def __init__(
self,
ap: app.Application,
query: pipeline_query.Query,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
model_name: str,
):
self.ap = ap
self.query = query
self.bot_id = bot_id
self.bot_name = bot_name
self.pipeline_id = pipeline_id
self.pipeline_name = pipeline_name
self.model_name = model_name
self.start_time = None
self.input_tokens = 0
self.output_tokens = 0
async def __aenter__(self):
self.start_time = time.time()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
duration_ms = int((time.time() - self.start_time) * 1000)
if exc_type is not None:
# Error occurred
await MonitoringHelper.record_llm_call(
ap=self.ap,
query=self.query,
bot_id=self.bot_id,
bot_name=self.bot_name,
pipeline_id=self.pipeline_id,
pipeline_name=self.pipeline_name,
model_name=self.model_name,
input_tokens=self.input_tokens,
output_tokens=self.output_tokens,
duration_ms=duration_ms,
status='error',
error_message=str(exc_val) if exc_val else None,
)
else:
# Success
await MonitoringHelper.record_llm_call(
ap=self.ap,
query=self.query,
bot_id=self.bot_id,
bot_name=self.bot_name,
pipeline_id=self.pipeline_id,
pipeline_name=self.pipeline_name,
model_name=self.model_name,
input_tokens=self.input_tokens,
output_tokens=self.output_tokens,
duration_ms=duration_ms,
status='success',
)
return False # Don't suppress exceptions