diff --git a/pyproject.toml b/pyproject.toml index 8c5fe651..e59e9e0d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,7 +70,7 @@ dependencies = [ "chromadb>=1.0.0,<2.0.0", "qdrant-client (>=1.15.1,<2.0.0)", "pyseekdb==1.1.0.post3", - "langbot-plugin==0.3.11", + "langbot-plugin @ file:///home/typer/Desktop/langbot-plugin-sdk", "asyncpg>=0.30.0", "line-bot-sdk>=3.19.0", "matrix-nio>=0.25.2", diff --git a/src/langbot/pkg/api/http/service/workflow.py b/src/langbot/pkg/api/http/service/workflow.py index 77297fa6..3d5170af 100644 --- a/src/langbot/pkg/api/http/service/workflow.py +++ b/src/langbot/pkg/api/http/service/workflow.py @@ -3,10 +3,13 @@ from __future__ import annotations import asyncio +import logging import uuid from datetime import datetime, timedelta from typing import Optional +logger = logging.getLogger(__name__) + import sqlalchemy from ....core import app @@ -14,13 +17,12 @@ from ....entity.persistence import workflow as persistence_workflow from ....workflow.entities import ( WorkflowDefinition, ExecutionContext, - ExecutionStatus, NodeDefinition, EdgeDefinition, Position, MessageContext, - NodeStatus, ) +from langbot_plugin.api.entities.builtin.workflow.enums import ExecutionStatus, NodeStatus from ....workflow.executor import WorkflowExecutor from ....workflow.registry import NodeTypeRegistry @@ -54,8 +56,9 @@ class WorkflowService: self.executor = WorkflowExecutor(ap) self.registry = NodeTypeRegistry.instance() - # Import workflow nodes to trigger registration - from ....workflow import nodes # noqa: F401 + # Auto-discover and register workflow nodes using discovery engine + if hasattr(ap, 'discover') and ap.discover is not None: + self.registry.discover_nodes(ap.discover) async def get_workflows( self, sort_by: str = 'created_at', sort_order: str = 'DESC', enabled_only: bool = False @@ -328,6 +331,17 @@ class WorkflowService: raw_trigger_data = trigger_data or {} + # Get bot name and workflow name for monitoring + bot_name = 'WebChat' + workflow_name = workflow_dict.get('name', 'Unknown') + if bot_id: + try: + bot = await self.ap.bot_service.get_bot(bot_id, include_secret=False) + if bot: + bot_name = bot.get('name', 'WebChat') + except Exception: + pass + # Create execution context context = ExecutionContext( execution_id=execution_uuid, @@ -382,6 +396,15 @@ class WorkflowService: raw_message=message_context_data.get('raw_message', {}), ) + # Note: Frontend panel logging has been removed. + # A new solution will be implemented separately. + # Store workflow info in context for child nodes to reference + context.variables['_workflow_name'] = workflow_name + context.variables['_bot_name'] = bot_name + context.variables['_bot_id'] = bot_id or '' + context.variables['_session_id'] = session_id or '' + context.variables['_user_id'] = user_id + max_execution_time = self.DEFAULT_MAX_EXECUTION_TIME workflow_settings = definition.get('settings', {}) if isinstance(definition, dict) else {} if isinstance(workflow_settings, dict): @@ -459,6 +482,8 @@ class WorkflowService: error=str(e), ) ) + + raise WorkflowExecutionFailedError( execution_uuid, str(e), @@ -528,8 +553,6 @@ class WorkflowService: async def get_node_types(self) -> list[dict]: """Get all available node types""" - # Process pending registrations - self.registry.process_pending_registrations() node_types = self.registry.list_all() # Enrich node schemas with pipeline config metadata @@ -537,7 +560,6 @@ class WorkflowService: async def get_node_types_by_category(self) -> dict[str, list[dict]]: """Get node types organized by category""" - self.registry.process_pending_registrations() categories = self.registry.get_categories() # Enrich node schemas with pipeline config metadata @@ -548,7 +570,6 @@ class WorkflowService: async def get_node_types_by_category_meta(self) -> list[dict]: """Get workflow node category metadata for the editor UI.""" - self.registry.process_pending_registrations() categories = self.registry.get_categories() ordered_categories = ['trigger', 'process', 'control', 'action', 'integration', 'misc'] @@ -1094,43 +1115,42 @@ class WorkflowService: async def get_execution_logs( self, workflow_uuid: str, execution_uuid: str, limit: int = 100, offset: int = 0 ) -> dict: - """Get execution logs for a workflow execution""" + """Get execution logs for a workflow execution from monitoring_messages table""" execution = await self.get_execution(execution_uuid) if execution is None: raise ValueError(f'Execution {execution_uuid} not found') if execution.get('workflow_uuid') != workflow_uuid: raise ValueError(f'Execution {execution_uuid} not found in workflow {workflow_uuid}') + # Get logs from monitoring_messages table + from ....entity.persistence import monitoring as persistence_monitoring + query = ( - sqlalchemy.select(persistence_workflow.WorkflowNodeExecution) - .where(persistence_workflow.WorkflowNodeExecution.execution_uuid == execution_uuid) - .order_by(persistence_workflow.WorkflowNodeExecution.id.asc()) + sqlalchemy.select(persistence_monitoring.MonitoringMessage) + .where(persistence_monitoring.MonitoringMessage.pipeline_id == workflow_uuid) + .order_by(persistence_monitoring.MonitoringMessage.timestamp.asc()) .limit(limit) .offset(offset) ) result = await self.ap.persistence_mgr.execute_async(query) - node_executions = result.all() + messages = result.all() logs = [] - for node_exec in node_executions: - serialized = self._serialize_node_execution(node_exec) - timestamp = serialized.get('completed_at') or serialized.get('started_at') or execution.get('started_at') - level = 'error' if serialized.get('status') == 'failed' else 'info' - message = f'{serialized.get("node_type")}::{serialized.get("node_id")} - {serialized.get("status")}' - if serialized.get('error'): - message = f'{message} - {serialized.get("error")}' + for msg in messages: + serialized = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringMessage, msg) logs.append( { - 'id': str(serialized.get('id', serialized.get('node_id'))), - 'timestamp': timestamp, - 'level': level, - 'node_id': serialized.get('node_id'), - 'message': message, + 'id': serialized.get('id', ''), + 'timestamp': serialized.get('timestamp'), + 'level': serialized.get('level', 'info'), + 'node_id': serialized.get('runner_name', ''), + 'message': serialized.get('message_content', ''), 'data': { - 'inputs': serialized.get('inputs'), - 'outputs': serialized.get('outputs'), - 'retry_count': serialized.get('retry_count'), + 'role': serialized.get('role'), + 'platform': serialized.get('platform'), + 'user_id': serialized.get('user_id'), + 'user_name': serialized.get('user_name'), }, } ) diff --git a/src/langbot/pkg/core/stages/load_config.py b/src/langbot/pkg/core/stages/load_config.py index 2250f294..8081765e 100644 --- a/src/langbot/pkg/core/stages/load_config.py +++ b/src/langbot/pkg/core/stages/load_config.py @@ -237,11 +237,9 @@ class LoadConfigStage(stage.BootingStage): for node_config in ap.workflow_node_configs.values(): workflow_registry.register_metadata(node_config, source=node_config.get('_source', 'core')) - # Import node modules after metadata registration so decorators can bind - # implementations to YAML-defined canonical node types. - from langbot.pkg.workflow import nodes as workflow_nodes # noqa: F401 - - workflow_registry.process_pending_registrations() + # Auto-discover and register workflow nodes using discovery engine + if hasattr(ap, 'discover') and ap.discover is not None: + workflow_registry.discover_nodes(ap.discover) workflow_load_errors = workflow_metadata_loader.get_load_errors() if workflow_load_errors: diff --git a/src/langbot/pkg/discover/engine.py b/src/langbot/pkg/discover/engine.py index 713420d1..75219672 100644 --- a/src/langbot/pkg/discover/engine.py +++ b/src/langbot/pkg/discover/engine.py @@ -304,3 +304,65 @@ class ComponentDiscoveryEngine: if component.kind == kind: result.append(component) return result + + def discover_workflow_nodes(self, nodes_dir: str) -> typing.List[typing.Type]: + """Discover workflow node classes from a directory of Python modules. + + Scans all .py files in the given directory, imports them, and collects + classes that are subclasses of WorkflowNode. + + Args: + nodes_dir: Directory path like 'pkg/workflow/nodes/' + + Returns: + List of WorkflowNode subclasses found + """ + from langbot.pkg.workflow.node import WorkflowNode + + node_classes: typing.List[typing.Type[WorkflowNode]] = [] + + # Normalize path + if nodes_dir.endswith('/'): + nodes_dir = nodes_dir[:-1] + + # Import the nodes package to trigger all module imports + module_path = nodes_dir.replace('/', '.').replace('\\', '.') + package_path = module_path + + try: + # Import the package __init__ to trigger submodule imports + importlib.import_module(f'langbot.{package_path}') + except ImportError: + self.ap.logger.warning(f'Failed to import workflow nodes package: langbot.{package_path}') + + # Since workflow/__init__.py is empty, explicitly import all .py files in the nodes directory + import os + # engine.py is in langbot/pkg/discover/, nodes are in langbot/pkg/workflow/nodes/ + nodes_abs_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'workflow', 'nodes')) + if os.path.isdir(nodes_abs_path): + for filename in os.listdir(nodes_abs_path): + if filename.endswith('.py') and not filename.startswith('_'): + module_name = filename[:-3] + try: + importlib.import_module(f'langbot.{package_path}.{module_name}') + except ImportError as e: + self.ap.logger.warning(f'Failed to import workflow node module: {module_name}: {e}') + + # Now collect all WorkflowNode subclasses from sys.modules + import sys + prefix = f'langbot.{package_path}.' + for mod_name, mod in sys.modules.items(): + if mod_name.startswith(prefix) and mod is not None: + for attr_name in dir(mod): + attr = getattr(mod, attr_name) + if ( + isinstance(attr, type) + and issubclass(attr, WorkflowNode) + and attr is not WorkflowNode + and hasattr(attr, 'type_name') + and attr.type_name + ): + if attr not in node_classes: + node_classes.append(attr) + + return node_classes diff --git a/src/langbot/pkg/pipeline/config_coercion.py b/src/langbot/pkg/pipeline/config.py similarity index 100% rename from src/langbot/pkg/pipeline/config_coercion.py rename to src/langbot/pkg/pipeline/config.py diff --git a/src/langbot/pkg/pipeline/monitoring_helper.py b/src/langbot/pkg/pipeline/monitor.py similarity index 100% rename from src/langbot/pkg/pipeline/monitoring_helper.py rename to src/langbot/pkg/pipeline/monitor.py diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py index 394f35e2..6e548712 100644 --- a/src/langbot/pkg/pipeline/pipelinemgr.py +++ b/src/langbot/pkg/pipeline/pipelinemgr.py @@ -13,7 +13,7 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.platform.events as platform_events import langbot_plugin.api.entities.events as events from ..utils import importutil -from .config_coercion import coerce_pipeline_config +from .config import coerce_pipeline_config import langbot_plugin.api.entities.builtin.provider.session as provider_session import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query @@ -284,9 +284,9 @@ class RuntimePipeline: # Record query start and store message_id message_id = '' try: - from . import monitoring_helper + from . import monitor - message_id = await monitoring_helper.MonitoringHelper.record_query_start( + message_id = await monitor.MonitoringHelper.record_query_start( ap=self.ap, query=query, bot_id=query.bot_uuid or 'unknown', @@ -338,7 +338,7 @@ class RuntimePipeline: # Record query success only if no error occurred during processing if not query.variables.get('_monitoring_has_error', False): try: - await monitoring_helper.MonitoringHelper.record_query_success( + await monitor.MonitoringHelper.record_query_success( ap=self.ap, message_id=message_id, query=query, @@ -348,7 +348,7 @@ class RuntimePipeline: # Record bot response message try: - await monitoring_helper.MonitoringHelper.record_query_response( + await monitor.MonitoringHelper.record_query_response( ap=self.ap, query=query, bot_id=query.bot_uuid or 'unknown', @@ -367,9 +367,9 @@ class RuntimePipeline: # Record query error try: - from . import monitoring_helper + from . import monitor - await monitoring_helper.MonitoringHelper.record_query_error( + await monitor.MonitoringHelper.record_query_error( ap=self.ap, query=query, bot_id=query.bot_uuid or 'unknown', diff --git a/src/langbot/pkg/provider/modelmgr/requester.py b/src/langbot/pkg/provider/modelmgr/requester.py index cb9a4183..7b29fe52 100644 --- a/src/langbot/pkg/provider/modelmgr/requester.py +++ b/src/langbot/pkg/provider/modelmgr/requester.py @@ -84,7 +84,7 @@ class RuntimeProvider: # Import monitoring helper try: - from ...pipeline import monitoring_helper + from ...pipeline import monitor # Get monitoring metadata from query variables if query.variables: @@ -96,7 +96,7 @@ class RuntimeProvider: pipeline_name = 'Unknown' message_id = None - await monitoring_helper.MonitoringHelper.record_llm_call( + await monitor.MonitoringHelper.record_llm_call( ap=self.requester.ap, query=query, bot_id=query.bot_uuid or 'unknown', @@ -154,7 +154,7 @@ class RuntimeProvider: # Import monitoring helper try: - from ...pipeline import monitoring_helper + from ...pipeline import monitor # Get monitoring metadata from query variables if query.variables: @@ -166,7 +166,7 @@ class RuntimeProvider: pipeline_name = 'Unknown' message_id = None - await monitoring_helper.MonitoringHelper.record_llm_call( + await monitor.MonitoringHelper.record_llm_call( ap=self.requester.ap, query=query, bot_id=query.bot_uuid or 'unknown', diff --git a/src/langbot/pkg/workflow/__init__.py b/src/langbot/pkg/workflow/__init__.py index 02e644d4..e69de29b 100644 --- a/src/langbot/pkg/workflow/__init__.py +++ b/src/langbot/pkg/workflow/__init__.py @@ -1,81 +0,0 @@ -"""Workflow package for LangBot - -This package provides a visual workflow system for LangBot, including: -- Workflow definition models -- Execution engine -- Node types (trigger, process, control, action, integration) -- Trigger system for automation -""" - -from .entities import ( - WorkflowDefinition, - NodeDefinition, - EdgeDefinition, - Position, - PortDefinition, - TriggerDefinition, - WorkflowSettings, - ExecutionContext, - NodeState, - ExecutionStatus, - NodeStatus, -) - -from importlib import import_module -from typing import Any - -from .node import WorkflowNode, workflow_node - - -def __getattr__(name: str) -> Any: - """Lazily expose heavier workflow modules. - - Loading workflow metadata should not import the executor or node modules as a - side effect. Node implementations are imported explicitly during boot after - YAML metadata has been registered. - """ - if name == 'NodeTypeRegistry': - from .registry import NodeTypeRegistry - - return NodeTypeRegistry - - if name == 'WorkflowExecutor': - from .executor import WorkflowExecutor - - return WorkflowExecutor - - if name in ('DebugWorkflowExecutor', 'DebugExecutionState', 'ExecutionLog'): - from . import debug - - return getattr(debug, name) - - if name == 'nodes': - return import_module('.nodes', __name__) - - raise AttributeError(f'module {__name__!r} has no attribute {name!r}') - -__all__ = [ - # Entities - 'WorkflowDefinition', - 'NodeDefinition', - 'EdgeDefinition', - 'Position', - 'PortDefinition', - 'TriggerDefinition', - 'WorkflowSettings', - 'ExecutionContext', - 'NodeState', - 'ExecutionStatus', - 'NodeStatus', - # Node - 'WorkflowNode', - 'workflow_node', - # Registry - 'NodeTypeRegistry', - # Executor - 'WorkflowExecutor', - # Debug - 'DebugWorkflowExecutor', - 'DebugExecutionState', - 'ExecutionLog', -] diff --git a/src/langbot/pkg/workflow/entities.py b/src/langbot/pkg/workflow/entities.py index 93d6a8f4..0b050f57 100644 --- a/src/langbot/pkg/workflow/entities.py +++ b/src/langbot/pkg/workflow/entities.py @@ -11,16 +11,18 @@ from typing import Any, Optional import pydantic # Import SDK entities for standard workflow protocol types -from langbot_plugin.api.entities.builtin.workflow import ( +from langbot_plugin.api.entities.builtin.workflow.entities import ( ExecutionContext, ExecutionStep, - ExecutionStatus, MessageContext, NodeDefinition, NodeState, - NodeStatus, PortDefinition, ) +from langbot_plugin.api.entities.builtin.workflow.enums import ( + ExecutionStatus, + NodeStatus, +) class Position(pydantic.BaseModel): diff --git a/src/langbot/pkg/workflow/executor.py b/src/langbot/pkg/workflow/executor.py index a5dee55f..3bab8bdd 100644 --- a/src/langbot/pkg/workflow/executor.py +++ b/src/langbot/pkg/workflow/executor.py @@ -32,6 +32,7 @@ from .entities import ( ) from ..entity.persistence import workflow as persistence_workflow from .registry import NodeTypeRegistry +from . import monitor if TYPE_CHECKING: from ..core import app @@ -169,6 +170,10 @@ class WorkflowExecutor: context.status = ExecutionStatus.RUNNING context.start_time = datetime.now() + # Note: Frontend panel logging has been removed. + # A new solution will be implemented separately. + monitoring_message_id = '' + try: # Build execution graph node_map = {node.id: node for node in workflow.nodes} @@ -227,9 +232,15 @@ class WorkflowExecutor: }, ) + # Note: Frontend panel logging has been removed. + # A new solution will be implemented separately. + finally: context.end_time = datetime.now() + # Note: Frontend panel logging has been removed. + # A new solution will be implemented separately. + return context async def _execute_from_node( diff --git a/src/langbot/pkg/workflow/monitor.py b/src/langbot/pkg/workflow/monitor.py new file mode 100644 index 00000000..d4be8df3 --- /dev/null +++ b/src/langbot/pkg/workflow/monitor.py @@ -0,0 +1,61 @@ +""" +Monitoring helper for recording events during workflow execution. +This module provides convenient methods to record monitoring data +without cluttering the main workflow code. + +NOTE: All frontend panel logging functionality has been removed. +A new solution will be implemented separately. +""" + +from __future__ import annotations + +import typing +import time + +if typing.TYPE_CHECKING: + from ..core import app + from langbot_plugin.api.entities.builtin.workflow.query import WorkflowQuery + + +class WorkflowMonitoringHelper: + """Helper class for workflow monitoring operations""" + + # All frontend panel logging methods have been removed. + # A new solution will be implemented separately. + pass + + +class LLMCallMonitor: + """Context manager for monitoring LLM calls in workflow""" + + def __init__( + self, + ap: app.Application, + query: WorkflowQuery, + bot_id: str, + bot_name: str, + workflow_id: str, + workflow_name: str, + node_name: str, + model_name: str, + ): + self.ap = ap + self.query = query + self.bot_id = bot_id + self.bot_name = bot_name + self.workflow_id = workflow_id + self.workflow_name = workflow_name + self.node_name = node_name + self.model_name = model_name + self.start_time = None + self.input_tokens = 0 + self.output_tokens = 0 + + async def __aenter__(self): + self.start_time = time.time() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + # LLM call monitoring has been removed. + # A new solution will be implemented separately. + return False diff --git a/src/langbot/pkg/workflow/monitoring_helper.py b/src/langbot/pkg/workflow/monitoring_helper.py new file mode 100644 index 00000000..034e3098 --- /dev/null +++ b/src/langbot/pkg/workflow/monitoring_helper.py @@ -0,0 +1,313 @@ +""" +Monitoring helper for recording events during workflow execution. +This module provides convenient methods to record monitoring data +without cluttering the main workflow code. + +New logging scheme: +- Trigger log: adapter → workflow_name → local-workflow (with original message) +- LLM call log: adapter → workflow_name → local-workflow (with LLM info) +- LLM response log: adapter → workflow_name → local-workflow (with response message) +- Reply log: adapter → workflow_name → local-workflow (with reply content) +""" + +from __future__ import annotations + +import typing +import time +import json + +if typing.TYPE_CHECKING: + from ..core import app + from langbot_plugin.api.entities.builtin.workflow.query import WorkflowQuery + + +class WorkflowMonitoringHelper: + """Helper class for workflow monitoring operations""" + + @staticmethod + def _get_adapter_name(query: WorkflowQuery) -> str: + """Get adapter name from query""" + if query.adapter and hasattr(query.adapter, 'name'): + return query.adapter.name + if query.adapter and hasattr(query.adapter, 'adapter_name'): + return query.adapter.adapter_name + return 'WebChat' + + @staticmethod + def _get_session_id(query: WorkflowQuery) -> str: + """Build session_id from launcher info""" + launcher_type = query.launcher_type.value if query.launcher_type else 'unknown' + launcher_id = query.launcher_id or 'unknown' + return f'{launcher_type}_{launcher_id}' + + @staticmethod + async def record_trigger_log( + ap: app.Application, + query: WorkflowQuery, + workflow_id: str, + workflow_name: str, + ) -> str: + """Record trigger node log + + Format: adapter → workflow_name → local-workflow + Contains: original message content + """ + try: + adapter_name = WorkflowMonitoringHelper._get_adapter_name(query) + session_id = WorkflowMonitoringHelper._get_session_id(query) + + # Get message content + message_content = '' + if query.message_context and hasattr(query.message_context, 'message_content'): + message_content = query.message_context.message_content + elif query.message_chain and hasattr(query.message_chain, 'model_dump'): + message_content = json.dumps(query.message_chain.model_dump(), ensure_ascii=False) + + # Build pipeline_name: workflow_name/local-workflow + pipeline_name = f'{workflow_name}/local-workflow' if workflow_name else 'local-workflow' + + # Build log message: adapter → workflow_name → local-workflow + log_message = f'{adapter_name} → {workflow_name} → local-workflow' + if message_content: + log_message += f'\n{message_content}' + + message_id = await ap.monitoring_service.record_message( + bot_id=query.bot_uuid or '', + bot_name=workflow_name or 'Workflow', + pipeline_id=workflow_id, + pipeline_name=pipeline_name, + message_content=log_message, + session_id=session_id, + status='success', + level='info', + platform='workflow', + user_id=query.sender_id, + user_name=query.sender_name, + role='user', + ) + + return message_id + except Exception as e: + ap.logger.error(f'Failed to record trigger log: {e}') + return '' + + @staticmethod + async def record_llm_call_log( + ap: app.Application, + query: WorkflowQuery, + workflow_id: str, + workflow_name: str, + node_name: str, + model_name: str, + input_tokens: int, + output_tokens: int, + duration_ms: int, + status: str = 'success', + error_message: str | None = None, + ): + """Record LLM call log (with LLM info) + + Format: adapter → workflow_name → local-workflow + Contains: LLM call statistics + """ + try: + adapter_name = WorkflowMonitoringHelper._get_adapter_name(query) + session_id = WorkflowMonitoringHelper._get_session_id(query) + + # Build pipeline_name: workflow_name/local-workflow + pipeline_name = f'{workflow_name}/local-workflow' if workflow_name else 'local-workflow' + + # Build log message with LLM info + log_message = f'{adapter_name} → {workflow_name} → local-workflow\n' + log_message += f'LLM Call: {node_name}\n' + log_message += f'Model: {model_name}\n' + log_message += f'Status: {status}\n' + log_message += f'Duration: {duration_ms}ms\n' + log_message += f'Input Tokens: {input_tokens}\n' + log_message += f'Output Tokens: {output_tokens}\n' + log_message += f'Total Tokens: {input_tokens + output_tokens}' + + if error_message: + log_message += f'\nError: {error_message}' + + await ap.monitoring_service.record_llm_call( + bot_id=query.bot_uuid or '', + bot_name=workflow_name or 'Workflow', + pipeline_id=workflow_id, + pipeline_name=pipeline_name, + session_id=session_id, + model_name=model_name, + input_tokens=input_tokens, + output_tokens=output_tokens, + duration=duration_ms, + status=status, + error_message=error_message, + ) + + # Also record as message for display + await ap.monitoring_service.record_message( + bot_id=query.bot_uuid or '', + bot_name=workflow_name or 'Workflow', + pipeline_id=workflow_id, + pipeline_name=pipeline_name, + message_content=log_message, + session_id=session_id, + status=status, + level='info', + platform='workflow', + user_id=query.sender_id, + user_name=query.sender_name, + role='system', + ) + except Exception as e: + ap.logger.error(f'Failed to record LLM call log: {e}') + + @staticmethod + async def record_llm_response_log( + ap: app.Application, + query: WorkflowQuery, + workflow_id: str, + workflow_name: str, + node_name: str, + response_content: str, + ): + """Record LLM response log (without LLM info, with response message) + + Format: adapter → workflow_name → local-workflow + Contains: response message content + """ + try: + adapter_name = WorkflowMonitoringHelper._get_adapter_name(query) + session_id = WorkflowMonitoringHelper._get_session_id(query) + + # Build pipeline_name: workflow_name/local-workflow + pipeline_name = f'{workflow_name}/local-workflow' if workflow_name else 'local-workflow' + + # Build log message + log_message = f'{adapter_name} → {workflow_name} → local-workflow\n' + log_message += f'Node: {node_name}\n' + log_message += f'Response: {response_content[:500]}' # Limit length + + await ap.monitoring_service.record_message( + bot_id=query.bot_uuid or '', + bot_name=workflow_name or 'Workflow', + pipeline_id=workflow_id, + pipeline_name=pipeline_name, + message_content=log_message, + session_id=session_id, + status='success', + level='info', + platform='workflow', + user_id=query.sender_id, + user_name=query.sender_name, + role='assistant', + ) + except Exception as e: + ap.logger.error(f'Failed to record LLM response log: {e}') + + @staticmethod + async def record_reply_log( + ap: app.Application, + query: WorkflowQuery, + workflow_id: str, + workflow_name: str, + node_name: str, + reply_content: str, + ): + """Record reply message log + + Format: adapter → workflow_name → local-workflow + Contains: reply message content + """ + try: + adapter_name = WorkflowMonitoringHelper._get_adapter_name(query) + session_id = WorkflowMonitoringHelper._get_session_id(query) + + # Build pipeline_name: workflow_name/local-workflow + pipeline_name = f'{workflow_name}/local-workflow' if workflow_name else 'local-workflow' + + # Build log message + log_message = f'{adapter_name} → {workflow_name} → local-workflow\n' + log_message += f'Node: {node_name}\n' + log_message += f'Reply: {reply_content[:500]}' # Limit length + + await ap.monitoring_service.record_message( + bot_id=query.bot_uuid or '', + bot_name=workflow_name or 'Workflow', + pipeline_id=workflow_id, + pipeline_name=pipeline_name, + message_content=log_message, + session_id=session_id, + status='success', + level='info', + platform='workflow', + user_id=query.sender_id, + user_name=query.sender_name, + role='assistant', + ) + except Exception as e: + ap.logger.error(f'Failed to record reply log: {e}') + + +class LLMCallMonitor: + """Context manager for monitoring LLM calls in workflow""" + + def __init__( + self, + ap: app.Application, + query: WorkflowQuery, + bot_id: str, + bot_name: str, + workflow_id: str, + workflow_name: str, + node_name: str, + model_name: str, + ): + self.ap = ap + self.query = query + self.bot_id = bot_id + self.bot_name = bot_name + self.workflow_id = workflow_id + self.workflow_name = workflow_name + self.node_name = node_name + self.model_name = model_name + self.start_time = None + self.input_tokens = 0 + self.output_tokens = 0 + + async def __aenter__(self): + self.start_time = time.time() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + duration_ms = int((time.time() - self.start_time) * 1000) if self.start_time else 0 + + if exc_type is not None: + await WorkflowMonitoringHelper.record_llm_call_log( + ap=self.ap, + query=self.query, + workflow_id=self.workflow_id, + workflow_name=self.workflow_name, + node_name=self.node_name, + model_name=self.model_name, + input_tokens=self.input_tokens, + output_tokens=self.output_tokens, + duration_ms=duration_ms, + status='error', + error_message=str(exc_val) if exc_val else None, + ) + else: + await WorkflowMonitoringHelper.record_llm_call_log( + ap=self.ap, + query=self.query, + workflow_id=self.workflow_id, + workflow_name=self.workflow_name, + node_name=self.node_name, + model_name=self.model_name, + input_tokens=self.input_tokens, + output_tokens=self.output_tokens, + duration_ms=duration_ms, + status='success', + ) + + return False diff --git a/src/langbot/pkg/workflow/node.py b/src/langbot/pkg/workflow/node.py index 41daedc9..42fc9bdf 100644 --- a/src/langbot/pkg/workflow/node.py +++ b/src/langbot/pkg/workflow/node.py @@ -142,34 +142,23 @@ class WorkflowNode(abc.ABC): # ------------------------------------------------------------------ -# Decorator and pending registration helpers +# Decorator for setting type_name attribute # ------------------------------------------------------------------ -_pending_registrations: list[tuple[str, type[WorkflowNode]]] = [] - def workflow_node(type_name: str) -> Callable[[type[WorkflowNode]], type[WorkflowNode]]: - """Decorator to register a workflow node type. + """Decorator to set the type_name attribute on a workflow node class. Usage: @workflow_node('llm_call') class LLMCallNode(WorkflowNode): ... + + The actual registration is now handled by the discovery engine. """ def decorator(cls: type[WorkflowNode]) -> type[WorkflowNode]: cls.type_name = type_name - _pending_registrations.append((type_name, cls)) return cls return decorator - - -def get_pending_registrations() -> list[tuple[str, type[WorkflowNode]]]: - """Get pending node registrations""" - return _pending_registrations.copy() - - -def clear_pending_registrations(): - """Clear pending registrations after they're processed""" - _pending_registrations.clear() diff --git a/src/langbot/pkg/workflow/nodes/__init__.py b/src/langbot/pkg/workflow/nodes/__init__.py index 51798995..8b137891 100644 --- a/src/langbot/pkg/workflow/nodes/__init__.py +++ b/src/langbot/pkg/workflow/nodes/__init__.py @@ -1,93 +1 @@ -"""Core workflow nodes package""" -# Import all node modules to trigger registration -# Trigger nodes -from . import message_trigger -from . import cron_trigger -from . import webhook_trigger -from . import event_trigger - -# Process nodes -from . import llm_call -from . import code_executor -from . import http_request -from . import data_transform -from . import question_classifier -from . import parameter_extractor -from . import knowledge_retrieval - -# Control nodes -from . import condition -from . import switch -from . import loop -from . import iterator -from . import parallel -from . import wait -from . import merge -from . import variable_aggregator - -# Action nodes -from . import send_message -from . import reply_message -from . import call_pipeline -from . import call_workflow -from . import store_data -from . import set_variable -from . import opening_statement -from . import end - -# Integration nodes -from . import database_query -from . import redis_operation -from . import mcp_tool -from . import memory_store -from . import dify_workflow -from . import dify_knowledge_query -from . import n8n_workflow -from . import langflow_flow -from . import coze_bot -# from . import plugin_call - -__all__ = [ - # Trigger nodes - 'message_trigger', - 'cron_trigger', - 'webhook_trigger', - 'event_trigger', - # Process nodes - 'llm_call', - 'code_executor', - 'http_request', - 'data_transform', - 'question_classifier', - 'parameter_extractor', - 'knowledge_retrieval', - # Control nodes - 'condition', - 'switch', - 'loop', - 'iterator', - 'parallel', - 'wait', - 'merge', - 'variable_aggregator', - # Action nodes - 'send_message', - 'reply_message', - 'call_pipeline', - 'call_workflow', - 'store_data', - 'set_variable', - 'opening_statement', - 'end', - # Integration nodes - 'database_query', - 'redis_operation', - 'mcp_tool', - 'memory_store', - 'dify_workflow', - 'dify_knowledge_query', - 'n8n_workflow', - 'langflow_flow', - 'coze_bot', -] diff --git a/src/langbot/pkg/workflow/nodes/call_pipeline.py b/src/langbot/pkg/workflow/nodes/call_pipeline.py index 99ea558c..0757ca93 100644 --- a/src/langbot/pkg/workflow/nodes/call_pipeline.py +++ b/src/langbot/pkg/workflow/nodes/call_pipeline.py @@ -17,7 +17,7 @@ import langbot_plugin.api.entities.builtin.platform.events as platform_events import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.provider.session as provider_session -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @@ -154,6 +154,7 @@ class CallPipelineNode(WorkflowNode): if context.message_context and context.message_context.is_group: group = platform_entities.Group( id=context.message_context.group_id or context.session_id or 'workflow_group', + name=context.message_context.raw_message.get('group_name', 'Workflow Group') if context.message_context.raw_message else 'Workflow Group', permission=platform_entities.Permission.Member, ) sender = platform_entities.GroupMember( diff --git a/src/langbot/pkg/workflow/nodes/call_workflow.py b/src/langbot/pkg/workflow/nodes/call_workflow.py index e3c21f9f..d0569043 100644 --- a/src/langbot/pkg/workflow/nodes/call_workflow.py +++ b/src/langbot/pkg/workflow/nodes/call_workflow.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any, Optional -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node diff --git a/src/langbot/pkg/workflow/nodes/code_executor.py b/src/langbot/pkg/workflow/nodes/code_executor.py index 82cbd1f4..84b89e74 100644 --- a/src/langbot/pkg/workflow/nodes/code_executor.py +++ b/src/langbot/pkg/workflow/nodes/code_executor.py @@ -12,7 +12,7 @@ import sys import threading from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node logger = logging.getLogger(__name__) diff --git a/src/langbot/pkg/workflow/nodes/condition.py b/src/langbot/pkg/workflow/nodes/condition.py index f403a0eb..e9158e1a 100644 --- a/src/langbot/pkg/workflow/nodes/condition.py +++ b/src/langbot/pkg/workflow/nodes/condition.py @@ -10,7 +10,7 @@ import re import signal from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node from ..safe_eval import safe_eval_with_vars diff --git a/src/langbot/pkg/workflow/nodes/coze_bot.py b/src/langbot/pkg/workflow/nodes/coze_bot.py index 715a2c47..ef7d4708 100644 --- a/src/langbot/pkg/workflow/nodes/coze_bot.py +++ b/src/langbot/pkg/workflow/nodes/coze_bot.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('coze_bot') diff --git a/src/langbot/pkg/workflow/nodes/cron_trigger.py b/src/langbot/pkg/workflow/nodes/cron_trigger.py index 13c76639..637ff137 100644 --- a/src/langbot/pkg/workflow/nodes/cron_trigger.py +++ b/src/langbot/pkg/workflow/nodes/cron_trigger.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('cron_trigger') diff --git a/src/langbot/pkg/workflow/nodes/data_transform.py b/src/langbot/pkg/workflow/nodes/data_transform.py index 7445662c..509739c7 100644 --- a/src/langbot/pkg/workflow/nodes/data_transform.py +++ b/src/langbot/pkg/workflow/nodes/data_transform.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node from ..safe_eval import safe_eval_with_vars diff --git a/src/langbot/pkg/workflow/nodes/database_query.py b/src/langbot/pkg/workflow/nodes/database_query.py index 65f96bb1..a5a8aa9a 100644 --- a/src/langbot/pkg/workflow/nodes/database_query.py +++ b/src/langbot/pkg/workflow/nodes/database_query.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('database_query') diff --git a/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py b/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py index 20e534cb..c06ac63e 100644 --- a/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py +++ b/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('dify_knowledge_query') diff --git a/src/langbot/pkg/workflow/nodes/dify_workflow.py b/src/langbot/pkg/workflow/nodes/dify_workflow.py index df8a2c4b..ad69cb37 100644 --- a/src/langbot/pkg/workflow/nodes/dify_workflow.py +++ b/src/langbot/pkg/workflow/nodes/dify_workflow.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('dify_workflow') diff --git a/src/langbot/pkg/workflow/nodes/end.py b/src/langbot/pkg/workflow/nodes/end.py index 879023a3..7498dc5a 100644 --- a/src/langbot/pkg/workflow/nodes/end.py +++ b/src/langbot/pkg/workflow/nodes/end.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('end') diff --git a/src/langbot/pkg/workflow/nodes/event_trigger.py b/src/langbot/pkg/workflow/nodes/event_trigger.py index 96d5fae1..e526077e 100644 --- a/src/langbot/pkg/workflow/nodes/event_trigger.py +++ b/src/langbot/pkg/workflow/nodes/event_trigger.py @@ -8,7 +8,7 @@ from __future__ import annotations from datetime import datetime from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('event_trigger') diff --git a/src/langbot/pkg/workflow/nodes/http_request.py b/src/langbot/pkg/workflow/nodes/http_request.py index 0bfde443..409c6e3c 100644 --- a/src/langbot/pkg/workflow/nodes/http_request.py +++ b/src/langbot/pkg/workflow/nodes/http_request.py @@ -10,7 +10,7 @@ import logging from typing import Any from urllib.parse import urlparse -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node logger = logging.getLogger(__name__) diff --git a/src/langbot/pkg/workflow/nodes/iterator.py b/src/langbot/pkg/workflow/nodes/iterator.py index fa33f04e..e3ac5198 100644 --- a/src/langbot/pkg/workflow/nodes/iterator.py +++ b/src/langbot/pkg/workflow/nodes/iterator.py @@ -4,7 +4,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('iterator') diff --git a/src/langbot/pkg/workflow/nodes/knowledge_retrieval.py b/src/langbot/pkg/workflow/nodes/knowledge_retrieval.py index 46eafeb1..0ac2f776 100644 --- a/src/langbot/pkg/workflow/nodes/knowledge_retrieval.py +++ b/src/langbot/pkg/workflow/nodes/knowledge_retrieval.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('knowledge_retrieval') diff --git a/src/langbot/pkg/workflow/nodes/langflow_flow.py b/src/langbot/pkg/workflow/nodes/langflow_flow.py index 340e10ed..e64d5ba6 100644 --- a/src/langbot/pkg/workflow/nodes/langflow_flow.py +++ b/src/langbot/pkg/workflow/nodes/langflow_flow.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('langflow_flow') diff --git a/src/langbot/pkg/workflow/nodes/llm_call.py b/src/langbot/pkg/workflow/nodes/llm_call.py index 512b34de..571abd2b 100644 --- a/src/langbot/pkg/workflow/nodes/llm_call.py +++ b/src/langbot/pkg/workflow/nodes/llm_call.py @@ -12,13 +12,15 @@ from __future__ import annotations import json import logging import re +import time from typing import Any, AsyncGenerator import langbot_plugin.api.entities.builtin.provider.message as provider_message import langbot_plugin.api.entities.builtin.rag.context as rag_context -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node +from .. import monitoring_helper logger = logging.getLogger(__name__) @@ -383,7 +385,17 @@ Respond in the same language as the user's input. context.variables['_conversation_history'] = history async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]: - model_uuid = self.get_config('model', '') + # Support both new model_config format and legacy model + fallback_models format + model_config = self.get_config('model_config', None) + if model_config and isinstance(model_config, dict): + # New format: {primary: uuid, fallbacks: [uuid1, uuid2, ...]} + model_uuid = model_config.get('primary', '') + fallback_models = model_config.get('fallbacks', []) + else: + # Legacy format: separate model and fallback_models + model_uuid = self.get_config('model', '') + fallback_models = self.get_config('fallback_models', []) + if not model_uuid: raise ValueError('No model configured for LLM call node') @@ -399,8 +411,8 @@ Respond in the same language as the user's input. output_format = self.get_config('output_format', 'text') json_schema = self.get_config('json_schema', '') - # Agent config: fallback models, knowledge bases, rerank, max_round - fallback_models = self.get_config('fallback_models', []) + # Agent config: knowledge bases, rerank, max_round + # (fallback_models already resolved above from model_config or fallback_models) knowledge_bases = self.get_config('knowledge_bases', []) rerank_model = self.get_config('rerank_model', '') rerank_top_k = self.get_config('rerank_top_k', 5) @@ -493,6 +505,9 @@ Respond in the same language as the user's input. if max_tokens and int(max_tokens) > 0: extra_args['max_tokens'] = int(max_tokens) + # Track start time for duration calculation + self._llm_start_time = time.time() + # Invoke LLM with fallback try: result_message, used_model = await self._invoke_with_fallback( @@ -563,18 +578,81 @@ Respond in the same language as the user's input. # Extract usage info if hasattr(result_message, 'usage') and result_message.usage: u = result_message.usage - usage = { - 'prompt_tokens': getattr(u, 'prompt_tokens', 0) or 0, - 'completion_tokens': getattr(u, 'completion_tokens', 0) or 0, - 'total_tokens': getattr(u, 'total_tokens', 0) or 0, - } + # Handle both object and dict usage + if isinstance(u, dict): + usage = { + 'prompt_tokens': u.get('prompt_tokens', 0) or 0, + 'completion_tokens': u.get('completion_tokens', 0) or 0, + 'total_tokens': u.get('total_tokens', 0) or 0, + } + else: + usage = { + 'prompt_tokens': getattr(u, 'prompt_tokens', 0) or 0, + 'completion_tokens': getattr(u, 'completion_tokens', 0) or 0, + 'total_tokens': getattr(u, 'total_tokens', 0) or 0, + } elif hasattr(result_message, 'token_usage') and result_message.token_usage: u = result_message.token_usage - usage = { - 'prompt_tokens': getattr(u, 'prompt_tokens', 0) or 0, - 'completion_tokens': getattr(u, 'completion_tokens', 0) or 0, - 'total_tokens': getattr(u, 'total_tokens', 0) or 0, - } + # Handle both object and dict token_usage + if isinstance(u, dict): + usage = { + 'prompt_tokens': u.get('prompt_tokens', 0) or 0, + 'completion_tokens': u.get('completion_tokens', 0) or 0, + 'total_tokens': u.get('total_tokens', 0) or 0, + } + else: + usage = { + 'prompt_tokens': getattr(u, 'prompt_tokens', 0) or 0, + 'completion_tokens': getattr(u, 'completion_tokens', 0) or 0, + 'total_tokens': getattr(u, 'total_tokens', 0) or 0, + } + + # Log successful response (matching Pipeline's cut_str behavior) + def _cut_str(s: str) -> str: + s0 = s.split('\n')[0] + if len(s0) > 20 or '\n' in s: + s0 = s0[:20] + '...' + return s0 + logger.info(f'[LLM:{self.node_id}] Response: {_cut_str(response_text)}') + + # Record LLM call log and response log + try: + if self.ap and context.query: + workflow_id = context.workflow_id or '' + workflow_name = context.variables.get('_workflow_name', 'Workflow') + node_name = self.get_config('name', self.node_id) + model_name = used_model.model_entity.name if used_model else 'unknown' + + # Calculate duration + duration_ms = 0 + if hasattr(self, '_llm_start_time'): + duration_ms = int((time.time() - self._llm_start_time) * 1000) + + # Record LLM call log (with LLM info) + await monitoring_helper.WorkflowMonitoringHelper.record_llm_call_log( + ap=self.ap, + query=context.query, + workflow_id=workflow_id, + workflow_name=workflow_name, + node_name=node_name, + model_name=model_name, + input_tokens=usage.get('prompt_tokens', 0), + output_tokens=usage.get('completion_tokens', 0), + duration_ms=duration_ms, + status='success', + ) + + # Record LLM response log (with response message) + await monitoring_helper.WorkflowMonitoringHelper.record_llm_response_log( + ap=self.ap, + query=context.query, + workflow_id=workflow_id, + workflow_name=workflow_name, + node_name=node_name, + response_content=response_text, + ) + except Exception as e: + logger.warning(f'[LLM:{self.node_id}] Failed to record LLM logs: {e}') # Save to conversation history self._save_to_conversation_history( @@ -615,7 +693,13 @@ Respond in the same language as the user's input. Yields chunks of response text as they arrive. Falls back to non-streaming if streaming is not available. """ - model_uuid = self.get_config('model', '') + # Support both new model_config format and legacy model + fallback_models format + model_config = self.get_config('model_config', None) + if model_config and isinstance(model_config, dict): + model_uuid = model_config.get('primary', '') + else: + model_uuid = self.get_config('model', '') + if not model_uuid: raise ValueError('No model configured for LLM call node') diff --git a/src/langbot/pkg/workflow/nodes/loop.py b/src/langbot/pkg/workflow/nodes/loop.py index 18cfcb2f..090059a8 100644 --- a/src/langbot/pkg/workflow/nodes/loop.py +++ b/src/langbot/pkg/workflow/nodes/loop.py @@ -4,7 +4,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('loop') diff --git a/src/langbot/pkg/workflow/nodes/mcp_tool.py b/src/langbot/pkg/workflow/nodes/mcp_tool.py index ee4e3547..2039d17b 100644 --- a/src/langbot/pkg/workflow/nodes/mcp_tool.py +++ b/src/langbot/pkg/workflow/nodes/mcp_tool.py @@ -11,7 +11,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('mcp_tool') diff --git a/src/langbot/pkg/workflow/nodes/memory_store.py b/src/langbot/pkg/workflow/nodes/memory_store.py index 8cc2b110..d0744f5e 100644 --- a/src/langbot/pkg/workflow/nodes/memory_store.py +++ b/src/langbot/pkg/workflow/nodes/memory_store.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node class MemoryHelper: diff --git a/src/langbot/pkg/workflow/nodes/merge.py b/src/langbot/pkg/workflow/nodes/merge.py index 483f3701..5bcc9b20 100644 --- a/src/langbot/pkg/workflow/nodes/merge.py +++ b/src/langbot/pkg/workflow/nodes/merge.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('merge') diff --git a/src/langbot/pkg/workflow/nodes/message_trigger.py b/src/langbot/pkg/workflow/nodes/message_trigger.py index c4b45071..7d2ce6ff 100644 --- a/src/langbot/pkg/workflow/nodes/message_trigger.py +++ b/src/langbot/pkg/workflow/nodes/message_trigger.py @@ -7,10 +7,15 @@ Node metadata (label, description, inputs, outputs, config) is loaded from: from __future__ import annotations +import logging from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node +from .. import monitoring_helper + +logger = logging.getLogger(__name__) + @workflow_node('message_trigger') class MessageTriggerNode(WorkflowNode): @@ -21,6 +26,20 @@ class MessageTriggerNode(WorkflowNode): async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]: msg_ctx = context.message_context + # Record trigger log + try: + if self.ap and context.query: + workflow_id = context.workflow_id or '' + workflow_name = context.variables.get('_workflow_name', 'Workflow') + await monitoring_helper.WorkflowMonitoringHelper.record_trigger_log( + ap=self.ap, + query=context.query, + workflow_id=workflow_id, + workflow_name=workflow_name, + ) + except Exception as e: + logger.warning(f'[MessageTrigger:{self.node_id}] Failed to record trigger log: {e}') + if msg_ctx: return { 'message': msg_ctx.message_content, diff --git a/src/langbot/pkg/workflow/nodes/n8n_workflow.py b/src/langbot/pkg/workflow/nodes/n8n_workflow.py index 2137b192..f2da5470 100644 --- a/src/langbot/pkg/workflow/nodes/n8n_workflow.py +++ b/src/langbot/pkg/workflow/nodes/n8n_workflow.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('n8n_workflow') diff --git a/src/langbot/pkg/workflow/nodes/opening_statement.py b/src/langbot/pkg/workflow/nodes/opening_statement.py index 20e50008..a5e46130 100644 --- a/src/langbot/pkg/workflow/nodes/opening_statement.py +++ b/src/langbot/pkg/workflow/nodes/opening_statement.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('opening_statement') diff --git a/src/langbot/pkg/workflow/nodes/parallel.py b/src/langbot/pkg/workflow/nodes/parallel.py index 0cb2eeec..89b9ff65 100644 --- a/src/langbot/pkg/workflow/nodes/parallel.py +++ b/src/langbot/pkg/workflow/nodes/parallel.py @@ -4,7 +4,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('parallel') diff --git a/src/langbot/pkg/workflow/nodes/parameter_extractor.py b/src/langbot/pkg/workflow/nodes/parameter_extractor.py index 2f7cca72..5221b951 100644 --- a/src/langbot/pkg/workflow/nodes/parameter_extractor.py +++ b/src/langbot/pkg/workflow/nodes/parameter_extractor.py @@ -9,7 +9,7 @@ import json import logging from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node logger = logging.getLogger(__name__) @@ -88,6 +88,22 @@ class ParameterExtractorNode(WorkflowNode): extra_args={}, ) + # Log successful response (matching Pipeline's cut_str behavior) + response_preview = '' + if isinstance(result_message.content, str): + response_preview = result_message.content + elif isinstance(result_message.content, list): + for elem in result_message.content: + if hasattr(elem, 'text') and elem.text: + response_preview += elem.text + response_preview = response_preview.strip() + def _cut_str(s: str) -> str: + s0 = s.split('\n')[0] + if len(s0) > 20 or '\n' in s: + s0 = s0[:20] + '...' + return s0 + logger.info(f'[ParameterExtractor:{self.node_id}] Response: {_cut_str(response_preview)}') + # Extract response text response_text = '' if isinstance(result_message.content, str): diff --git a/src/langbot/pkg/workflow/nodes/plugin_call.py b/src/langbot/pkg/workflow/nodes/plugin_call.py index 428f333a..3ae93c58 100644 --- a/src/langbot/pkg/workflow/nodes/plugin_call.py +++ b/src/langbot/pkg/workflow/nodes/plugin_call.py @@ -7,7 +7,7 @@ # from typing import Any -# from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +# from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext # from ..node import WorkflowNode, workflow_node # @workflow_node('plugin_call') diff --git a/src/langbot/pkg/workflow/nodes/question_classifier.py b/src/langbot/pkg/workflow/nodes/question_classifier.py index df561ff9..f643573a 100644 --- a/src/langbot/pkg/workflow/nodes/question_classifier.py +++ b/src/langbot/pkg/workflow/nodes/question_classifier.py @@ -8,7 +8,7 @@ from __future__ import annotations import logging from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node logger = logging.getLogger(__name__) @@ -78,6 +78,22 @@ class QuestionClassifierNode(WorkflowNode): extra_args={}, ) + # Log successful response (matching Pipeline's cut_str behavior) + response_preview = '' + if isinstance(result_message.content, str): + response_preview = result_message.content + elif isinstance(result_message.content, list): + for elem in result_message.content: + if hasattr(elem, 'text') and elem.text: + response_preview += elem.text + response_preview = response_preview.strip() + def _cut_str(s: str) -> str: + s0 = s.split('\n')[0] + if len(s0) > 20 or '\n' in s: + s0 = s0[:20] + '...' + return s0 + logger.info(f'[QuestionClassifier:{self.node_id}] Response: {_cut_str(response_preview)}') + # Extract response text response_text = '' if isinstance(result_message.content, str): diff --git a/src/langbot/pkg/workflow/nodes/redis_operation.py b/src/langbot/pkg/workflow/nodes/redis_operation.py index 6e4b4eab..426f1724 100644 --- a/src/langbot/pkg/workflow/nodes/redis_operation.py +++ b/src/langbot/pkg/workflow/nodes/redis_operation.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('redis_operation') diff --git a/src/langbot/pkg/workflow/nodes/reply_message.py b/src/langbot/pkg/workflow/nodes/reply_message.py index d168531b..af464bb3 100644 --- a/src/langbot/pkg/workflow/nodes/reply_message.py +++ b/src/langbot/pkg/workflow/nodes/reply_message.py @@ -8,8 +8,9 @@ from __future__ import annotations import logging from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node +from .. import monitoring_helper logger = logging.getLogger(__name__) @@ -80,7 +81,10 @@ class ReplyMessageNode(WorkflowNode): from langbot_plugin.api.entities.builtin.platform.message import MessageChain, Plain message_chain = MessageChain([Plain(text=message_str)]) - target_type = getattr(context, 'target_type', 'person') or 'person' + # 从 trigger_data 中获取 session_type,而不是从未设置的 context.target_type + target_type = 'person' + if context.trigger_data: + target_type = context.trigger_data.get('session_type', 'person') or 'person' session_id = context.session_id or 'unknown' target_id = f'websocket_{session_id}' @@ -103,8 +107,26 @@ class ReplyMessageNode(WorkflowNode): }, ) + # Record reply log + try: + if self.ap and context.query and send_success: + workflow_id = context.workflow_id or '' + workflow_name = context.variables.get('_workflow_name', 'Workflow') + node_name = self.get_config('name', self.node_id) + await monitoring_helper.WorkflowMonitoringHelper.record_reply_log( + ap=self.ap, + query=context.query, + workflow_id=workflow_id, + workflow_name=workflow_name, + node_name=node_name, + reply_content=message_str, + ) + except Exception as e: + logger.warning(f'[ReplyMessage:{self.node_id}] Failed to record reply log: {e}') + return { 'status': 'sent' if send_success else 'failed', + 'message_content': message_str, 'message_preview': message_str[:200], 'error': send_error, } diff --git a/src/langbot/pkg/workflow/nodes/send_message.py b/src/langbot/pkg/workflow/nodes/send_message.py index 8fe7bdf4..cd6ca46e 100644 --- a/src/langbot/pkg/workflow/nodes/send_message.py +++ b/src/langbot/pkg/workflow/nodes/send_message.py @@ -8,7 +8,7 @@ from __future__ import annotations import logging from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node logger = logging.getLogger(__name__) @@ -24,8 +24,15 @@ class SendMessageNode(WorkflowNode): message = inputs.get('message') or inputs.get('content') or inputs.get('input') or '' message = str(message) if message is not None else '' - # Get target configuration - target_type = self.get_config('target_type', 'person') + # Get target configuration - fallback to session_type from context if not configured + target_type = self.get_config('target_type', '') + if not target_type: + # Inherit from current session context + if context.trigger_data: + target_type = context.trigger_data.get('session_type', 'person') or 'person' + else: + target_type = 'person' + target_id = self.get_config('target_id', '') # If no target_id configured, use session_id from context @@ -64,6 +71,7 @@ class SendMessageNode(WorkflowNode): return { 'status': 'sent' if send_success else 'failed', + 'message_content': message, 'message_preview': message[:200], 'target_type': target_type, 'target_id': target_id, diff --git a/src/langbot/pkg/workflow/nodes/set_variable.py b/src/langbot/pkg/workflow/nodes/set_variable.py index bee36e36..70d59a56 100644 --- a/src/langbot/pkg/workflow/nodes/set_variable.py +++ b/src/langbot/pkg/workflow/nodes/set_variable.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('set_variable') diff --git a/src/langbot/pkg/workflow/nodes/store_data.py b/src/langbot/pkg/workflow/nodes/store_data.py index 7eb20e0b..7edaebab 100644 --- a/src/langbot/pkg/workflow/nodes/store_data.py +++ b/src/langbot/pkg/workflow/nodes/store_data.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('store_data') diff --git a/src/langbot/pkg/workflow/nodes/switch.py b/src/langbot/pkg/workflow/nodes/switch.py index 8d196647..3b1f4a62 100644 --- a/src/langbot/pkg/workflow/nodes/switch.py +++ b/src/langbot/pkg/workflow/nodes/switch.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('switch') diff --git a/src/langbot/pkg/workflow/nodes/variable_aggregator.py b/src/langbot/pkg/workflow/nodes/variable_aggregator.py index e9cfbce8..9108ea97 100644 --- a/src/langbot/pkg/workflow/nodes/variable_aggregator.py +++ b/src/langbot/pkg/workflow/nodes/variable_aggregator.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('variable_aggregator') diff --git a/src/langbot/pkg/workflow/nodes/wait.py b/src/langbot/pkg/workflow/nodes/wait.py index 57b9bc3d..f075b045 100644 --- a/src/langbot/pkg/workflow/nodes/wait.py +++ b/src/langbot/pkg/workflow/nodes/wait.py @@ -8,7 +8,7 @@ from __future__ import annotations import logging from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node logger = logging.getLogger(__name__) diff --git a/src/langbot/pkg/workflow/nodes/webhook_trigger.py b/src/langbot/pkg/workflow/nodes/webhook_trigger.py index e7b6eca6..27d41300 100644 --- a/src/langbot/pkg/workflow/nodes/webhook_trigger.py +++ b/src/langbot/pkg/workflow/nodes/webhook_trigger.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from langbot_plugin.api.entities.builtin.workflow import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow.entities import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('webhook_trigger') diff --git a/src/langbot/pkg/workflow/registry.py b/src/langbot/pkg/workflow/registry.py index 37fae180..4c4c4290 100644 --- a/src/langbot/pkg/workflow/registry.py +++ b/src/langbot/pkg/workflow/registry.py @@ -4,10 +4,13 @@ from __future__ import annotations import copy import logging -from typing import Any, Optional +from typing import Any, Optional, TYPE_CHECKING from .metadata import build_node_type -from .node import WorkflowNode, clear_pending_registrations, get_pending_registrations +from .node import WorkflowNode + +if TYPE_CHECKING: + from langbot.pkg.discover.engine import ComponentDiscoveryEngine logger = logging.getLogger(__name__) @@ -116,13 +119,6 @@ class NodeTypeRegistry: canonical_type = self._resolve_registered_node_key(node_type) if canonical_type: return self._nodes[canonical_type] - - if get_pending_registrations(): - self.process_pending_registrations() - canonical_type = self._resolve_registered_node_key(node_type) - if canonical_type: - return self._nodes[canonical_type] - return None def get_metadata(self, node_type: str) -> Optional[dict[str, Any]]: @@ -203,11 +199,25 @@ class NodeTypeRegistry: """Check whether a node has metadata or an implementation registered.""" return self.get_metadata(node_type) is not None or self.get(node_type) is not None - def process_pending_registrations(self): - """Process all pending node registrations from decorators.""" - for node_type, node_class in get_pending_registrations(): - self.register(node_type, node_class) - clear_pending_registrations() + def discover_nodes(self, discover_engine: 'ComponentDiscoveryEngine', nodes_dir: str = 'pkg/workflow/nodes/'): + """Discover and register workflow nodes from the discovery engine. + + This method uses the ComponentDiscoveryEngine to find all WorkflowNode + subclasses in the specified directory and registers them automatically, + replacing the old decorator-based registration mechanism. + + Args: + discover_engine: The ComponentDiscoveryEngine instance + nodes_dir: Directory path to scan for workflow nodes + """ + node_classes = discover_engine.discover_workflow_nodes(nodes_dir) + for node_class in node_classes: + type_name = getattr(node_class, 'type_name', '') + if type_name: + self.register(type_name, node_class) + logger.debug(f'Auto-registered workflow node: {type_name}') + else: + logger.warning(f'Workflow node class {node_class.__name__} missing type_name attribute') def count(self) -> int: """Get total number of node types exposed by metadata or implementation.""" diff --git a/src/langbot/templates/components.yaml b/src/langbot/templates/components.yaml index a95235aa..451b7f55 100644 --- a/src/langbot/templates/components.yaml +++ b/src/langbot/templates/components.yaml @@ -13,3 +13,6 @@ spec: LLMAPIRequester: fromDirs: - path: pkg/provider/modelmgr/requesters/ + WorkflowNode: + fromDirs: + - path: pkg/workflow/nodes/ diff --git a/src/langbot/templates/metadata/nodes/llm_call.yaml b/src/langbot/templates/metadata/nodes/llm_call.yaml index 06872f0b..2996d4b0 100644 --- a/src/langbot/templates/metadata/nodes/llm_call.yaml +++ b/src/langbot/templates/metadata/nodes/llm_call.yaml @@ -60,26 +60,18 @@ outputs: zh_Hans: 解析后的输出(如果输出格式为 JSON) config: - - name: model - type: llm-model-selector - required: true - label: - en_US: Model - zh_Hans: 模型 - description: - en_US: Select the LLM model to use - zh_Hans: 选择要使用的 LLM 模型 - - - name: fallback_models + - name: model_config type: model-fallback-selector - required: false - default: [] + required: true + default: + primary: '' + fallbacks: [] label: - en_US: Fallback Models - zh_Hans: 备用模型 + en_US: Model Configuration + zh_Hans: 模型配置 description: - en_US: List of fallback models to try if the primary model fails - zh_Hans: 主模型失败时尝试的备用模型列表 + en_US: Configure the primary model and optional fallback models + zh_Hans: 配置主模型和可选的备用模型 - name: prompt label: diff --git a/tests/unit_tests/pipeline/test_config_coercion.py b/tests/unit_tests/pipeline/test_config.py similarity index 97% rename from tests/unit_tests/pipeline/test_config_coercion.py rename to tests/unit_tests/pipeline/test_config.py index a23f54de..a49def56 100644 --- a/tests/unit_tests/pipeline/test_config_coercion.py +++ b/tests/unit_tests/pipeline/test_config.py @@ -1,10 +1,10 @@ -"""Unit tests for config_coercion module""" +"""Unit tests for config module""" from __future__ import annotations import pytest -from langbot.pkg.pipeline.config_coercion import _coerce_value, coerce_pipeline_config +from langbot.pkg.pipeline.config import _coerce_value, coerce_pipeline_config class TestCoerceValue: diff --git a/web/src/app/home/workflows/components/workflow-editor/WorkflowEditorComponent.tsx b/web/src/app/home/workflows/components/workflow-editor/WorkflowEditorComponent.tsx index 69b0991c..ebf4c18c 100644 --- a/web/src/app/home/workflows/components/workflow-editor/WorkflowEditorComponent.tsx +++ b/web/src/app/home/workflows/components/workflow-editor/WorkflowEditorComponent.tsx @@ -1,4 +1,4 @@ -import { useCallback, useRef, useState } from 'react'; +import { useCallback, useRef, useState, useEffect } from 'react'; import { ReactFlow, Background, @@ -425,7 +425,16 @@ function WorkflowEditorInner() { {/* Center: Flow Canvas */} -