This commit is contained in:
Typer_Body
2026-06-02 02:08:07 +08:00
parent 804448b6cd
commit 543fbd8ca0
7 changed files with 199 additions and 196 deletions
+158 -121
View File
@@ -3,11 +3,13 @@ Monitoring helper for recording events during workflow execution.
This module provides convenient methods to record monitoring data
without cluttering the main workflow code.
New logging scheme:
- Trigger log: adapter → workflow_name → local-workflow (with original message)
- LLM call log: adapter → workflow_name → local-workflow (with LLM info)
- LLM response log: adapter → workflow_name → local-workflow (with response message)
- Reply log: adapter → workflow_name → local-workflow (with reply content)
Logging scheme (aligned with pipeline monitoring):
- Trigger log: stores original user message content directly
- LLM call log: uses record_llm_call only (no additional message record)
- LLM response log: stores response message content directly
- Reply log: stores reply content directly
Fields are extracted from WorkflowQuery object when available, with fallback to context_vars.
"""
from __future__ import annotations
@@ -25,65 +27,102 @@ class WorkflowMonitoringHelper:
"""Helper class for workflow monitoring operations"""
@staticmethod
def _get_adapter_name(query: WorkflowQuery) -> str:
"""Get adapter name from query"""
if query.adapter and hasattr(query.adapter, 'name'):
return query.adapter.name
if query.adapter and hasattr(query.adapter, 'adapter_name'):
return query.adapter.adapter_name
return 'WebChat'
def _get_session_id(query, context_vars: dict | None = None) -> str:
"""Build session_id from query or context_vars"""
# Try to get from query first
if not isinstance(query, str) and query.launcher_type:
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
launcher_id = query.launcher_id or 'unknown'
return f'{launcher_type}_{launcher_id}'
# Fallback to context_vars
if context_vars and context_vars.get('_launcher_type') and context_vars.get('_launcher_id'):
return f"{context_vars['_launcher_type']}_{context_vars['_launcher_id']}"
return 'workflow_session'
@staticmethod
def _get_session_id(query: WorkflowQuery) -> str:
"""Build session_id from launcher info"""
launcher_type = query.launcher_type.value if query.launcher_type else 'unknown'
launcher_id = query.launcher_id or 'unknown'
return f'{launcher_type}_{launcher_id}'
def _get_platform(query, context_vars: dict | None = None) -> str:
"""Get platform name from query or context_vars"""
if not isinstance(query, str) and query.launcher_type:
if hasattr(query.launcher_type, 'value'):
return query.launcher_type.value
return str(query.launcher_type)
return 'workflow'
@staticmethod
def _get_sender_name(query, context_vars: dict | None = None) -> str | None:
"""Get sender name from query or context_vars"""
# Try query first
if not isinstance(query, str):
if query.sender_name:
return query.sender_name
if query.message_event and hasattr(query.message_event, 'sender'):
sender = query.message_event.sender
if hasattr(sender, 'nickname'):
return sender.nickname
if hasattr(sender, 'member_name'):
return sender.member_name
# Fallback to context_vars
if context_vars:
return context_vars.get('_sender_name')
return None
@staticmethod
async def record_trigger_log(
ap: app.Application,
query: WorkflowQuery,
query,
workflow_id: str,
workflow_name: str,
bot_name: str = 'Workflow',
context_vars: dict | None = None,
) -> str:
"""Record trigger node log
"""Record trigger node log (stores original user message content directly)
Format: adapter → workflow_name → local-workflow
Contains: original message content
Aligned with pipeline monitoring: record_query_start
"""
try:
adapter_name = WorkflowMonitoringHelper._get_adapter_name(query)
session_id = WorkflowMonitoringHelper._get_session_id(query)
session_id = WorkflowMonitoringHelper._get_session_id(query, context_vars)
platform = WorkflowMonitoringHelper._get_platform(query, context_vars)
sender_name = WorkflowMonitoringHelper._get_sender_name(query, context_vars)
# Get message content
# Get message content - store original content directly
message_content = ''
if query.message_context and hasattr(query.message_context, 'message_content'):
if isinstance(query, str):
message_content = query
elif not isinstance(query, str) and query.message_context:
message_content = query.message_context.message_content
elif query.message_chain and hasattr(query.message_chain, 'model_dump'):
elif not isinstance(query, str) and query.message_chain and hasattr(query.message_chain, 'model_dump'):
message_content = json.dumps(query.message_chain.model_dump(), ensure_ascii=False)
elif not isinstance(query, str) and query.user_message:
message_content = str(query.user_message)
# Build pipeline_name: workflow_name/local-workflow
pipeline_name = f'{workflow_name}/local-workflow' if workflow_name else 'local-workflow'
# Build log message: adapter → workflow_name → local-workflow
log_message = f'{adapter_name}{workflow_name} → local-workflow'
if message_content:
log_message += f'\n{message_content}'
# Get bot_id and user_id
bot_id = ''
user_id = None
if not isinstance(query, str):
bot_id = query.bot_uuid or ''
user_id = query.sender_id
elif context_vars:
bot_id = context_vars.get('_bot_id', '') or ''
user_id = context_vars.get('_user_id')
message_id = await ap.monitoring_service.record_message(
bot_id=query.bot_uuid or '',
bot_name=workflow_name or 'Workflow',
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=workflow_id,
pipeline_name=pipeline_name,
message_content=log_message,
pipeline_name=workflow_name or 'Workflow',
message_content=message_content,
session_id=session_id,
status='success',
level='info',
platform='workflow',
user_id=query.sender_id,
user_name=query.sender_name,
platform=platform,
user_id=user_id,
user_name=sender_name,
role='user',
runner_name='local-workflow',
)
return message_id
@@ -94,7 +133,7 @@ class WorkflowMonitoringHelper:
@staticmethod
async def record_llm_call_log(
ap: app.Application,
query: WorkflowQuery,
query,
workflow_id: str,
workflow_name: str,
node_name: str,
@@ -104,37 +143,32 @@ class WorkflowMonitoringHelper:
duration_ms: int,
status: str = 'success',
error_message: str | None = None,
bot_name: str = 'Workflow',
context_vars: dict | None = None,
input_message: str | None = None,
message_id: str | None = None,
):
"""Record LLM call log (with LLM info)
"""Record LLM call log with message_id association
Format: adapter → workflow_name → local-workflow
Contains: LLM call statistics
Aligned with pipeline monitoring: record_llm_call with message_id
LLM calls are aggregated under the trigger log via message_id.
"""
try:
adapter_name = WorkflowMonitoringHelper._get_adapter_name(query)
session_id = WorkflowMonitoringHelper._get_session_id(query)
session_id = WorkflowMonitoringHelper._get_session_id(query, context_vars)
# Build pipeline_name: workflow_name/local-workflow
pipeline_name = f'{workflow_name}/local-workflow' if workflow_name else 'local-workflow'
# Build log message with LLM info
log_message = f'{adapter_name}{workflow_name} → local-workflow\n'
log_message += f'LLM Call: {node_name}\n'
log_message += f'Model: {model_name}\n'
log_message += f'Status: {status}\n'
log_message += f'Duration: {duration_ms}ms\n'
log_message += f'Input Tokens: {input_tokens}\n'
log_message += f'Output Tokens: {output_tokens}\n'
log_message += f'Total Tokens: {input_tokens + output_tokens}'
if error_message:
log_message += f'\nError: {error_message}'
# Get bot_id
bot_id = ''
if not isinstance(query, str):
bot_id = query.bot_uuid or ''
elif context_vars:
bot_id = context_vars.get('_bot_id', '') or ''
# Record LLM call with message_id for association
await ap.monitoring_service.record_llm_call(
bot_id=query.bot_uuid or '',
bot_name=workflow_name or 'Workflow',
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=workflow_id,
pipeline_name=pipeline_name,
pipeline_name=workflow_name or 'Workflow',
session_id=session_id,
model_name=model_name,
input_tokens=input_tokens,
@@ -142,22 +176,7 @@ class WorkflowMonitoringHelper:
duration=duration_ms,
status=status,
error_message=error_message,
)
# Also record as message for display
await ap.monitoring_service.record_message(
bot_id=query.bot_uuid or '',
bot_name=workflow_name or 'Workflow',
pipeline_id=workflow_id,
pipeline_name=pipeline_name,
message_content=log_message,
session_id=session_id,
status=status,
level='info',
platform='workflow',
user_id=query.sender_id,
user_name=query.sender_name,
role='system',
message_id=message_id,
)
except Exception as e:
ap.logger.error(f'Failed to record LLM call log: {e}')
@@ -165,42 +184,48 @@ class WorkflowMonitoringHelper:
@staticmethod
async def record_llm_response_log(
ap: app.Application,
query: WorkflowQuery,
query,
workflow_id: str,
workflow_name: str,
node_name: str,
response_content: str,
bot_name: str = 'Workflow',
context_vars: dict | None = None,
):
"""Record LLM response log (without LLM info, with response message)
"""Record LLM response log (stores response content directly)
Format: adapter → workflow_name → local-workflow
Contains: response message content
Aligned with pipeline monitoring: record_query_response
"""
try:
adapter_name = WorkflowMonitoringHelper._get_adapter_name(query)
session_id = WorkflowMonitoringHelper._get_session_id(query)
session_id = WorkflowMonitoringHelper._get_session_id(query, context_vars)
platform = WorkflowMonitoringHelper._get_platform(query, context_vars)
sender_name = WorkflowMonitoringHelper._get_sender_name(query, context_vars)
# Build pipeline_name: workflow_name/local-workflow
pipeline_name = f'{workflow_name}/local-workflow' if workflow_name else 'local-workflow'
# Build log message
log_message = f'{adapter_name}{workflow_name} → local-workflow\n'
log_message += f'Node: {node_name}\n'
log_message += f'Response: {response_content[:500]}' # Limit length
# Get bot_id and user_id
bot_id = ''
user_id = None
if not isinstance(query, str):
bot_id = query.bot_uuid or ''
user_id = query.sender_id
elif context_vars:
bot_id = context_vars.get('_bot_id', '') or ''
user_id = context_vars.get('_user_id')
# Store response content directly, no prefix
await ap.monitoring_service.record_message(
bot_id=query.bot_uuid or '',
bot_name=workflow_name or 'Workflow',
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=workflow_id,
pipeline_name=pipeline_name,
message_content=log_message,
pipeline_name=workflow_name or 'Workflow',
message_content=response_content[:2000], # Limit length
session_id=session_id,
status='success',
level='info',
platform='workflow',
user_id=query.sender_id,
user_name=query.sender_name,
platform=platform,
user_id=user_id,
user_name=sender_name,
role='assistant',
runner_name='local-workflow',
)
except Exception as e:
ap.logger.error(f'Failed to record LLM response log: {e}')
@@ -208,42 +233,48 @@ class WorkflowMonitoringHelper:
@staticmethod
async def record_reply_log(
ap: app.Application,
query: WorkflowQuery,
query,
workflow_id: str,
workflow_name: str,
node_name: str,
reply_content: str,
bot_name: str = 'Workflow',
context_vars: dict | None = None,
):
"""Record reply message log
"""Record reply message log (stores reply content directly)
Format: adapter → workflow_name → local-workflow
Contains: reply message content
Aligned with pipeline monitoring: record_query_response
"""
try:
adapter_name = WorkflowMonitoringHelper._get_adapter_name(query)
session_id = WorkflowMonitoringHelper._get_session_id(query)
session_id = WorkflowMonitoringHelper._get_session_id(query, context_vars)
platform = WorkflowMonitoringHelper._get_platform(query, context_vars)
sender_name = WorkflowMonitoringHelper._get_sender_name(query, context_vars)
# Build pipeline_name: workflow_name/local-workflow
pipeline_name = f'{workflow_name}/local-workflow' if workflow_name else 'local-workflow'
# Build log message
log_message = f'{adapter_name}{workflow_name} → local-workflow\n'
log_message += f'Node: {node_name}\n'
log_message += f'Reply: {reply_content[:500]}' # Limit length
# Get bot_id and user_id
bot_id = ''
user_id = None
if not isinstance(query, str):
bot_id = query.bot_uuid or ''
user_id = query.sender_id
elif context_vars:
bot_id = context_vars.get('_bot_id', '') or ''
user_id = context_vars.get('_user_id')
# Store reply content directly, no prefix
await ap.monitoring_service.record_message(
bot_id=query.bot_uuid or '',
bot_name=workflow_name or 'Workflow',
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=workflow_id,
pipeline_name=pipeline_name,
message_content=log_message,
pipeline_name=workflow_name or 'Workflow',
message_content=reply_content[:2000], # Limit length
session_id=session_id,
status='success',
level='info',
platform='workflow',
user_id=query.sender_id,
user_name=query.sender_name,
platform=platform,
user_id=user_id,
user_name=sender_name,
role='assistant',
runner_name='local-workflow',
)
except Exception as e:
ap.logger.error(f'Failed to record reply log: {e}')
@@ -255,13 +286,14 @@ class LLMCallMonitor:
def __init__(
self,
ap: app.Application,
query: WorkflowQuery,
query,
bot_id: str,
bot_name: str,
workflow_id: str,
workflow_name: str,
node_name: str,
model_name: str,
context_vars: dict | None = None,
):
self.ap = ap
self.query = query
@@ -271,6 +303,7 @@ class LLMCallMonitor:
self.workflow_name = workflow_name
self.node_name = node_name
self.model_name = model_name
self.context_vars = context_vars
self.start_time = None
self.input_tokens = 0
self.output_tokens = 0
@@ -295,6 +328,8 @@ class LLMCallMonitor:
duration_ms=duration_ms,
status='error',
error_message=str(exc_val) if exc_val else None,
bot_name=self.bot_name,
context_vars=self.context_vars,
)
else:
await WorkflowMonitoringHelper.record_llm_call_log(
@@ -308,6 +343,8 @@ class LLMCallMonitor:
output_tokens=self.output_tokens,
duration_ms=duration_ms,
status='success',
bot_name=self.bot_name,
context_vars=self.context_vars,
)
return False