From 253cc6cbea3c7d6518d6135bcc4813469d3f0be2 Mon Sep 17 00:00:00 2001 From: Typer_Body Date: Fri, 22 May 2026 02:07:48 +0800 Subject: [PATCH] shit --- .../controller/groups/workflows/workflows.py | 158 ++++++++++++++ src/langbot/pkg/workflow/adapters.py | 204 ++++++++++++++++++ src/langbot/pkg/workflow/entities.py | 166 ++------------ src/langbot/pkg/workflow/executor.py | 13 +- .../pkg/workflow/nodes/call_pipeline.py | 55 ++++- .../pkg/workflow/nodes/code_executor.py | 2 +- src/langbot/pkg/workflow/nodes/condition.py | 2 +- src/langbot/pkg/workflow/nodes/coze_bot.py | 2 +- .../pkg/workflow/nodes/cron_trigger.py | 2 +- .../pkg/workflow/nodes/data_transform.py | 2 +- .../pkg/workflow/nodes/database_query.py | 2 +- .../workflow/nodes/dify_knowledge_query.py | 2 +- .../pkg/workflow/nodes/dify_workflow.py | 2 +- src/langbot/pkg/workflow/nodes/end.py | 2 +- .../pkg/workflow/nodes/event_trigger.py | 2 +- .../pkg/workflow/nodes/http_request.py | 2 +- src/langbot/pkg/workflow/nodes/iterator.py | 2 +- .../pkg/workflow/nodes/knowledge_retrieval.py | 2 +- .../pkg/workflow/nodes/langflow_flow.py | 2 +- src/langbot/pkg/workflow/nodes/llm_call.py | 56 ++--- src/langbot/pkg/workflow/nodes/loop.py | 2 +- src/langbot/pkg/workflow/nodes/mcp_tool.py | 2 +- .../pkg/workflow/nodes/memory_store.py | 2 +- src/langbot/pkg/workflow/nodes/merge.py | 2 +- .../pkg/workflow/nodes/message_trigger.py | 2 +- .../pkg/workflow/nodes/n8n_workflow.py | 2 +- .../pkg/workflow/nodes/opening_statement.py | 2 +- src/langbot/pkg/workflow/nodes/parallel.py | 2 +- .../pkg/workflow/nodes/parameter_extractor.py | 2 +- src/langbot/pkg/workflow/nodes/plugin_call.py | 2 +- .../pkg/workflow/nodes/question_classifier.py | 2 +- .../pkg/workflow/nodes/redis_operation.py | 2 +- .../pkg/workflow/nodes/reply_message.py | 11 +- .../pkg/workflow/nodes/send_message.py | 2 +- .../pkg/workflow/nodes/set_variable.py | 2 +- src/langbot/pkg/workflow/nodes/store_data.py | 2 +- src/langbot/pkg/workflow/nodes/switch.py | 2 +- .../pkg/workflow/nodes/variable_aggregator.py | 2 +- src/langbot/pkg/workflow/nodes/wait.py | 2 +- .../pkg/workflow/nodes/webhook_trigger.py | 2 +- src/langbot/pkg/workflow/variable_manager.py | 185 ++++++++++++++++ uv.lock | 50 +++-- 42 files changed, 727 insertions(+), 237 deletions(-) create mode 100644 src/langbot/pkg/workflow/adapters.py create mode 100644 src/langbot/pkg/workflow/variable_manager.py diff --git a/src/langbot/pkg/api/http/controller/groups/workflows/workflows.py b/src/langbot/pkg/api/http/controller/groups/workflows/workflows.py index d989d45e..ccce2a87 100644 --- a/src/langbot/pkg/api/http/controller/groups/workflows/workflows.py +++ b/src/langbot/pkg/api/http/controller/groups/workflows/workflows.py @@ -312,6 +312,164 @@ class WorkflowsRouterGroup(group.RouterGroup): except ValueError as e: return self.http_status(404, -1, str(e)) + # LLM Node Performance Test Endpoint + # Tests each step of LLM node execution with detailed timing + @self.route('/_/test/llm-node', methods=['POST'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY) + async def _() -> str: + """Test LLM node performance with detailed step-by-step timing. + + Request body: + { + "model_uuid": "uuid-of-model", + "system_prompt": "optional system prompt", + "user_prompt": "test message", + "enable_tools": false, + "tools": [], + "temperature": 0.7, + "max_tokens": 100 + } + + Response includes timing for each step: + - model_fetch: Time to get model from model_mgr + - tool_fetch: Time to load tools (if enabled) + - prompt_build: Time to build messages + - llm_call: Time for actual LLM invocation + - total: Total time + """ + import time + + json_data = await quart.request.json + if not json_data: + return self.http_status(400, -1, 'Request body is required') + + model_uuid = json_data.get('model_uuid', '') + if not model_uuid: + return self.http_status(400, -1, 'model_uuid is required') + + user_prompt = json_data.get('user_prompt', 'test') + system_prompt = json_data.get('system_prompt', '') + enable_tools = json_data.get('enable_tools', False) + tools_config = json_data.get('tools', []) + temperature = json_data.get('temperature') + max_tokens = json_data.get('max_tokens', 0) + + timings = {} + errors = [] + + # Step 1: Model fetch + t_start = time.perf_counter() + try: + runtime_model = await self.ap.model_mgr.get_model_by_uuid(model_uuid) + timings['model_fetch_ms'] = round((time.perf_counter() - t_start) * 1000, 2) + timings['model_found'] = True + timings['model_name'] = runtime_model.model_entity.name if runtime_model else None + except Exception as e: + timings['model_fetch_ms'] = round((time.perf_counter() - t_start) * 1000, 2) + timings['model_found'] = False + errors.append(f'Model fetch failed: {str(e)}') + return self.http_status(400, -1, { + 'error': errors[0], + 'timings': timings, + }) + + # Step 2: Tool fetch (if enabled) + timings['tool_fetch_ms'] = 0 + timings['tools_loaded'] = 0 + if enable_tools and tools_config: + t_start = time.perf_counter() + try: + import langbot_plugin.api.entities.builtin.resource.tool as resource_tool + all_tools = await self.ap.tool_mgr.get_tools() + tool_names = tools_config if isinstance(tools_config, list) else [] + funcs = [t for t in all_tools if t.name in tool_names] + timings['tool_fetch_ms'] = round((time.perf_counter() - t_start) * 1000, 2) + timings['tools_loaded'] = len(funcs) + timings['tool_names'] = [t.name for t in funcs] + except Exception as e: + timings['tool_fetch_ms'] = round((time.perf_counter() - t_start) * 1000, 2) + errors.append(f'Tool fetch failed: {str(e)}') + + # Step 3: Build messages + t_start = time.perf_counter() + import langbot_plugin.api.entities.builtin.provider.message as provider_message + messages = [] + if system_prompt: + messages.append(provider_message.Message(role='system', content=system_prompt)) + messages.append(provider_message.Message(role='user', content=user_prompt)) + timings['prompt_build_ms'] = round((time.perf_counter() - t_start) * 1000, 2) + + # Step 4: Build extra args + extra_args = {} + if temperature is not None: + extra_args['temperature'] = float(temperature) + if max_tokens and int(max_tokens) > 0: + extra_args['max_tokens'] = int(max_tokens) + + # Step 5: LLM call + t_start = time.perf_counter() + try: + result_message = await runtime_model.provider.invoke_llm( + query=None, + model=runtime_model, + messages=messages, + funcs=funcs if enable_tools else None, + extra_args=extra_args, + ) + timings['llm_call_ms'] = round((time.perf_counter() - t_start) * 1000, 2) + timings['llm_call_success'] = True + + # Extract response text + response_text = '' + if isinstance(result_message.content, str): + response_text = result_message.content + elif isinstance(result_message.content, list): + for elem in result_message.content: + if hasattr(elem, 'text') and elem.text: + response_text += elem.text + elif isinstance(elem, str): + response_text += elem + + timings['response_length'] = len(response_text) + timings['response_preview'] = response_text[:200] + + # Extract usage + usage = {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0} + if hasattr(result_message, 'usage') and result_message.usage: + u = result_message.usage + usage = { + 'prompt_tokens': getattr(u, 'prompt_tokens', 0) or 0, + 'completion_tokens': getattr(u, 'completion_tokens', 0) or 0, + 'total_tokens': getattr(u, 'total_tokens', 0) or 0, + } + timings['usage'] = usage + + except Exception as e: + timings['llm_call_ms'] = round((time.perf_counter() - t_start) * 1000, 2) + timings['llm_call_success'] = False + errors.append(f'LLM call failed: {str(e)}') + + # Calculate total + timings['total_ms'] = round(sum([ + timings.get('model_fetch_ms', 0), + timings.get('tool_fetch_ms', 0), + timings.get('prompt_build_ms', 0), + timings.get('llm_call_ms', 0), + ]), 2) + + # Add breakdown percentage + if timings['total_ms'] > 0: + timings['breakdown'] = { + 'model_fetch_pct': round(timings.get('model_fetch_ms', 0) / timings['total_ms'] * 100, 1), + 'tool_fetch_pct': round(timings.get('tool_fetch_ms', 0) / timings['total_ms'] * 100, 1), + 'prompt_build_pct': round(timings.get('prompt_build_ms', 0) / timings['total_ms'] * 100, 1), + 'llm_call_pct': round(timings.get('llm_call_ms', 0) / timings['total_ms'] * 100, 1), + } + + if errors: + timings['errors'] = errors + + return self.success(data={'test_result': timings}) + @group.group_class('executions', '/api/v1/executions') class ExecutionsRouterGroup(group.RouterGroup): diff --git a/src/langbot/pkg/workflow/adapters.py b/src/langbot/pkg/workflow/adapters.py new file mode 100644 index 00000000..17530a4f --- /dev/null +++ b/src/langbot/pkg/workflow/adapters.py @@ -0,0 +1,204 @@ +"""Workflow-Pipeline通信适配器 + +这个模块提供了Workflow和Pipeline之间的通信适配,使用SDK标准的MessageEnvelope格式。 +""" + +from __future__ import annotations + +import logging +from typing import Any, Optional + +logger = logging.getLogger(__name__) + + +class _WorkflowPipelineCaptureAdapter: + """Workflow-Pipeline通信适配器 + + 用于在Workflow节点和Pipeline之间进行标准化的消息传递。 + 支持MessageEnvelope格式的双向转换。 + """ + + def __init__(self, context: Any): + """初始化适配器 + + Args: + context: ExecutionContext - Workflow执行上下文 + """ + self.context = context + self.responses: list[dict[str, Any]] = [] + self.bot_account_id: Optional[str] = None + self._logger = logging.getLogger(__name__) + + async def call_pipeline_with_envelope( + self, + envelope: Any, + pipeline_executor: Any + ) -> Any: + """使用MessageEnvelope调用Pipeline + + Args: + envelope: MessageEnvelope - 标准消息信封 + pipeline_executor: Pipeline执行器实例 + + Returns: + MessageEnvelope - 执行结果信封 + """ + try: + # 动态导入以避免循环依赖 + from langbot_plugin_sdk.workflow import envelope_to_query, query_to_envelope + + # 1. 转换为Query + query = envelope_to_query(envelope) + + # 2. 调用Pipeline + result_query = await pipeline_executor.execute(query) + + # 3. 转换回Envelope + result_envelope = query_to_envelope(result_query, envelope) + + self._logger.debug( + f'Pipeline execution completed for workflow {envelope.workflow_id}', + extra={ + 'workflow_id': envelope.workflow_id, + 'execution_id': envelope.execution_id, + 'node_id': envelope.node_id, + } + ) + + return result_envelope + + except Exception as e: + self._logger.error( + f'Pipeline execution failed: {e}', + exc_info=True, + extra={ + 'workflow_id': envelope.workflow_id, + 'execution_id': envelope.execution_id, + 'node_id': envelope.node_id, + } + ) + raise + + def validate_envelope(self, envelope: Any) -> bool: + """验证MessageEnvelope的有效性 + + Args: + envelope: MessageEnvelope - 要验证的消息信封 + + Returns: + bool - 验证是否通过 + """ + required_fields = [ + 'message_id', + 'workflow_id', + 'node_id', + 'execution_id', + 'payload', + 'launcher_type', + ] + + for field in required_fields: + if not hasattr(envelope, field): + self._logger.warning( + f'MessageEnvelope missing required field: {field}' + ) + return False + + return True + + def get_responses(self) -> list[dict[str, Any]]: + """获取所有响应 + + Returns: + list - 响应列表 + """ + return self.responses.copy() + + def add_response(self, response: dict[str, Any]) -> None: + """添加响应 + + Args: + response: dict - 响应数据 + """ + self.responses.append(response) + + def get_last_text_response(self) -> str: + """获取最后一个文本响应 + + Returns: + str - 最后一个响应的文本内容 + """ + if not self.responses: + return '' + + last_response = self.responses[-1] + return str(last_response.get('content', '') or '') + + def clear_responses(self) -> None: + """清空所有响应""" + self.responses.clear() + + +class WorkflowPipelineCompatibilityLayer: + """Workflow-Pipeline兼容性层 + + 提供向后兼容性,支持旧的Pipeline Query格式和新的MessageEnvelope格式。 + """ + + def __init__(self): + """初始化兼容性层""" + self._logger = logging.getLogger(__name__) + + def is_workflow_context(self, query: Any) -> bool: + """检查Query是否包含Workflow上下文 + + Args: + query: Query - Pipeline Query对象 + + Returns: + bool - 是否来自Workflow + """ + if hasattr(query, 'is_from_workflow'): + return query.is_from_workflow() + + if hasattr(query, 'get_workflow_context'): + context = query.get_workflow_context() + return bool(context and context.get('workflow_id')) + + return False + + def get_workflow_id(self, query: Any) -> Optional[str]: + """从Query获取Workflow ID + + Args: + query: Query - Pipeline Query对象 + + Returns: + str - Workflow ID,如果不存在则返回None + """ + if hasattr(query, 'get_workflow_id'): + return query.get_workflow_id() + + if hasattr(query, 'get_workflow_context'): + context = query.get_workflow_context() + return context.get('workflow_id') if context else None + + return None + + def get_execution_id(self, query: Any) -> Optional[str]: + """从Query获取执行ID + + Args: + query: Query - Pipeline Query对象 + + Returns: + str - 执行ID,如果不存在则返回None + """ + if hasattr(query, 'get_execution_id'): + return query.get_execution_id() + + if hasattr(query, 'get_workflow_context'): + context = query.get_workflow_context() + return context.get('execution_id') if context else None + + return None diff --git a/src/langbot/pkg/workflow/entities.py b/src/langbot/pkg/workflow/entities.py index 44107a0a..bda59a27 100644 --- a/src/langbot/pkg/workflow/entities.py +++ b/src/langbot/pkg/workflow/entities.py @@ -1,12 +1,27 @@ -"""Workflow entities and data models""" +"""Workflow entities and data models + +This module defines workflow entities using SDK standard entities where available, +and local-specific entities for LangBot_copy-specific functionality. +""" from __future__ import annotations -import enum from datetime import datetime from typing import Any, Optional import pydantic +# Import SDK entities for standard workflow protocol types +from langbot_plugin.api.entities.builtin.workflow import ( + ExecutionContext, + ExecutionStep, + ExecutionStatus, + MessageContext, + NodeDefinition, + NodeState, + NodeStatus, + PortDefinition, +) + class Position(pydantic.BaseModel): """Node position on canvas""" @@ -15,31 +30,6 @@ class Position(pydantic.BaseModel): y: float = 0 -class PortDefinition(pydantic.BaseModel): - """Node port definition""" - - name: str - type: str = 'any' # any, string, number, boolean, object, array - description: str = '' - required: bool = True - - -class NodeDefinition(pydantic.BaseModel): - """Workflow node definition""" - - id: str - type: str - name: str = '' - position: Position = Position() - config: dict[str, Any] = {} - inputs: list[PortDefinition] = [] - outputs: list[PortDefinition] = [] - - # UI metadata - description: str = '' - comment: str = '' # User comment/annotation - - class EdgeDefinition(pydantic.BaseModel): """Workflow edge definition (connection between nodes)""" @@ -161,125 +151,3 @@ class WorkflowDefinition(pydantic.BaseModel): # Source tracking (for imported workflows) source: Optional[str] = None # dify, n8n, langflow, etc. source_id: Optional[str] = None - - -class ExecutionStatus(enum.Enum): - """Workflow execution status""" - - PENDING = 'pending' - RUNNING = 'running' - WAITING = 'waiting' - COMPLETED = 'completed' - FAILED = 'failed' - CANCELLED = 'cancelled' - - -class NodeStatus(enum.Enum): - """Node execution status""" - - PENDING = 'pending' - RUNNING = 'running' - COMPLETED = 'completed' - FAILED = 'failed' - SKIPPED = 'skipped' - - -class NodeState(pydantic.BaseModel): - """Runtime state of a node during execution""" - - node_id: str - status: NodeStatus = NodeStatus.PENDING - inputs: dict[str, Any] = {} - outputs: dict[str, Any] = {} - start_time: Optional[datetime] = None - end_time: Optional[datetime] = None - error: Optional[str] = None - retry_count: int = 0 - - -class MessageContext(pydantic.BaseModel): - """Message context for message-triggered workflows""" - - message_id: str - message_content: str - sender_id: str - sender_name: str = '' - platform: str = '' - conversation_id: str = '' - is_group: bool = False - group_id: Optional[str] = None - mentions: list[str] = [] - reply_to: Optional[str] = None - raw_message: dict[str, Any] = {} - - -class ExecutionStep(pydantic.BaseModel): - """Execution history step""" - - timestamp: datetime - node_id: str - node_type: str - status: str - inputs: dict[str, Any] = {} - outputs: dict[str, Any] = {} - duration_ms: int = 0 - error: Optional[str] = None - - -class ExecutionContext(pydantic.BaseModel): - """Workflow execution context""" - - execution_id: str - workflow_id: str - workflow_version: int = 1 - status: ExecutionStatus = ExecutionStatus.PENDING - - # Runtime data - variables: dict[str, Any] = {} - conversation_variables: dict[str, Any] = {} # Session-level persistent variables - node_states: dict[str, NodeState] = {} - memory: dict[str, Any] = {} # Workflow memory for storing/retrieving data - - # Timing - start_time: Optional[datetime] = None - end_time: Optional[datetime] = None - - # Error - error: Optional[str] = None - - # Message context (if triggered by message) - message_context: Optional[MessageContext] = None - - # Trigger info - trigger_type: Optional[str] = None - trigger_data: dict[str, Any] = {} - - # Execution history - history: list[ExecutionStep] = [] - - # Session info - session_id: Optional[str] = None - user_id: Optional[str] = None - bot_id: Optional[str] = None - - def get_node_output(self, node_id: str, output_name: str = 'output') -> Any: - """Get output from a specific node""" - if node_id in self.node_states: - return self.node_states[node_id].outputs.get(output_name) - return None - - def set_variable(self, name: str, value: Any): - """Set a workflow variable""" - self.variables[name] = value - - def get_variable(self, name: str, default: Any = None) -> Any: - """Get a workflow variable""" - return self.variables.get(name, default) - - def set_conversation_variable(self, name: str, value: Any): - """Set a conversation-level variable (persisted across executions)""" - self.conversation_variables[name] = value - - def get_conversation_variable(self, name: str, default: Any = None) -> Any: - """Get a conversation-level variable""" - return self.conversation_variables.get(name, default) diff --git a/src/langbot/pkg/workflow/executor.py b/src/langbot/pkg/workflow/executor.py index 63c24a35..a5dee55f 100644 --- a/src/langbot/pkg/workflow/executor.py +++ b/src/langbot/pkg/workflow/executor.py @@ -178,7 +178,7 @@ class WorkflowExecutor: # Initialize node states for node in workflow.nodes: if node.id not in context.node_states: - context.node_states[node.id] = NodeState(node_id=node.id) + context.node_states[node.id] = NodeState(node_id=node.id, node_type=node.type, status=NodeStatus.PENDING) # Find start node(s) if start_node_id: @@ -662,14 +662,15 @@ class WorkflowExecutor: duration_ms = int((node_state.end_time - node_state.start_time).total_seconds() * 1000) step = ExecutionStep( + step_id=f"step_{uuid.uuid4().hex[:8]}", timestamp=datetime.now(), node_id=node.id, node_type=node.type, - status=node_state.status.value, - inputs=node_state.inputs, - outputs=node_state.outputs, + status=node_state.status, duration_ms=duration_ms, error=node_state.error, + inputs=node_state.inputs, + outputs=node_state.outputs, ) context.history.append(step) @@ -801,7 +802,7 @@ class LoopExecutor: for node in loop_body: # Reset node state for this iteration - context.node_states[node.id] = NodeState(node_id=node.id) + context.node_states[node.id] = NodeState(node_id=node.id, node_type=node.type, status=NodeStatus.PENDING) await self.executor._execute_node(node, context, max_retries=3) @@ -823,5 +824,3 @@ class LoopExecutor: context.variables.pop('loop_is_last', None) return results - - diff --git a/src/langbot/pkg/workflow/nodes/call_pipeline.py b/src/langbot/pkg/workflow/nodes/call_pipeline.py index 6c1684df..4699bb6a 100644 --- a/src/langbot/pkg/workflow/nodes/call_pipeline.py +++ b/src/langbot/pkg/workflow/nodes/call_pipeline.py @@ -5,18 +5,61 @@ Node metadata is loaded from: ../../templates/metadata/nodes/call_pipeline.yaml from __future__ import annotations -from typing import Any +from typing import Any, Optional + +import pydantic import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_event_logger import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query import langbot_plugin.api.entities.builtin.platform.entities as platform_entities import langbot_plugin.api.entities.builtin.platform.events as platform_events import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.provider.session as provider_session -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node + +class _NoOpEventLogger(abstract_event_logger.AbstractEventLogger): + """No-op event logger for workflow pipeline adapter.""" + + async def info( + self, + text: str, + images: Optional[list[platform_message.Image]] = None, + message_session_id: Optional[str] = None, + no_throw: bool = True, + ): + pass + + async def debug( + self, + text: str, + images: Optional[list[platform_message.Image]] = None, + message_session_id: Optional[str] = None, + no_throw: bool = True, + ): + pass + + async def warning( + self, + text: str, + images: Optional[list[platform_message.Image]] = None, + message_session_id: Optional[str] = None, + no_throw: bool = True, + ): + pass + + async def error( + self, + text: str, + images: Optional[list[platform_message.Image]] = None, + message_session_id: Optional[str] = None, + no_throw: bool = True, + ): + pass + @workflow_node('call_pipeline') class CallPipelineNode(WorkflowNode): """Call pipeline node - invoke an existing pipeline""" @@ -139,10 +182,16 @@ class CallPipelineNode(WorkflowNode): ) class _WorkflowPipelineCaptureAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): + """Adapter to capture pipeline responses for workflow execution.""" + + class Config: + arbitrary_types_allowed = True + responses: list[dict[str, Any]] = [] + context: ExecutionContext = pydantic.Field(exclude=True) def __init__(self, context: ExecutionContext): - super().__init__(config={}, logger=None) + super().__init__(config={}, logger=_NoOpEventLogger()) self.context = context self.responses = [] diff --git a/src/langbot/pkg/workflow/nodes/code_executor.py b/src/langbot/pkg/workflow/nodes/code_executor.py index af27998d..f51e2ab7 100644 --- a/src/langbot/pkg/workflow/nodes/code_executor.py +++ b/src/langbot/pkg/workflow/nodes/code_executor.py @@ -9,7 +9,7 @@ import json import re from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('code_executor') diff --git a/src/langbot/pkg/workflow/nodes/condition.py b/src/langbot/pkg/workflow/nodes/condition.py index e50478a8..325dc37c 100644 --- a/src/langbot/pkg/workflow/nodes/condition.py +++ b/src/langbot/pkg/workflow/nodes/condition.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node from ..safe_eval import safe_eval_with_vars diff --git a/src/langbot/pkg/workflow/nodes/coze_bot.py b/src/langbot/pkg/workflow/nodes/coze_bot.py index e086f157..e2389088 100644 --- a/src/langbot/pkg/workflow/nodes/coze_bot.py +++ b/src/langbot/pkg/workflow/nodes/coze_bot.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('coze_bot') diff --git a/src/langbot/pkg/workflow/nodes/cron_trigger.py b/src/langbot/pkg/workflow/nodes/cron_trigger.py index 6a480d3c..13c76639 100644 --- a/src/langbot/pkg/workflow/nodes/cron_trigger.py +++ b/src/langbot/pkg/workflow/nodes/cron_trigger.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('cron_trigger') diff --git a/src/langbot/pkg/workflow/nodes/data_transform.py b/src/langbot/pkg/workflow/nodes/data_transform.py index c5eec5d2..7445662c 100644 --- a/src/langbot/pkg/workflow/nodes/data_transform.py +++ b/src/langbot/pkg/workflow/nodes/data_transform.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node from ..safe_eval import safe_eval_with_vars diff --git a/src/langbot/pkg/workflow/nodes/database_query.py b/src/langbot/pkg/workflow/nodes/database_query.py index 22c7e9d6..65f96bb1 100644 --- a/src/langbot/pkg/workflow/nodes/database_query.py +++ b/src/langbot/pkg/workflow/nodes/database_query.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('database_query') diff --git a/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py b/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py index 07f35c04..2d8ff8cf 100644 --- a/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py +++ b/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('dify_knowledge_query') diff --git a/src/langbot/pkg/workflow/nodes/dify_workflow.py b/src/langbot/pkg/workflow/nodes/dify_workflow.py index 13477905..3e67442c 100644 --- a/src/langbot/pkg/workflow/nodes/dify_workflow.py +++ b/src/langbot/pkg/workflow/nodes/dify_workflow.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('dify_workflow') diff --git a/src/langbot/pkg/workflow/nodes/end.py b/src/langbot/pkg/workflow/nodes/end.py index 2fdc8713..879023a3 100644 --- a/src/langbot/pkg/workflow/nodes/end.py +++ b/src/langbot/pkg/workflow/nodes/end.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('end') diff --git a/src/langbot/pkg/workflow/nodes/event_trigger.py b/src/langbot/pkg/workflow/nodes/event_trigger.py index 2bb5004b..2e6da56a 100644 --- a/src/langbot/pkg/workflow/nodes/event_trigger.py +++ b/src/langbot/pkg/workflow/nodes/event_trigger.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('event_trigger') diff --git a/src/langbot/pkg/workflow/nodes/http_request.py b/src/langbot/pkg/workflow/nodes/http_request.py index 004dfb36..3bb4f5f3 100644 --- a/src/langbot/pkg/workflow/nodes/http_request.py +++ b/src/langbot/pkg/workflow/nodes/http_request.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('http_request') diff --git a/src/langbot/pkg/workflow/nodes/iterator.py b/src/langbot/pkg/workflow/nodes/iterator.py index 5bd7526c..fa33f04e 100644 --- a/src/langbot/pkg/workflow/nodes/iterator.py +++ b/src/langbot/pkg/workflow/nodes/iterator.py @@ -4,7 +4,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('iterator') diff --git a/src/langbot/pkg/workflow/nodes/knowledge_retrieval.py b/src/langbot/pkg/workflow/nodes/knowledge_retrieval.py index 1cd14f85..46eafeb1 100644 --- a/src/langbot/pkg/workflow/nodes/knowledge_retrieval.py +++ b/src/langbot/pkg/workflow/nodes/knowledge_retrieval.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('knowledge_retrieval') diff --git a/src/langbot/pkg/workflow/nodes/langflow_flow.py b/src/langbot/pkg/workflow/nodes/langflow_flow.py index 9edc8290..e31c4b46 100644 --- a/src/langbot/pkg/workflow/nodes/langflow_flow.py +++ b/src/langbot/pkg/workflow/nodes/langflow_flow.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('langflow_flow') diff --git a/src/langbot/pkg/workflow/nodes/llm_call.py b/src/langbot/pkg/workflow/nodes/llm_call.py index 4ff5d81c..c04400b0 100644 --- a/src/langbot/pkg/workflow/nodes/llm_call.py +++ b/src/langbot/pkg/workflow/nodes/llm_call.py @@ -10,7 +10,7 @@ from typing import Any, AsyncGenerator import langbot_plugin.api.entities.builtin.provider.message as provider_message import langbot_plugin.api.entities.builtin.resource.tool as resource_tool -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node logger = logging.getLogger(__name__) @@ -181,7 +181,6 @@ class LLMCallNode(WorkflowNode): # Get error handling config exception_handling = self.get_config('exception_handling', 'show-error') failure_hint = self.get_config('failure_hint', 'Request failed.') - remove_think = self.get_config('remove_think', False) track_function_calls = self.get_config('track_function_calls', False) # Get output format and json_schema config @@ -192,11 +191,10 @@ class LLMCallNode(WorkflowNode): enable_tools = self.get_config('enable_tools', False) tools_config = self.get_config('tools', []) - # Resolve prompts - handle null user_prompt_template + # Resolve prompts system_prompt = self._resolve_template(self.get_config('system_prompt') or '', inputs, context) user_prompt_template = self.get_config('user_prompt_template') if user_prompt_template is None: - # Default to input if not set user_prompt_template = '{{input}}' user_prompt = self._resolve_template(user_prompt_template, inputs, context) @@ -229,14 +227,11 @@ class LLMCallNode(WorkflowNode): if tool_names: all_tools = await self.ap.tool_mgr.get_tools() funcs = [t for t in all_tools if t.name in tool_names] - if funcs: - logger.info(f'LLM call node {self.node_id}: using tools: {[t.name for t in funcs]}') except Exception as e: - logger.warning(f'LLM call node {self.node_id}: failed to load tools - {e}') + logger.warning(f'[LLM:{self.node_id}] Failed to load tools: {e}') funcs = None - # Invoke LLM with error handling - logger.info(f'LLM call node {self.node_id}: invoking model {model_uuid}') + # Invoke LLM try: result_message = await runtime_model.provider.invoke_llm( query=None, @@ -246,7 +241,7 @@ class LLMCallNode(WorkflowNode): extra_args=extra_args, ) except Exception as e: - logger.warning(f'LLM call node {self.node_id}: request failed - {e}') + logger.warning(f'[LLM:{self.node_id}] LLM call failed: {e}') # Handle based on exception handling strategy if exception_handling == 'show-error': @@ -284,21 +279,20 @@ class LLMCallNode(WorkflowNode): elif isinstance(elem, str): response_text += elem - # Remove CoT (Chain of Thought) content if configured - if remove_think: - response_text = self._remove_think_content(response_text) + # Remove CoT content (always remove to avoid leaking internal reasoning) + response_text = self._remove_think_content(response_text) # Apply content safety filter response_text, is_blocked, filter_notice = self._apply_content_filter(response_text) if is_blocked: - logger.warning(f'LLM call node {self.node_id}: response blocked by content filter - {filter_notice}') + logger.warning(f'[LLM:{self.node_id}] Response blocked by content filter: {filter_notice}') return { 'response': filter_notice, 'usage': usage, 'blocked_by_filter': True, } - # Extract usage info if available + # Extract usage info usage = { 'prompt_tokens': 0, 'completion_tokens': 0, @@ -319,6 +313,7 @@ class LLMCallNode(WorkflowNode): 'total_tokens': getattr(u, 'total_tokens', 0) or 0, } + # Build result result: dict[str, Any] = { 'response': response_text, 'usage': usage, @@ -329,7 +324,7 @@ class LLMCallNode(WorkflowNode): try: result['parsed'] = json.loads(response_text) except json.JSONDecodeError as e: - logger.warning(f'LLM call node {self.node_id}: failed to parse JSON response - {e}') + logger.warning(f'[LLM:{self.node_id}] Failed to parse JSON: {e}') result['parsed'] = None result['parse_error'] = str(e) @@ -354,7 +349,6 @@ class LLMCallNode(WorkflowNode): if not self.ap: raise RuntimeError('Application instance not available - cannot call LLM') - remove_think = self.get_config('remove_think', False) exception_handling = self.get_config('exception_handling', 'show-error') failure_hint = self.get_config('failure_hint', 'Request failed.') @@ -383,7 +377,7 @@ class LLMCallNode(WorkflowNode): if max_tokens and int(max_tokens) > 0: extra_args['max_tokens'] = int(max_tokens) - logger.info(f'LLM call node {self.node_id}: streaming model {model_uuid}') + logger.info(f'[LLM:{self.node_id}] Streaming model {model_uuid}') try: # Try streaming first @@ -396,6 +390,7 @@ class LLMCallNode(WorkflowNode): ) full_response = '' + in_think_block = False async for chunk in stream: chunk_text = '' if hasattr(chunk, 'content'): @@ -409,16 +404,25 @@ class LLMCallNode(WorkflowNode): chunk_text += elem if chunk_text: - if remove_think: - chunk_text = self._remove_think_content(chunk_text) - full_response += chunk_text - yield chunk_text + # Filter blocks in streaming mode + if '' in chunk_text or '' in chunk_text: + in_think_block = True + if in_think_block: + if '' in chunk_text or '' in chunk_text: + in_think_block = False + chunk_text = chunk_text.split('')[-1].split('')[-1] + else: + chunk_text = '' + + if chunk_text: + full_response += chunk_text + yield chunk_text # Store in context for downstream nodes context.variables['_last_llm_response'] = full_response except Exception as e: - logger.warning(f'LLM call node {self.node_id}: streaming failed, falling back - {e}') + logger.warning(f'[LLM:{self.node_id}] Streaming failed, falling back - {e}') # Fallback to non-streaming try: result_message = await runtime_model.provider.invoke_llm( @@ -429,12 +433,12 @@ class LLMCallNode(WorkflowNode): extra_args=extra_args, ) response_text = self._extract_response_text(result_message) - if remove_think: - response_text = self._remove_think_content(response_text) + # Always remove content in fallback + response_text = self._remove_think_content(response_text) yield response_text context.variables['_last_llm_response'] = response_text except Exception as e2: - logger.error(f'LLM call node {self.node_id}: fallback also failed - {e2}') + logger.error(f'[LLM:{self.node_id}] Fallback also failed - {e2}') if exception_handling == 'show-hint': yield failure_hint elif exception_handling != 'hide': diff --git a/src/langbot/pkg/workflow/nodes/loop.py b/src/langbot/pkg/workflow/nodes/loop.py index f33ed72a..18cfcb2f 100644 --- a/src/langbot/pkg/workflow/nodes/loop.py +++ b/src/langbot/pkg/workflow/nodes/loop.py @@ -4,7 +4,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('loop') diff --git a/src/langbot/pkg/workflow/nodes/mcp_tool.py b/src/langbot/pkg/workflow/nodes/mcp_tool.py index 60e22734..ee4e3547 100644 --- a/src/langbot/pkg/workflow/nodes/mcp_tool.py +++ b/src/langbot/pkg/workflow/nodes/mcp_tool.py @@ -11,7 +11,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('mcp_tool') diff --git a/src/langbot/pkg/workflow/nodes/memory_store.py b/src/langbot/pkg/workflow/nodes/memory_store.py index 35edc658..8cc2b110 100644 --- a/src/langbot/pkg/workflow/nodes/memory_store.py +++ b/src/langbot/pkg/workflow/nodes/memory_store.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node class MemoryHelper: diff --git a/src/langbot/pkg/workflow/nodes/merge.py b/src/langbot/pkg/workflow/nodes/merge.py index 83b677a3..483f3701 100644 --- a/src/langbot/pkg/workflow/nodes/merge.py +++ b/src/langbot/pkg/workflow/nodes/merge.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('merge') diff --git a/src/langbot/pkg/workflow/nodes/message_trigger.py b/src/langbot/pkg/workflow/nodes/message_trigger.py index ee07cc2c..fe29b007 100644 --- a/src/langbot/pkg/workflow/nodes/message_trigger.py +++ b/src/langbot/pkg/workflow/nodes/message_trigger.py @@ -9,7 +9,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('message_trigger') diff --git a/src/langbot/pkg/workflow/nodes/n8n_workflow.py b/src/langbot/pkg/workflow/nodes/n8n_workflow.py index e7c5878b..2137b192 100644 --- a/src/langbot/pkg/workflow/nodes/n8n_workflow.py +++ b/src/langbot/pkg/workflow/nodes/n8n_workflow.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('n8n_workflow') diff --git a/src/langbot/pkg/workflow/nodes/opening_statement.py b/src/langbot/pkg/workflow/nodes/opening_statement.py index 2cb8b30b..20e50008 100644 --- a/src/langbot/pkg/workflow/nodes/opening_statement.py +++ b/src/langbot/pkg/workflow/nodes/opening_statement.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('opening_statement') diff --git a/src/langbot/pkg/workflow/nodes/parallel.py b/src/langbot/pkg/workflow/nodes/parallel.py index 3eb970ff..0cb2eeec 100644 --- a/src/langbot/pkg/workflow/nodes/parallel.py +++ b/src/langbot/pkg/workflow/nodes/parallel.py @@ -4,7 +4,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('parallel') diff --git a/src/langbot/pkg/workflow/nodes/parameter_extractor.py b/src/langbot/pkg/workflow/nodes/parameter_extractor.py index 4b383a58..75c3e6c4 100644 --- a/src/langbot/pkg/workflow/nodes/parameter_extractor.py +++ b/src/langbot/pkg/workflow/nodes/parameter_extractor.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('parameter_extractor') diff --git a/src/langbot/pkg/workflow/nodes/plugin_call.py b/src/langbot/pkg/workflow/nodes/plugin_call.py index a420c60a..428f333a 100644 --- a/src/langbot/pkg/workflow/nodes/plugin_call.py +++ b/src/langbot/pkg/workflow/nodes/plugin_call.py @@ -7,7 +7,7 @@ # from typing import Any -# from ..entities import ExecutionContext +# from langbot_plugin.api.entities.builtin.workflow import ExecutionContext # from ..node import WorkflowNode, workflow_node # @workflow_node('plugin_call') diff --git a/src/langbot/pkg/workflow/nodes/question_classifier.py b/src/langbot/pkg/workflow/nodes/question_classifier.py index fc3fe6f8..31a09df1 100644 --- a/src/langbot/pkg/workflow/nodes/question_classifier.py +++ b/src/langbot/pkg/workflow/nodes/question_classifier.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('question_classifier') diff --git a/src/langbot/pkg/workflow/nodes/redis_operation.py b/src/langbot/pkg/workflow/nodes/redis_operation.py index 8c74510f..6e4b4eab 100644 --- a/src/langbot/pkg/workflow/nodes/redis_operation.py +++ b/src/langbot/pkg/workflow/nodes/redis_operation.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('redis_operation') diff --git a/src/langbot/pkg/workflow/nodes/reply_message.py b/src/langbot/pkg/workflow/nodes/reply_message.py index 223fbc20..1633cfe6 100644 --- a/src/langbot/pkg/workflow/nodes/reply_message.py +++ b/src/langbot/pkg/workflow/nodes/reply_message.py @@ -8,7 +8,7 @@ from __future__ import annotations import logging from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node logger = logging.getLogger(__name__) @@ -20,13 +20,14 @@ class ReplyMessageNode(WorkflowNode): category = 'action' async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]: - message = inputs.get('message') - if message in (None, ''): - message = inputs.get('input') + # Priority: content/response (from upstream nodes) > message (original input) > context + message = inputs.get('content') if message in (None, ''): message = inputs.get('response') if message in (None, ''): - message = inputs.get('content') + message = inputs.get('input') + if message in (None, ''): + message = inputs.get('message') if message in (None, '') and context.message_context: message = context.message_context.message_content if message is None: diff --git a/src/langbot/pkg/workflow/nodes/send_message.py b/src/langbot/pkg/workflow/nodes/send_message.py index 3c2237a1..4989615f 100644 --- a/src/langbot/pkg/workflow/nodes/send_message.py +++ b/src/langbot/pkg/workflow/nodes/send_message.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('send_message') diff --git a/src/langbot/pkg/workflow/nodes/set_variable.py b/src/langbot/pkg/workflow/nodes/set_variable.py index 50031656..bee36e36 100644 --- a/src/langbot/pkg/workflow/nodes/set_variable.py +++ b/src/langbot/pkg/workflow/nodes/set_variable.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('set_variable') diff --git a/src/langbot/pkg/workflow/nodes/store_data.py b/src/langbot/pkg/workflow/nodes/store_data.py index 01d1ff90..7eb20e0b 100644 --- a/src/langbot/pkg/workflow/nodes/store_data.py +++ b/src/langbot/pkg/workflow/nodes/store_data.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('store_data') diff --git a/src/langbot/pkg/workflow/nodes/switch.py b/src/langbot/pkg/workflow/nodes/switch.py index 22f0c674..8d196647 100644 --- a/src/langbot/pkg/workflow/nodes/switch.py +++ b/src/langbot/pkg/workflow/nodes/switch.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('switch') diff --git a/src/langbot/pkg/workflow/nodes/variable_aggregator.py b/src/langbot/pkg/workflow/nodes/variable_aggregator.py index 5b5fe821..e9cfbce8 100644 --- a/src/langbot/pkg/workflow/nodes/variable_aggregator.py +++ b/src/langbot/pkg/workflow/nodes/variable_aggregator.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('variable_aggregator') diff --git a/src/langbot/pkg/workflow/nodes/wait.py b/src/langbot/pkg/workflow/nodes/wait.py index 08844087..465add98 100644 --- a/src/langbot/pkg/workflow/nodes/wait.py +++ b/src/langbot/pkg/workflow/nodes/wait.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('wait') diff --git a/src/langbot/pkg/workflow/nodes/webhook_trigger.py b/src/langbot/pkg/workflow/nodes/webhook_trigger.py index af95c598..64c04ef3 100644 --- a/src/langbot/pkg/workflow/nodes/webhook_trigger.py +++ b/src/langbot/pkg/workflow/nodes/webhook_trigger.py @@ -7,7 +7,7 @@ from __future__ import annotations from typing import Any -from ..entities import ExecutionContext +from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node @workflow_node('webhook_trigger') diff --git a/src/langbot/pkg/workflow/variable_manager.py b/src/langbot/pkg/workflow/variable_manager.py new file mode 100644 index 00000000..b005ccf3 --- /dev/null +++ b/src/langbot/pkg/workflow/variable_manager.py @@ -0,0 +1,185 @@ +"""Variable management for workflow execution. + +This module provides utilities for managing workflow variables, including: +- Reserved variable protection +- Variable namespace validation +- Variable inheritance between nodes +""" + +from typing import Set, Dict, Any + +# 保留变量列表 - 这些变量由系统管理,不能被用户覆盖 +RESERVED_VARIABLES = { + 'workflow_id', + 'execution_id', + 'node_id', + 'timestamp', + 'launcher_type', + 'trigger_type', + 'message_id', + 'execution_status', +} + + +def get_reserved_variables() -> Set[str]: + """获取保留变量列表 + + Returns: + 保留变量名称的集合 + """ + return RESERVED_VARIABLES.copy() + + +def validate_variable_namespace( + variables: Dict[str, Any], + namespace: str = 'workflow' +) -> bool: + """验证变量命名空间 + + 检查变量是否使用了保留的名称。允许不带前缀的变量,但建议使用命名空间前缀。 + + Args: + variables: 要验证的变量字典 + namespace: 命名空间前缀(可选) + + Returns: + True 如果验证通过 + + Raises: + ValueError: 如果变量名称与保留变量冲突 + """ + reserved = get_reserved_variables() + + for var_name in variables.keys(): + if var_name in reserved: + raise ValueError( + f"Variable '{var_name}' is reserved and cannot be used. " + f"Reserved variables: {', '.join(sorted(reserved))}" + ) + + # 检查命名空间前缀(可选建议) + if namespace and not var_name.startswith(f"{namespace}_"): + # 允许不带前缀的变量,但建议使用前缀 + pass + + return True + + +def inherit_variables( + parent_variables: Dict[str, Any], + child_namespace: str +) -> Dict[str, Any]: + """继承父节点变量到子节点 + + 将父节点的变量继承到子节点,跳过保留变量,并添加命名空间前缀。 + + Args: + parent_variables: 父节点的变量字典 + child_namespace: 子节点的命名空间前缀 + + Returns: + 带有命名空间前缀的继承变量字典 + """ + inherited = {} + + for key, value in parent_variables.items(): + # 跳过保留变量 + if key in RESERVED_VARIABLES: + continue + + # 添加命名空间前缀 + namespaced_key = f"{child_namespace}_{key}" + inherited[namespaced_key] = value + + return inherited + + +def merge_variables( + base_variables: Dict[str, Any], + override_variables: Dict[str, Any], + allow_reserved: bool = False +) -> Dict[str, Any]: + """合并变量字典 + + 将override_variables合并到base_variables中。 + + Args: + base_variables: 基础变量字典 + override_variables: 要合并的变量字典 + allow_reserved: 是否允许覆盖保留变量(默认False) + + Returns: + 合并后的变量字典 + + Raises: + ValueError: 如果尝试覆盖保留变量且allow_reserved=False + """ + result = base_variables.copy() + + for key, value in override_variables.items(): + if key in RESERVED_VARIABLES and not allow_reserved: + raise ValueError( + f"Cannot override reserved variable '{key}'. " + f"Set allow_reserved=True to override." + ) + result[key] = value + + return result + + +def extract_namespace_variables( + variables: Dict[str, Any], + namespace: str +) -> Dict[str, Any]: + """提取特定命名空间的变量 + + 从变量字典中提取具有特定命名空间前缀的变量。 + + Args: + variables: 变量字典 + namespace: 命名空间前缀 + + Returns: + 提取的变量字典(不包含命名空间前缀) + """ + prefix = f"{namespace}_" + result = {} + + for key, value in variables.items(): + if key.startswith(prefix): + # 移除命名空间前缀 + clean_key = key[len(prefix):] + result[clean_key] = value + + return result + + +def sanitize_variables( + variables: Dict[str, Any], + allowed_keys: Set[str] | None = None +) -> Dict[str, Any]: + """清理变量字典 + + 移除保留变量和不在允许列表中的变量。 + + Args: + variables: 要清理的变量字典 + allowed_keys: 允许的键集合(如果为None,则允许所有非保留键) + + Returns: + 清理后的变量字典 + """ + result = {} + + for key, value in variables.items(): + # 跳过保留变量 + if key in RESERVED_VARIABLES: + continue + + # 检查允许列表 + if allowed_keys is not None and key not in allowed_keys: + continue + + result[key] = value + + return result diff --git a/uv.lock b/uv.lock index fc56bbbc..72145a33 100644 --- a/uv.lock +++ b/uv.lock @@ -1973,7 +1973,7 @@ requires-dist = [ { name = "ebooklib", specifier = ">=0.18" }, { name = "gewechat-client", specifier = ">=0.1.5" }, { name = "html2text", specifier = ">=2024.2.26" }, - { name = "langbot-plugin", specifier = "==0.3.11" }, + { name = "langbot-plugin", directory = "/home/typer/Desktop/langbot-plugin-sdk" }, { name = "langchain", specifier = ">=0.2.0" }, { name = "langchain-core", specifier = ">=1.2.28" }, { name = "langchain-text-splitters", specifier = ">=1.1.2" }, @@ -2037,7 +2037,7 @@ dev = [ [[package]] name = "langbot-plugin" version = "0.3.11" -source = { registry = "https://pypi.org/simple" } +source = { directory = "/home/typer/Desktop/langbot-plugin-sdk" } dependencies = [ { name = "aiofiles" }, { name = "dotenv" }, @@ -2054,9 +2054,31 @@ dependencies = [ { name = "watchdog" }, { name = "websockets" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/91/83/93b86bcdbfe51d820fa59232aaa73cc802d6ce614f67d8f8b33957419538/langbot_plugin-0.3.11.tar.gz", hash = "sha256:8d10c98c771b468b2d35cc007778439c39922a88265fcc16a5881234bc7c1b19", size = 190315, upload-time = "2026-05-12T15:45:24.262Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/8f/22/de7977a6a5cbf557b80043eb3ed39e5feff24033a5d6db4ab88d48ccb6ea/langbot_plugin-0.3.11-py3-none-any.whl", hash = "sha256:c1d2e84eda1584902d99efa316b850c08c1c04fcc199306ff4af1dca1431304a", size = 165574, upload-time = "2026-05-12T15:45:22.908Z" }, + +[package.metadata] +requires-dist = [ + { name = "aiofiles", specifier = ">=24.1.0" }, + { name = "dotenv", specifier = ">=0.9.9" }, + { name = "httpx", specifier = ">=0.28.1" }, + { name = "jinja2", specifier = ">=3.1.6" }, + { name = "pip", specifier = ">=25.2" }, + { name = "pydantic", specifier = ">=2.11.5" }, + { name = "pydantic-settings", specifier = ">=2.10.1" }, + { name = "pytest", specifier = ">=8.4.0" }, + { name = "pyyaml", specifier = ">=6.0.2" }, + { name = "textual", specifier = ">=3.2.0" }, + { name = "types-aiofiles", specifier = ">=24.1.0.20250516" }, + { name = "types-pyyaml", specifier = ">=6.0.12.20250516" }, + { name = "watchdog", specifier = ">=6.0.0" }, + { name = "websockets", specifier = ">=15.0.1" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "mypy", specifier = ">=1.16.0" }, + { name = "pytest-asyncio", specifier = ">=1.3.0" }, + { name = "pytest-cov", specifier = ">=7.0.0" }, + { name = "ruff", specifier = ">=0.11.12" }, ] [[package]] @@ -2751,7 +2773,7 @@ wheels = [ [[package]] name = "moto" version = "5.2.1" -source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "boto3" }, { name = "botocore" }, @@ -2761,9 +2783,9 @@ dependencies = [ { name = "werkzeug" }, { name = "xmltodict" }, ] -sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/f6/e9/c38202162db2e76623176be9f1dbc9aa41228ffa91ee8da2d3986082c3e3/moto-5.2.1.tar.gz", hash = "sha256:ccb2f3e1dfa82e50e054bda98b0be708d244d2668364dcc1d45e8d3de6091bde", size = 8634437, upload-time = "2026-05-10T19:11:57.286Z" } +sdist = { url = "https://files.pythonhosted.org/packages/f6/e9/c38202162db2e76623176be9f1dbc9aa41228ffa91ee8da2d3986082c3e3/moto-5.2.1.tar.gz", hash = "sha256:ccb2f3e1dfa82e50e054bda98b0be708d244d2668364dcc1d45e8d3de6091bde", size = 8634437, upload-time = "2026-05-10T19:11:57.286Z" } wheels = [ - { url = "https://pypi.tuna.tsinghua.edu.cn/packages/15/79/8085b7c1ecd48d0535c3c8444a1d8df2926e457dce8e55fabc332a382c9c/moto-5.2.1-py3-none-any.whl", hash = "sha256:19d2fbd6e613aa5b4e364c52cd5d3cea371643a0f4210689a703227bd2924c5c", size = 6671379, upload-time = "2026-05-10T19:11:53.543Z" }, + { url = "https://files.pythonhosted.org/packages/15/79/8085b7c1ecd48d0535c3c8444a1d8df2926e457dce8e55fabc332a382c9c/moto-5.2.1-py3-none-any.whl", hash = "sha256:19d2fbd6e613aa5b4e364c52cd5d3cea371643a0f4210689a703227bd2924c5c", size = 6671379, upload-time = "2026-05-10T19:11:53.543Z" }, ] [[package]] @@ -4767,15 +4789,15 @@ wheels = [ [[package]] name = "responses" version = "0.26.0" -source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "pyyaml" }, { name = "requests" }, { name = "urllib3" }, ] -sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/9f/b4/b7e040379838cc71bf5aabdb26998dfbe5ee73904c92c1c161faf5de8866/responses-0.26.0.tar.gz", hash = "sha256:c7f6923e6343ef3682816ba421c006626777893cb0d5e1434f674b649bac9eb4", size = 81303, upload-time = "2026-02-19T14:38:05.574Z" } +sdist = { url = "https://files.pythonhosted.org/packages/9f/b4/b7e040379838cc71bf5aabdb26998dfbe5ee73904c92c1c161faf5de8866/responses-0.26.0.tar.gz", hash = "sha256:c7f6923e6343ef3682816ba421c006626777893cb0d5e1434f674b649bac9eb4", size = 81303, upload-time = "2026-02-19T14:38:05.574Z" } wheels = [ - { url = "https://pypi.tuna.tsinghua.edu.cn/packages/ce/04/7f73d05b556da048923e31a0cc878f03be7c5425ed1f268082255c75d872/responses-0.26.0-py3-none-any.whl", hash = "sha256:03ec4409088cd5c66b71ecbbbd27fe2c58ddfad801c66203457b3e6a04868c37", size = 35099, upload-time = "2026-02-19T14:38:03.847Z" }, + { url = "https://files.pythonhosted.org/packages/ce/04/7f73d05b556da048923e31a0cc878f03be7c5425ed1f268082255c75d872/responses-0.26.0-py3-none-any.whl", hash = "sha256:03ec4409088cd5c66b71ecbbbd27fe2c58ddfad801c66203457b3e6a04868c37", size = 35099, upload-time = "2026-02-19T14:38:03.847Z" }, ] [[package]] @@ -6072,10 +6094,10 @@ wheels = [ [[package]] name = "xmltodict" version = "1.0.4" -source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } -sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/19/70/80f3b7c10d2630aa66414bf23d210386700aa390547278c789afa994fd7e/xmltodict-1.0.4.tar.gz", hash = "sha256:6d94c9f834dd9e44514162799d344d815a3a4faec913717a9ecbfa5be1bb8e61", size = 26124, upload-time = "2026-02-22T02:21:22.074Z" } +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/19/70/80f3b7c10d2630aa66414bf23d210386700aa390547278c789afa994fd7e/xmltodict-1.0.4.tar.gz", hash = "sha256:6d94c9f834dd9e44514162799d344d815a3a4faec913717a9ecbfa5be1bb8e61", size = 26124, upload-time = "2026-02-22T02:21:22.074Z" } wheels = [ - { url = "https://pypi.tuna.tsinghua.edu.cn/packages/38/34/98a2f52245f4d47be93b580dae5f9861ef58977d73a79eb47c58f1ad1f3a/xmltodict-1.0.4-py3-none-any.whl", hash = "sha256:a4a00d300b0e1c59fc2bfccb53d7b2e88c32f200df138a0dd2229f842497026a", size = 13580, upload-time = "2026-02-22T02:21:21.039Z" }, + { url = "https://files.pythonhosted.org/packages/38/34/98a2f52245f4d47be93b580dae5f9861ef58977d73a79eb47c58f1ad1f3a/xmltodict-1.0.4-py3-none-any.whl", hash = "sha256:a4a00d300b0e1c59fc2bfccb53d7b2e88c32f200df138a0dd2229f842497026a", size = 13580, upload-time = "2026-02-22T02:21:21.039Z" }, ] [[package]]