diff --git a/src/langbot/pkg/api/http/service/workflow.py b/src/langbot/pkg/api/http/service/workflow.py index 3d5170af..0d5ed9f0 100644 --- a/src/langbot/pkg/api/http/service/workflow.py +++ b/src/langbot/pkg/api/http/service/workflow.py @@ -395,6 +395,8 @@ class WorkflowService: ), raw_message=message_context_data.get('raw_message', {}), ) + # Set query from message_content for logging purposes + context.query = context.message_context.message_content # Note: Frontend panel logging has been removed. # A new solution will be implemented separately. @@ -404,6 +406,17 @@ class WorkflowService: context.variables['_bot_id'] = bot_id or '' context.variables['_session_id'] = session_id or '' context.variables['_user_id'] = user_id + + # Store launcher info for monitoring (used when query is a string) + if message_context_data: + context.variables['_launcher_type'] = 'websocket' + context.variables['_launcher_id'] = message_context_data.get('sender_id', '') + context.variables['_sender_name'] = message_context_data.get('sender_name', 'User') + + # Ensure query is always set for logging purposes + # For non-message triggers, use trigger_type as fallback + if not context.query: + context.query = f'[{trigger_type}]' max_execution_time = self.DEFAULT_MAX_EXECUTION_TIME workflow_settings = definition.get('settings', {}) if isinstance(definition, dict) else {} diff --git a/src/langbot/pkg/platform/sources/websocket_adapter.py b/src/langbot/pkg/platform/sources/websocket_adapter.py index 0c5d80a3..1145a49c 100644 --- a/src/langbot/pkg/platform/sources/websocket_adapter.py +++ b/src/langbot/pkg/platform/sources/websocket_adapter.py @@ -449,20 +449,19 @@ class WebSocketAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter) try: from ...api.http.service.workflow import WorkflowExecutionFailedError + # Log workflow execution start (matching pipeline logging) + session_id = f'{session_type}_{connection.connection_id}' + logger.info(f'Processing request from {session_id} (0): {message_content}') + execution_id = await self.ap.workflow_service.execute_workflow( pipeline_uuid, trigger_type='message', trigger_data=trigger_data, - session_id=f'{session_type}_{connection.connection_id}', + session_id=session_id, user_id=message_context['sender_id'], bot_id=self.ap.platform_mgr.websocket_proxy_bot.bot_entity.uuid, ) - await connection.send_queue.put( - { - 'type': 'broadcast', - 'message': f'Workflow execution started: {execution_id}', - } - ) + # Removed success broadcast - only show error on failure except WorkflowExecutionFailedError as e: await connection.send_queue.put({'type': 'error', 'message': e.message}) except Exception as e: diff --git a/src/langbot/pkg/workflow/monitoring_helper.py b/src/langbot/pkg/workflow/monitoring_helper.py index 034e3098..15ba21b2 100644 --- a/src/langbot/pkg/workflow/monitoring_helper.py +++ b/src/langbot/pkg/workflow/monitoring_helper.py @@ -3,11 +3,13 @@ Monitoring helper for recording events during workflow execution. This module provides convenient methods to record monitoring data without cluttering the main workflow code. -New logging scheme: -- Trigger log: adapter → workflow_name → local-workflow (with original message) -- LLM call log: adapter → workflow_name → local-workflow (with LLM info) -- LLM response log: adapter → workflow_name → local-workflow (with response message) -- Reply log: adapter → workflow_name → local-workflow (with reply content) +Logging scheme (aligned with pipeline monitoring): +- Trigger log: stores original user message content directly +- LLM call log: uses record_llm_call only (no additional message record) +- LLM response log: stores response message content directly +- Reply log: stores reply content directly + +Fields are extracted from WorkflowQuery object when available, with fallback to context_vars. """ from __future__ import annotations @@ -25,65 +27,102 @@ class WorkflowMonitoringHelper: """Helper class for workflow monitoring operations""" @staticmethod - def _get_adapter_name(query: WorkflowQuery) -> str: - """Get adapter name from query""" - if query.adapter and hasattr(query.adapter, 'name'): - return query.adapter.name - if query.adapter and hasattr(query.adapter, 'adapter_name'): - return query.adapter.adapter_name - return 'WebChat' + def _get_session_id(query, context_vars: dict | None = None) -> str: + """Build session_id from query or context_vars""" + # Try to get from query first + if not isinstance(query, str) and query.launcher_type: + launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type) + launcher_id = query.launcher_id or 'unknown' + return f'{launcher_type}_{launcher_id}' + + # Fallback to context_vars + if context_vars and context_vars.get('_launcher_type') and context_vars.get('_launcher_id'): + return f"{context_vars['_launcher_type']}_{context_vars['_launcher_id']}" + + return 'workflow_session' @staticmethod - def _get_session_id(query: WorkflowQuery) -> str: - """Build session_id from launcher info""" - launcher_type = query.launcher_type.value if query.launcher_type else 'unknown' - launcher_id = query.launcher_id or 'unknown' - return f'{launcher_type}_{launcher_id}' + def _get_platform(query, context_vars: dict | None = None) -> str: + """Get platform name from query or context_vars""" + if not isinstance(query, str) and query.launcher_type: + if hasattr(query.launcher_type, 'value'): + return query.launcher_type.value + return str(query.launcher_type) + return 'workflow' + + @staticmethod + def _get_sender_name(query, context_vars: dict | None = None) -> str | None: + """Get sender name from query or context_vars""" + # Try query first + if not isinstance(query, str): + if query.sender_name: + return query.sender_name + if query.message_event and hasattr(query.message_event, 'sender'): + sender = query.message_event.sender + if hasattr(sender, 'nickname'): + return sender.nickname + if hasattr(sender, 'member_name'): + return sender.member_name + + # Fallback to context_vars + if context_vars: + return context_vars.get('_sender_name') + + return None @staticmethod async def record_trigger_log( ap: app.Application, - query: WorkflowQuery, + query, workflow_id: str, workflow_name: str, + bot_name: str = 'Workflow', + context_vars: dict | None = None, ) -> str: - """Record trigger node log + """Record trigger node log (stores original user message content directly) - Format: adapter → workflow_name → local-workflow - Contains: original message content + Aligned with pipeline monitoring: record_query_start """ try: - adapter_name = WorkflowMonitoringHelper._get_adapter_name(query) - session_id = WorkflowMonitoringHelper._get_session_id(query) + session_id = WorkflowMonitoringHelper._get_session_id(query, context_vars) + platform = WorkflowMonitoringHelper._get_platform(query, context_vars) + sender_name = WorkflowMonitoringHelper._get_sender_name(query, context_vars) - # Get message content + # Get message content - store original content directly message_content = '' - if query.message_context and hasattr(query.message_context, 'message_content'): + if isinstance(query, str): + message_content = query + elif not isinstance(query, str) and query.message_context: message_content = query.message_context.message_content - elif query.message_chain and hasattr(query.message_chain, 'model_dump'): + elif not isinstance(query, str) and query.message_chain and hasattr(query.message_chain, 'model_dump'): message_content = json.dumps(query.message_chain.model_dump(), ensure_ascii=False) + elif not isinstance(query, str) and query.user_message: + message_content = str(query.user_message) - # Build pipeline_name: workflow_name/local-workflow - pipeline_name = f'{workflow_name}/local-workflow' if workflow_name else 'local-workflow' - - # Build log message: adapter → workflow_name → local-workflow - log_message = f'{adapter_name} → {workflow_name} → local-workflow' - if message_content: - log_message += f'\n{message_content}' + # Get bot_id and user_id + bot_id = '' + user_id = None + if not isinstance(query, str): + bot_id = query.bot_uuid or '' + user_id = query.sender_id + elif context_vars: + bot_id = context_vars.get('_bot_id', '') or '' + user_id = context_vars.get('_user_id') message_id = await ap.monitoring_service.record_message( - bot_id=query.bot_uuid or '', - bot_name=workflow_name or 'Workflow', + bot_id=bot_id, + bot_name=bot_name, pipeline_id=workflow_id, - pipeline_name=pipeline_name, - message_content=log_message, + pipeline_name=workflow_name or 'Workflow', + message_content=message_content, session_id=session_id, status='success', level='info', - platform='workflow', - user_id=query.sender_id, - user_name=query.sender_name, + platform=platform, + user_id=user_id, + user_name=sender_name, role='user', + runner_name='local-workflow', ) return message_id @@ -94,7 +133,7 @@ class WorkflowMonitoringHelper: @staticmethod async def record_llm_call_log( ap: app.Application, - query: WorkflowQuery, + query, workflow_id: str, workflow_name: str, node_name: str, @@ -104,37 +143,32 @@ class WorkflowMonitoringHelper: duration_ms: int, status: str = 'success', error_message: str | None = None, + bot_name: str = 'Workflow', + context_vars: dict | None = None, + input_message: str | None = None, + message_id: str | None = None, ): - """Record LLM call log (with LLM info) + """Record LLM call log with message_id association - Format: adapter → workflow_name → local-workflow - Contains: LLM call statistics + Aligned with pipeline monitoring: record_llm_call with message_id + LLM calls are aggregated under the trigger log via message_id. """ try: - adapter_name = WorkflowMonitoringHelper._get_adapter_name(query) - session_id = WorkflowMonitoringHelper._get_session_id(query) + session_id = WorkflowMonitoringHelper._get_session_id(query, context_vars) - # Build pipeline_name: workflow_name/local-workflow - pipeline_name = f'{workflow_name}/local-workflow' if workflow_name else 'local-workflow' - - # Build log message with LLM info - log_message = f'{adapter_name} → {workflow_name} → local-workflow\n' - log_message += f'LLM Call: {node_name}\n' - log_message += f'Model: {model_name}\n' - log_message += f'Status: {status}\n' - log_message += f'Duration: {duration_ms}ms\n' - log_message += f'Input Tokens: {input_tokens}\n' - log_message += f'Output Tokens: {output_tokens}\n' - log_message += f'Total Tokens: {input_tokens + output_tokens}' - - if error_message: - log_message += f'\nError: {error_message}' + # Get bot_id + bot_id = '' + if not isinstance(query, str): + bot_id = query.bot_uuid or '' + elif context_vars: + bot_id = context_vars.get('_bot_id', '') or '' + # Record LLM call with message_id for association await ap.monitoring_service.record_llm_call( - bot_id=query.bot_uuid or '', - bot_name=workflow_name or 'Workflow', + bot_id=bot_id, + bot_name=bot_name, pipeline_id=workflow_id, - pipeline_name=pipeline_name, + pipeline_name=workflow_name or 'Workflow', session_id=session_id, model_name=model_name, input_tokens=input_tokens, @@ -142,22 +176,7 @@ class WorkflowMonitoringHelper: duration=duration_ms, status=status, error_message=error_message, - ) - - # Also record as message for display - await ap.monitoring_service.record_message( - bot_id=query.bot_uuid or '', - bot_name=workflow_name or 'Workflow', - pipeline_id=workflow_id, - pipeline_name=pipeline_name, - message_content=log_message, - session_id=session_id, - status=status, - level='info', - platform='workflow', - user_id=query.sender_id, - user_name=query.sender_name, - role='system', + message_id=message_id, ) except Exception as e: ap.logger.error(f'Failed to record LLM call log: {e}') @@ -165,42 +184,48 @@ class WorkflowMonitoringHelper: @staticmethod async def record_llm_response_log( ap: app.Application, - query: WorkflowQuery, + query, workflow_id: str, workflow_name: str, node_name: str, response_content: str, + bot_name: str = 'Workflow', + context_vars: dict | None = None, ): - """Record LLM response log (without LLM info, with response message) + """Record LLM response log (stores response content directly) - Format: adapter → workflow_name → local-workflow - Contains: response message content + Aligned with pipeline monitoring: record_query_response """ try: - adapter_name = WorkflowMonitoringHelper._get_adapter_name(query) - session_id = WorkflowMonitoringHelper._get_session_id(query) + session_id = WorkflowMonitoringHelper._get_session_id(query, context_vars) + platform = WorkflowMonitoringHelper._get_platform(query, context_vars) + sender_name = WorkflowMonitoringHelper._get_sender_name(query, context_vars) - # Build pipeline_name: workflow_name/local-workflow - pipeline_name = f'{workflow_name}/local-workflow' if workflow_name else 'local-workflow' - - # Build log message - log_message = f'{adapter_name} → {workflow_name} → local-workflow\n' - log_message += f'Node: {node_name}\n' - log_message += f'Response: {response_content[:500]}' # Limit length + # Get bot_id and user_id + bot_id = '' + user_id = None + if not isinstance(query, str): + bot_id = query.bot_uuid or '' + user_id = query.sender_id + elif context_vars: + bot_id = context_vars.get('_bot_id', '') or '' + user_id = context_vars.get('_user_id') + # Store response content directly, no prefix await ap.monitoring_service.record_message( - bot_id=query.bot_uuid or '', - bot_name=workflow_name or 'Workflow', + bot_id=bot_id, + bot_name=bot_name, pipeline_id=workflow_id, - pipeline_name=pipeline_name, - message_content=log_message, + pipeline_name=workflow_name or 'Workflow', + message_content=response_content[:2000], # Limit length session_id=session_id, status='success', level='info', - platform='workflow', - user_id=query.sender_id, - user_name=query.sender_name, + platform=platform, + user_id=user_id, + user_name=sender_name, role='assistant', + runner_name='local-workflow', ) except Exception as e: ap.logger.error(f'Failed to record LLM response log: {e}') @@ -208,42 +233,48 @@ class WorkflowMonitoringHelper: @staticmethod async def record_reply_log( ap: app.Application, - query: WorkflowQuery, + query, workflow_id: str, workflow_name: str, node_name: str, reply_content: str, + bot_name: str = 'Workflow', + context_vars: dict | None = None, ): - """Record reply message log + """Record reply message log (stores reply content directly) - Format: adapter → workflow_name → local-workflow - Contains: reply message content + Aligned with pipeline monitoring: record_query_response """ try: - adapter_name = WorkflowMonitoringHelper._get_adapter_name(query) - session_id = WorkflowMonitoringHelper._get_session_id(query) + session_id = WorkflowMonitoringHelper._get_session_id(query, context_vars) + platform = WorkflowMonitoringHelper._get_platform(query, context_vars) + sender_name = WorkflowMonitoringHelper._get_sender_name(query, context_vars) - # Build pipeline_name: workflow_name/local-workflow - pipeline_name = f'{workflow_name}/local-workflow' if workflow_name else 'local-workflow' - - # Build log message - log_message = f'{adapter_name} → {workflow_name} → local-workflow\n' - log_message += f'Node: {node_name}\n' - log_message += f'Reply: {reply_content[:500]}' # Limit length + # Get bot_id and user_id + bot_id = '' + user_id = None + if not isinstance(query, str): + bot_id = query.bot_uuid or '' + user_id = query.sender_id + elif context_vars: + bot_id = context_vars.get('_bot_id', '') or '' + user_id = context_vars.get('_user_id') + # Store reply content directly, no prefix await ap.monitoring_service.record_message( - bot_id=query.bot_uuid or '', - bot_name=workflow_name or 'Workflow', + bot_id=bot_id, + bot_name=bot_name, pipeline_id=workflow_id, - pipeline_name=pipeline_name, - message_content=log_message, + pipeline_name=workflow_name or 'Workflow', + message_content=reply_content[:2000], # Limit length session_id=session_id, status='success', level='info', - platform='workflow', - user_id=query.sender_id, - user_name=query.sender_name, + platform=platform, + user_id=user_id, + user_name=sender_name, role='assistant', + runner_name='local-workflow', ) except Exception as e: ap.logger.error(f'Failed to record reply log: {e}') @@ -255,13 +286,14 @@ class LLMCallMonitor: def __init__( self, ap: app.Application, - query: WorkflowQuery, + query, bot_id: str, bot_name: str, workflow_id: str, workflow_name: str, node_name: str, model_name: str, + context_vars: dict | None = None, ): self.ap = ap self.query = query @@ -271,6 +303,7 @@ class LLMCallMonitor: self.workflow_name = workflow_name self.node_name = node_name self.model_name = model_name + self.context_vars = context_vars self.start_time = None self.input_tokens = 0 self.output_tokens = 0 @@ -295,6 +328,8 @@ class LLMCallMonitor: duration_ms=duration_ms, status='error', error_message=str(exc_val) if exc_val else None, + bot_name=self.bot_name, + context_vars=self.context_vars, ) else: await WorkflowMonitoringHelper.record_llm_call_log( @@ -308,6 +343,8 @@ class LLMCallMonitor: output_tokens=self.output_tokens, duration_ms=duration_ms, status='success', + bot_name=self.bot_name, + context_vars=self.context_vars, ) return False diff --git a/src/langbot/pkg/workflow/nodes/llm_call.py b/src/langbot/pkg/workflow/nodes/llm_call.py index 571abd2b..4f4a0d4a 100644 --- a/src/langbot/pkg/workflow/nodes/llm_call.py +++ b/src/langbot/pkg/workflow/nodes/llm_call.py @@ -615,11 +615,12 @@ Respond in the same language as the user's input. return s0 logger.info(f'[LLM:{self.node_id}] Response: {_cut_str(response_text)}') - # Record LLM call log and response log + # Record LLM call log only (response log is redundant) try: if self.ap and context.query: workflow_id = context.workflow_id or '' workflow_name = context.variables.get('_workflow_name', 'Workflow') + bot_name = context.variables.get('_bot_name', 'Workflow') node_name = self.get_config('name', self.node_id) model_name = used_model.model_entity.name if used_model else 'unknown' @@ -628,7 +629,10 @@ Respond in the same language as the user's input. if hasattr(self, '_llm_start_time'): duration_ms = int((time.time() - self._llm_start_time) * 1000) - # Record LLM call log (with LLM info) + # Get message_id for LLM call association + message_id = context.variables.get('_monitoring_message_id') + + # Record LLM call log with message_id association await monitoring_helper.WorkflowMonitoringHelper.record_llm_call_log( ap=self.ap, query=context.query, @@ -640,16 +644,9 @@ Respond in the same language as the user's input. output_tokens=usage.get('completion_tokens', 0), duration_ms=duration_ms, status='success', - ) - - # Record LLM response log (with response message) - await monitoring_helper.WorkflowMonitoringHelper.record_llm_response_log( - ap=self.ap, - query=context.query, - workflow_id=workflow_id, - workflow_name=workflow_name, - node_name=node_name, - response_content=response_text, + bot_name=bot_name, + context_vars=context.variables, + message_id=message_id, ) except Exception as e: logger.warning(f'[LLM:{self.node_id}] Failed to record LLM logs: {e}') diff --git a/src/langbot/pkg/workflow/nodes/message_trigger.py b/src/langbot/pkg/workflow/nodes/message_trigger.py index 7d2ce6ff..710a834a 100644 --- a/src/langbot/pkg/workflow/nodes/message_trigger.py +++ b/src/langbot/pkg/workflow/nodes/message_trigger.py @@ -26,17 +26,23 @@ class MessageTriggerNode(WorkflowNode): async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]: msg_ctx = context.message_context - # Record trigger log + # Record trigger log and store message_id for LLM call association try: if self.ap and context.query: workflow_id = context.workflow_id or '' workflow_name = context.variables.get('_workflow_name', 'Workflow') - await monitoring_helper.WorkflowMonitoringHelper.record_trigger_log( + bot_name = context.variables.get('_bot_name', 'Workflow') + message_id = await monitoring_helper.WorkflowMonitoringHelper.record_trigger_log( ap=self.ap, query=context.query, workflow_id=workflow_id, workflow_name=workflow_name, + bot_name=bot_name, + context_vars=context.variables, ) + # Store message_id for LLM call monitoring association + if message_id: + context.variables['_monitoring_message_id'] = message_id except Exception as e: logger.warning(f'[MessageTrigger:{self.node_id}] Failed to record trigger log: {e}') diff --git a/src/langbot/pkg/workflow/nodes/reply_message.py b/src/langbot/pkg/workflow/nodes/reply_message.py index af464bb3..e3f095dc 100644 --- a/src/langbot/pkg/workflow/nodes/reply_message.py +++ b/src/langbot/pkg/workflow/nodes/reply_message.py @@ -112,6 +112,7 @@ class ReplyMessageNode(WorkflowNode): if self.ap and context.query and send_success: workflow_id = context.workflow_id or '' workflow_name = context.variables.get('_workflow_name', 'Workflow') + bot_name = context.variables.get('_bot_name', 'Workflow') node_name = self.get_config('name', self.node_id) await monitoring_helper.WorkflowMonitoringHelper.record_reply_log( ap=self.ap, @@ -120,6 +121,8 @@ class ReplyMessageNode(WorkflowNode): workflow_name=workflow_name, node_name=node_name, reply_content=message_str, + bot_name=bot_name, + context_vars=context.variables, ) except Exception as e: logger.warning(f'[ReplyMessage:{self.node_id}] Failed to record reply log: {e}') diff --git a/web/src/app/home/workflows/components/workflow-executions/WorkflowExecutionsTab.tsx b/web/src/app/home/workflows/components/workflow-executions/WorkflowExecutionsTab.tsx index fb3e0f76..fe6e539f 100644 --- a/web/src/app/home/workflows/components/workflow-executions/WorkflowExecutionsTab.tsx +++ b/web/src/app/home/workflows/components/workflow-executions/WorkflowExecutionsTab.tsx @@ -247,7 +247,7 @@ export default function WorkflowExecutionsTab({ await backendClient.executeWorkflow(workflowId, { trigger_type: 'manual', }); - toast.success(t('workflows.manualTrigger') + ' ✓'); + // Removed success toast - only show error toast on failure loadExecutions(); loadStats(); } catch (err: unknown) { @@ -560,17 +560,13 @@ export default function WorkflowExecutionsTab({ onValueChange={setSelectedTab} className="flex-1 flex flex-col overflow-hidden" > - + {t('workflows.details')} {t('workflows.nodeExecutions')} - - - {t('workflows.logs')} - - - {logsLoading ? ( -
- -
- ) : executionLogs.length > 0 ? ( - -
- {executionLogs.map((log) => ( -
-
- - {log.timestamp - ? new Date(log.timestamp).toLocaleTimeString() - : '-'} - - - [{log.level}] - - {log.node_id && ( - - [{log.node_id}] - - )} -
-
- {log.message} -
- {log.data && Object.keys(log.data).length > 0 && ( -
-                              {JSON.stringify(log.data, null, 2)}
-                            
- )} -
- ))} -
-
- ) : ( -
- {t('workflows.noLogs')} -
- )} -
)}