diff --git a/src/langbot/pkg/api/http/controller/groups/workflows/workflows.py b/src/langbot/pkg/api/http/controller/groups/workflows/workflows.py
index d989d45e..ccce2a87 100644
--- a/src/langbot/pkg/api/http/controller/groups/workflows/workflows.py
+++ b/src/langbot/pkg/api/http/controller/groups/workflows/workflows.py
@@ -312,6 +312,164 @@ class WorkflowsRouterGroup(group.RouterGroup):
except ValueError as e:
return self.http_status(404, -1, str(e))
+ # LLM Node Performance Test Endpoint
+ # Tests each step of LLM node execution with detailed timing
+ @self.route('/_/test/llm-node', methods=['POST'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
+ async def _() -> str:
+ """Test LLM node performance with detailed step-by-step timing.
+
+ Request body:
+ {
+ "model_uuid": "uuid-of-model",
+ "system_prompt": "optional system prompt",
+ "user_prompt": "test message",
+ "enable_tools": false,
+ "tools": [],
+ "temperature": 0.7,
+ "max_tokens": 100
+ }
+
+ Response includes timing for each step:
+ - model_fetch: Time to get model from model_mgr
+ - tool_fetch: Time to load tools (if enabled)
+ - prompt_build: Time to build messages
+ - llm_call: Time for actual LLM invocation
+ - total: Total time
+ """
+ import time
+
+ json_data = await quart.request.json
+ if not json_data:
+ return self.http_status(400, -1, 'Request body is required')
+
+ model_uuid = json_data.get('model_uuid', '')
+ if not model_uuid:
+ return self.http_status(400, -1, 'model_uuid is required')
+
+ user_prompt = json_data.get('user_prompt', 'test')
+ system_prompt = json_data.get('system_prompt', '')
+ enable_tools = json_data.get('enable_tools', False)
+ tools_config = json_data.get('tools', [])
+ temperature = json_data.get('temperature')
+ max_tokens = json_data.get('max_tokens', 0)
+
+ timings = {}
+ errors = []
+
+ # Step 1: Model fetch
+ t_start = time.perf_counter()
+ try:
+ runtime_model = await self.ap.model_mgr.get_model_by_uuid(model_uuid)
+ timings['model_fetch_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
+ timings['model_found'] = True
+ timings['model_name'] = runtime_model.model_entity.name if runtime_model else None
+ except Exception as e:
+ timings['model_fetch_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
+ timings['model_found'] = False
+ errors.append(f'Model fetch failed: {str(e)}')
+ return self.http_status(400, -1, {
+ 'error': errors[0],
+ 'timings': timings,
+ })
+
+ # Step 2: Tool fetch (if enabled)
+ timings['tool_fetch_ms'] = 0
+ timings['tools_loaded'] = 0
+ if enable_tools and tools_config:
+ t_start = time.perf_counter()
+ try:
+ import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
+ all_tools = await self.ap.tool_mgr.get_tools()
+ tool_names = tools_config if isinstance(tools_config, list) else []
+ funcs = [t for t in all_tools if t.name in tool_names]
+ timings['tool_fetch_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
+ timings['tools_loaded'] = len(funcs)
+ timings['tool_names'] = [t.name for t in funcs]
+ except Exception as e:
+ timings['tool_fetch_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
+ errors.append(f'Tool fetch failed: {str(e)}')
+
+ # Step 3: Build messages
+ t_start = time.perf_counter()
+ import langbot_plugin.api.entities.builtin.provider.message as provider_message
+ messages = []
+ if system_prompt:
+ messages.append(provider_message.Message(role='system', content=system_prompt))
+ messages.append(provider_message.Message(role='user', content=user_prompt))
+ timings['prompt_build_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
+
+ # Step 4: Build extra args
+ extra_args = {}
+ if temperature is not None:
+ extra_args['temperature'] = float(temperature)
+ if max_tokens and int(max_tokens) > 0:
+ extra_args['max_tokens'] = int(max_tokens)
+
+ # Step 5: LLM call
+ t_start = time.perf_counter()
+ try:
+ result_message = await runtime_model.provider.invoke_llm(
+ query=None,
+ model=runtime_model,
+ messages=messages,
+ funcs=funcs if enable_tools else None,
+ extra_args=extra_args,
+ )
+ timings['llm_call_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
+ timings['llm_call_success'] = True
+
+ # Extract response text
+ response_text = ''
+ if isinstance(result_message.content, str):
+ response_text = result_message.content
+ elif isinstance(result_message.content, list):
+ for elem in result_message.content:
+ if hasattr(elem, 'text') and elem.text:
+ response_text += elem.text
+ elif isinstance(elem, str):
+ response_text += elem
+
+ timings['response_length'] = len(response_text)
+ timings['response_preview'] = response_text[:200]
+
+ # Extract usage
+ usage = {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}
+ if hasattr(result_message, 'usage') and result_message.usage:
+ u = result_message.usage
+ usage = {
+ 'prompt_tokens': getattr(u, 'prompt_tokens', 0) or 0,
+ 'completion_tokens': getattr(u, 'completion_tokens', 0) or 0,
+ 'total_tokens': getattr(u, 'total_tokens', 0) or 0,
+ }
+ timings['usage'] = usage
+
+ except Exception as e:
+ timings['llm_call_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
+ timings['llm_call_success'] = False
+ errors.append(f'LLM call failed: {str(e)}')
+
+ # Calculate total
+ timings['total_ms'] = round(sum([
+ timings.get('model_fetch_ms', 0),
+ timings.get('tool_fetch_ms', 0),
+ timings.get('prompt_build_ms', 0),
+ timings.get('llm_call_ms', 0),
+ ]), 2)
+
+ # Add breakdown percentage
+ if timings['total_ms'] > 0:
+ timings['breakdown'] = {
+ 'model_fetch_pct': round(timings.get('model_fetch_ms', 0) / timings['total_ms'] * 100, 1),
+ 'tool_fetch_pct': round(timings.get('tool_fetch_ms', 0) / timings['total_ms'] * 100, 1),
+ 'prompt_build_pct': round(timings.get('prompt_build_ms', 0) / timings['total_ms'] * 100, 1),
+ 'llm_call_pct': round(timings.get('llm_call_ms', 0) / timings['total_ms'] * 100, 1),
+ }
+
+ if errors:
+ timings['errors'] = errors
+
+ return self.success(data={'test_result': timings})
+
@group.group_class('executions', '/api/v1/executions')
class ExecutionsRouterGroup(group.RouterGroup):
diff --git a/src/langbot/pkg/workflow/adapters.py b/src/langbot/pkg/workflow/adapters.py
new file mode 100644
index 00000000..17530a4f
--- /dev/null
+++ b/src/langbot/pkg/workflow/adapters.py
@@ -0,0 +1,204 @@
+"""Workflow-Pipeline通信适配器
+
+这个模块提供了Workflow和Pipeline之间的通信适配,使用SDK标准的MessageEnvelope格式。
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional
+
+logger = logging.getLogger(__name__)
+
+
+class _WorkflowPipelineCaptureAdapter:
+ """Workflow-Pipeline通信适配器
+
+ 用于在Workflow节点和Pipeline之间进行标准化的消息传递。
+ 支持MessageEnvelope格式的双向转换。
+ """
+
+ def __init__(self, context: Any):
+ """初始化适配器
+
+ Args:
+ context: ExecutionContext - Workflow执行上下文
+ """
+ self.context = context
+ self.responses: list[dict[str, Any]] = []
+ self.bot_account_id: Optional[str] = None
+ self._logger = logging.getLogger(__name__)
+
+ async def call_pipeline_with_envelope(
+ self,
+ envelope: Any,
+ pipeline_executor: Any
+ ) -> Any:
+ """使用MessageEnvelope调用Pipeline
+
+ Args:
+ envelope: MessageEnvelope - 标准消息信封
+ pipeline_executor: Pipeline执行器实例
+
+ Returns:
+ MessageEnvelope - 执行结果信封
+ """
+ try:
+ # 动态导入以避免循环依赖
+ from langbot_plugin_sdk.workflow import envelope_to_query, query_to_envelope
+
+ # 1. 转换为Query
+ query = envelope_to_query(envelope)
+
+ # 2. 调用Pipeline
+ result_query = await pipeline_executor.execute(query)
+
+ # 3. 转换回Envelope
+ result_envelope = query_to_envelope(result_query, envelope)
+
+ self._logger.debug(
+ f'Pipeline execution completed for workflow {envelope.workflow_id}',
+ extra={
+ 'workflow_id': envelope.workflow_id,
+ 'execution_id': envelope.execution_id,
+ 'node_id': envelope.node_id,
+ }
+ )
+
+ return result_envelope
+
+ except Exception as e:
+ self._logger.error(
+ f'Pipeline execution failed: {e}',
+ exc_info=True,
+ extra={
+ 'workflow_id': envelope.workflow_id,
+ 'execution_id': envelope.execution_id,
+ 'node_id': envelope.node_id,
+ }
+ )
+ raise
+
+ def validate_envelope(self, envelope: Any) -> bool:
+ """验证MessageEnvelope的有效性
+
+ Args:
+ envelope: MessageEnvelope - 要验证的消息信封
+
+ Returns:
+ bool - 验证是否通过
+ """
+ required_fields = [
+ 'message_id',
+ 'workflow_id',
+ 'node_id',
+ 'execution_id',
+ 'payload',
+ 'launcher_type',
+ ]
+
+ for field in required_fields:
+ if not hasattr(envelope, field):
+ self._logger.warning(
+ f'MessageEnvelope missing required field: {field}'
+ )
+ return False
+
+ return True
+
+ def get_responses(self) -> list[dict[str, Any]]:
+ """获取所有响应
+
+ Returns:
+ list - 响应列表
+ """
+ return self.responses.copy()
+
+ def add_response(self, response: dict[str, Any]) -> None:
+ """添加响应
+
+ Args:
+ response: dict - 响应数据
+ """
+ self.responses.append(response)
+
+ def get_last_text_response(self) -> str:
+ """获取最后一个文本响应
+
+ Returns:
+ str - 最后一个响应的文本内容
+ """
+ if not self.responses:
+ return ''
+
+ last_response = self.responses[-1]
+ return str(last_response.get('content', '') or '')
+
+ def clear_responses(self) -> None:
+ """清空所有响应"""
+ self.responses.clear()
+
+
+class WorkflowPipelineCompatibilityLayer:
+ """Workflow-Pipeline兼容性层
+
+ 提供向后兼容性,支持旧的Pipeline Query格式和新的MessageEnvelope格式。
+ """
+
+ def __init__(self):
+ """初始化兼容性层"""
+ self._logger = logging.getLogger(__name__)
+
+ def is_workflow_context(self, query: Any) -> bool:
+ """检查Query是否包含Workflow上下文
+
+ Args:
+ query: Query - Pipeline Query对象
+
+ Returns:
+ bool - 是否来自Workflow
+ """
+ if hasattr(query, 'is_from_workflow'):
+ return query.is_from_workflow()
+
+ if hasattr(query, 'get_workflow_context'):
+ context = query.get_workflow_context()
+ return bool(context and context.get('workflow_id'))
+
+ return False
+
+ def get_workflow_id(self, query: Any) -> Optional[str]:
+ """从Query获取Workflow ID
+
+ Args:
+ query: Query - Pipeline Query对象
+
+ Returns:
+ str - Workflow ID,如果不存在则返回None
+ """
+ if hasattr(query, 'get_workflow_id'):
+ return query.get_workflow_id()
+
+ if hasattr(query, 'get_workflow_context'):
+ context = query.get_workflow_context()
+ return context.get('workflow_id') if context else None
+
+ return None
+
+ def get_execution_id(self, query: Any) -> Optional[str]:
+ """从Query获取执行ID
+
+ Args:
+ query: Query - Pipeline Query对象
+
+ Returns:
+ str - 执行ID,如果不存在则返回None
+ """
+ if hasattr(query, 'get_execution_id'):
+ return query.get_execution_id()
+
+ if hasattr(query, 'get_workflow_context'):
+ context = query.get_workflow_context()
+ return context.get('execution_id') if context else None
+
+ return None
diff --git a/src/langbot/pkg/workflow/entities.py b/src/langbot/pkg/workflow/entities.py
index 44107a0a..bda59a27 100644
--- a/src/langbot/pkg/workflow/entities.py
+++ b/src/langbot/pkg/workflow/entities.py
@@ -1,12 +1,27 @@
-"""Workflow entities and data models"""
+"""Workflow entities and data models
+
+This module defines workflow entities using SDK standard entities where available,
+and local-specific entities for LangBot_copy-specific functionality.
+"""
from __future__ import annotations
-import enum
from datetime import datetime
from typing import Any, Optional
import pydantic
+# Import SDK entities for standard workflow protocol types
+from langbot_plugin.api.entities.builtin.workflow import (
+ ExecutionContext,
+ ExecutionStep,
+ ExecutionStatus,
+ MessageContext,
+ NodeDefinition,
+ NodeState,
+ NodeStatus,
+ PortDefinition,
+)
+
class Position(pydantic.BaseModel):
"""Node position on canvas"""
@@ -15,31 +30,6 @@ class Position(pydantic.BaseModel):
y: float = 0
-class PortDefinition(pydantic.BaseModel):
- """Node port definition"""
-
- name: str
- type: str = 'any' # any, string, number, boolean, object, array
- description: str = ''
- required: bool = True
-
-
-class NodeDefinition(pydantic.BaseModel):
- """Workflow node definition"""
-
- id: str
- type: str
- name: str = ''
- position: Position = Position()
- config: dict[str, Any] = {}
- inputs: list[PortDefinition] = []
- outputs: list[PortDefinition] = []
-
- # UI metadata
- description: str = ''
- comment: str = '' # User comment/annotation
-
-
class EdgeDefinition(pydantic.BaseModel):
"""Workflow edge definition (connection between nodes)"""
@@ -161,125 +151,3 @@ class WorkflowDefinition(pydantic.BaseModel):
# Source tracking (for imported workflows)
source: Optional[str] = None # dify, n8n, langflow, etc.
source_id: Optional[str] = None
-
-
-class ExecutionStatus(enum.Enum):
- """Workflow execution status"""
-
- PENDING = 'pending'
- RUNNING = 'running'
- WAITING = 'waiting'
- COMPLETED = 'completed'
- FAILED = 'failed'
- CANCELLED = 'cancelled'
-
-
-class NodeStatus(enum.Enum):
- """Node execution status"""
-
- PENDING = 'pending'
- RUNNING = 'running'
- COMPLETED = 'completed'
- FAILED = 'failed'
- SKIPPED = 'skipped'
-
-
-class NodeState(pydantic.BaseModel):
- """Runtime state of a node during execution"""
-
- node_id: str
- status: NodeStatus = NodeStatus.PENDING
- inputs: dict[str, Any] = {}
- outputs: dict[str, Any] = {}
- start_time: Optional[datetime] = None
- end_time: Optional[datetime] = None
- error: Optional[str] = None
- retry_count: int = 0
-
-
-class MessageContext(pydantic.BaseModel):
- """Message context for message-triggered workflows"""
-
- message_id: str
- message_content: str
- sender_id: str
- sender_name: str = ''
- platform: str = ''
- conversation_id: str = ''
- is_group: bool = False
- group_id: Optional[str] = None
- mentions: list[str] = []
- reply_to: Optional[str] = None
- raw_message: dict[str, Any] = {}
-
-
-class ExecutionStep(pydantic.BaseModel):
- """Execution history step"""
-
- timestamp: datetime
- node_id: str
- node_type: str
- status: str
- inputs: dict[str, Any] = {}
- outputs: dict[str, Any] = {}
- duration_ms: int = 0
- error: Optional[str] = None
-
-
-class ExecutionContext(pydantic.BaseModel):
- """Workflow execution context"""
-
- execution_id: str
- workflow_id: str
- workflow_version: int = 1
- status: ExecutionStatus = ExecutionStatus.PENDING
-
- # Runtime data
- variables: dict[str, Any] = {}
- conversation_variables: dict[str, Any] = {} # Session-level persistent variables
- node_states: dict[str, NodeState] = {}
- memory: dict[str, Any] = {} # Workflow memory for storing/retrieving data
-
- # Timing
- start_time: Optional[datetime] = None
- end_time: Optional[datetime] = None
-
- # Error
- error: Optional[str] = None
-
- # Message context (if triggered by message)
- message_context: Optional[MessageContext] = None
-
- # Trigger info
- trigger_type: Optional[str] = None
- trigger_data: dict[str, Any] = {}
-
- # Execution history
- history: list[ExecutionStep] = []
-
- # Session info
- session_id: Optional[str] = None
- user_id: Optional[str] = None
- bot_id: Optional[str] = None
-
- def get_node_output(self, node_id: str, output_name: str = 'output') -> Any:
- """Get output from a specific node"""
- if node_id in self.node_states:
- return self.node_states[node_id].outputs.get(output_name)
- return None
-
- def set_variable(self, name: str, value: Any):
- """Set a workflow variable"""
- self.variables[name] = value
-
- def get_variable(self, name: str, default: Any = None) -> Any:
- """Get a workflow variable"""
- return self.variables.get(name, default)
-
- def set_conversation_variable(self, name: str, value: Any):
- """Set a conversation-level variable (persisted across executions)"""
- self.conversation_variables[name] = value
-
- def get_conversation_variable(self, name: str, default: Any = None) -> Any:
- """Get a conversation-level variable"""
- return self.conversation_variables.get(name, default)
diff --git a/src/langbot/pkg/workflow/executor.py b/src/langbot/pkg/workflow/executor.py
index 63c24a35..a5dee55f 100644
--- a/src/langbot/pkg/workflow/executor.py
+++ b/src/langbot/pkg/workflow/executor.py
@@ -178,7 +178,7 @@ class WorkflowExecutor:
# Initialize node states
for node in workflow.nodes:
if node.id not in context.node_states:
- context.node_states[node.id] = NodeState(node_id=node.id)
+ context.node_states[node.id] = NodeState(node_id=node.id, node_type=node.type, status=NodeStatus.PENDING)
# Find start node(s)
if start_node_id:
@@ -662,14 +662,15 @@ class WorkflowExecutor:
duration_ms = int((node_state.end_time - node_state.start_time).total_seconds() * 1000)
step = ExecutionStep(
+ step_id=f"step_{uuid.uuid4().hex[:8]}",
timestamp=datetime.now(),
node_id=node.id,
node_type=node.type,
- status=node_state.status.value,
- inputs=node_state.inputs,
- outputs=node_state.outputs,
+ status=node_state.status,
duration_ms=duration_ms,
error=node_state.error,
+ inputs=node_state.inputs,
+ outputs=node_state.outputs,
)
context.history.append(step)
@@ -801,7 +802,7 @@ class LoopExecutor:
for node in loop_body:
# Reset node state for this iteration
- context.node_states[node.id] = NodeState(node_id=node.id)
+ context.node_states[node.id] = NodeState(node_id=node.id, node_type=node.type, status=NodeStatus.PENDING)
await self.executor._execute_node(node, context, max_retries=3)
@@ -823,5 +824,3 @@ class LoopExecutor:
context.variables.pop('loop_is_last', None)
return results
-
-
diff --git a/src/langbot/pkg/workflow/nodes/call_pipeline.py b/src/langbot/pkg/workflow/nodes/call_pipeline.py
index 6c1684df..4699bb6a 100644
--- a/src/langbot/pkg/workflow/nodes/call_pipeline.py
+++ b/src/langbot/pkg/workflow/nodes/call_pipeline.py
@@ -5,18 +5,61 @@ Node metadata is loaded from: ../../templates/metadata/nodes/call_pipeline.yaml
from __future__ import annotations
-from typing import Any
+from typing import Any, Optional
+
+import pydantic
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
+import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_event_logger
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.provider.session as provider_session
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
+
+class _NoOpEventLogger(abstract_event_logger.AbstractEventLogger):
+ """No-op event logger for workflow pipeline adapter."""
+
+ async def info(
+ self,
+ text: str,
+ images: Optional[list[platform_message.Image]] = None,
+ message_session_id: Optional[str] = None,
+ no_throw: bool = True,
+ ):
+ pass
+
+ async def debug(
+ self,
+ text: str,
+ images: Optional[list[platform_message.Image]] = None,
+ message_session_id: Optional[str] = None,
+ no_throw: bool = True,
+ ):
+ pass
+
+ async def warning(
+ self,
+ text: str,
+ images: Optional[list[platform_message.Image]] = None,
+ message_session_id: Optional[str] = None,
+ no_throw: bool = True,
+ ):
+ pass
+
+ async def error(
+ self,
+ text: str,
+ images: Optional[list[platform_message.Image]] = None,
+ message_session_id: Optional[str] = None,
+ no_throw: bool = True,
+ ):
+ pass
+
@workflow_node('call_pipeline')
class CallPipelineNode(WorkflowNode):
"""Call pipeline node - invoke an existing pipeline"""
@@ -139,10 +182,16 @@ class CallPipelineNode(WorkflowNode):
)
class _WorkflowPipelineCaptureAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
+ """Adapter to capture pipeline responses for workflow execution."""
+
+ class Config:
+ arbitrary_types_allowed = True
+
responses: list[dict[str, Any]] = []
+ context: ExecutionContext = pydantic.Field(exclude=True)
def __init__(self, context: ExecutionContext):
- super().__init__(config={}, logger=None)
+ super().__init__(config={}, logger=_NoOpEventLogger())
self.context = context
self.responses = []
diff --git a/src/langbot/pkg/workflow/nodes/code_executor.py b/src/langbot/pkg/workflow/nodes/code_executor.py
index af27998d..f51e2ab7 100644
--- a/src/langbot/pkg/workflow/nodes/code_executor.py
+++ b/src/langbot/pkg/workflow/nodes/code_executor.py
@@ -9,7 +9,7 @@ import json
import re
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('code_executor')
diff --git a/src/langbot/pkg/workflow/nodes/condition.py b/src/langbot/pkg/workflow/nodes/condition.py
index e50478a8..325dc37c 100644
--- a/src/langbot/pkg/workflow/nodes/condition.py
+++ b/src/langbot/pkg/workflow/nodes/condition.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
from ..safe_eval import safe_eval_with_vars
diff --git a/src/langbot/pkg/workflow/nodes/coze_bot.py b/src/langbot/pkg/workflow/nodes/coze_bot.py
index e086f157..e2389088 100644
--- a/src/langbot/pkg/workflow/nodes/coze_bot.py
+++ b/src/langbot/pkg/workflow/nodes/coze_bot.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('coze_bot')
diff --git a/src/langbot/pkg/workflow/nodes/cron_trigger.py b/src/langbot/pkg/workflow/nodes/cron_trigger.py
index 6a480d3c..13c76639 100644
--- a/src/langbot/pkg/workflow/nodes/cron_trigger.py
+++ b/src/langbot/pkg/workflow/nodes/cron_trigger.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('cron_trigger')
diff --git a/src/langbot/pkg/workflow/nodes/data_transform.py b/src/langbot/pkg/workflow/nodes/data_transform.py
index c5eec5d2..7445662c 100644
--- a/src/langbot/pkg/workflow/nodes/data_transform.py
+++ b/src/langbot/pkg/workflow/nodes/data_transform.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
from ..safe_eval import safe_eval_with_vars
diff --git a/src/langbot/pkg/workflow/nodes/database_query.py b/src/langbot/pkg/workflow/nodes/database_query.py
index 22c7e9d6..65f96bb1 100644
--- a/src/langbot/pkg/workflow/nodes/database_query.py
+++ b/src/langbot/pkg/workflow/nodes/database_query.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('database_query')
diff --git a/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py b/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py
index 07f35c04..2d8ff8cf 100644
--- a/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py
+++ b/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('dify_knowledge_query')
diff --git a/src/langbot/pkg/workflow/nodes/dify_workflow.py b/src/langbot/pkg/workflow/nodes/dify_workflow.py
index 13477905..3e67442c 100644
--- a/src/langbot/pkg/workflow/nodes/dify_workflow.py
+++ b/src/langbot/pkg/workflow/nodes/dify_workflow.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('dify_workflow')
diff --git a/src/langbot/pkg/workflow/nodes/end.py b/src/langbot/pkg/workflow/nodes/end.py
index 2fdc8713..879023a3 100644
--- a/src/langbot/pkg/workflow/nodes/end.py
+++ b/src/langbot/pkg/workflow/nodes/end.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('end')
diff --git a/src/langbot/pkg/workflow/nodes/event_trigger.py b/src/langbot/pkg/workflow/nodes/event_trigger.py
index 2bb5004b..2e6da56a 100644
--- a/src/langbot/pkg/workflow/nodes/event_trigger.py
+++ b/src/langbot/pkg/workflow/nodes/event_trigger.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('event_trigger')
diff --git a/src/langbot/pkg/workflow/nodes/http_request.py b/src/langbot/pkg/workflow/nodes/http_request.py
index 004dfb36..3bb4f5f3 100644
--- a/src/langbot/pkg/workflow/nodes/http_request.py
+++ b/src/langbot/pkg/workflow/nodes/http_request.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('http_request')
diff --git a/src/langbot/pkg/workflow/nodes/iterator.py b/src/langbot/pkg/workflow/nodes/iterator.py
index 5bd7526c..fa33f04e 100644
--- a/src/langbot/pkg/workflow/nodes/iterator.py
+++ b/src/langbot/pkg/workflow/nodes/iterator.py
@@ -4,7 +4,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('iterator')
diff --git a/src/langbot/pkg/workflow/nodes/knowledge_retrieval.py b/src/langbot/pkg/workflow/nodes/knowledge_retrieval.py
index 1cd14f85..46eafeb1 100644
--- a/src/langbot/pkg/workflow/nodes/knowledge_retrieval.py
+++ b/src/langbot/pkg/workflow/nodes/knowledge_retrieval.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('knowledge_retrieval')
diff --git a/src/langbot/pkg/workflow/nodes/langflow_flow.py b/src/langbot/pkg/workflow/nodes/langflow_flow.py
index 9edc8290..e31c4b46 100644
--- a/src/langbot/pkg/workflow/nodes/langflow_flow.py
+++ b/src/langbot/pkg/workflow/nodes/langflow_flow.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('langflow_flow')
diff --git a/src/langbot/pkg/workflow/nodes/llm_call.py b/src/langbot/pkg/workflow/nodes/llm_call.py
index 4ff5d81c..c04400b0 100644
--- a/src/langbot/pkg/workflow/nodes/llm_call.py
+++ b/src/langbot/pkg/workflow/nodes/llm_call.py
@@ -10,7 +10,7 @@ from typing import Any, AsyncGenerator
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
logger = logging.getLogger(__name__)
@@ -181,7 +181,6 @@ class LLMCallNode(WorkflowNode):
# Get error handling config
exception_handling = self.get_config('exception_handling', 'show-error')
failure_hint = self.get_config('failure_hint', 'Request failed.')
- remove_think = self.get_config('remove_think', False)
track_function_calls = self.get_config('track_function_calls', False)
# Get output format and json_schema config
@@ -192,11 +191,10 @@ class LLMCallNode(WorkflowNode):
enable_tools = self.get_config('enable_tools', False)
tools_config = self.get_config('tools', [])
- # Resolve prompts - handle null user_prompt_template
+ # Resolve prompts
system_prompt = self._resolve_template(self.get_config('system_prompt') or '', inputs, context)
user_prompt_template = self.get_config('user_prompt_template')
if user_prompt_template is None:
- # Default to input if not set
user_prompt_template = '{{input}}'
user_prompt = self._resolve_template(user_prompt_template, inputs, context)
@@ -229,14 +227,11 @@ class LLMCallNode(WorkflowNode):
if tool_names:
all_tools = await self.ap.tool_mgr.get_tools()
funcs = [t for t in all_tools if t.name in tool_names]
- if funcs:
- logger.info(f'LLM call node {self.node_id}: using tools: {[t.name for t in funcs]}')
except Exception as e:
- logger.warning(f'LLM call node {self.node_id}: failed to load tools - {e}')
+ logger.warning(f'[LLM:{self.node_id}] Failed to load tools: {e}')
funcs = None
- # Invoke LLM with error handling
- logger.info(f'LLM call node {self.node_id}: invoking model {model_uuid}')
+ # Invoke LLM
try:
result_message = await runtime_model.provider.invoke_llm(
query=None,
@@ -246,7 +241,7 @@ class LLMCallNode(WorkflowNode):
extra_args=extra_args,
)
except Exception as e:
- logger.warning(f'LLM call node {self.node_id}: request failed - {e}')
+ logger.warning(f'[LLM:{self.node_id}] LLM call failed: {e}')
# Handle based on exception handling strategy
if exception_handling == 'show-error':
@@ -284,21 +279,20 @@ class LLMCallNode(WorkflowNode):
elif isinstance(elem, str):
response_text += elem
- # Remove CoT (Chain of Thought) content if configured
- if remove_think:
- response_text = self._remove_think_content(response_text)
+ # Remove CoT content (always remove to avoid leaking internal reasoning)
+ response_text = self._remove_think_content(response_text)
# Apply content safety filter
response_text, is_blocked, filter_notice = self._apply_content_filter(response_text)
if is_blocked:
- logger.warning(f'LLM call node {self.node_id}: response blocked by content filter - {filter_notice}')
+ logger.warning(f'[LLM:{self.node_id}] Response blocked by content filter: {filter_notice}')
return {
'response': filter_notice,
'usage': usage,
'blocked_by_filter': True,
}
- # Extract usage info if available
+ # Extract usage info
usage = {
'prompt_tokens': 0,
'completion_tokens': 0,
@@ -319,6 +313,7 @@ class LLMCallNode(WorkflowNode):
'total_tokens': getattr(u, 'total_tokens', 0) or 0,
}
+ # Build result
result: dict[str, Any] = {
'response': response_text,
'usage': usage,
@@ -329,7 +324,7 @@ class LLMCallNode(WorkflowNode):
try:
result['parsed'] = json.loads(response_text)
except json.JSONDecodeError as e:
- logger.warning(f'LLM call node {self.node_id}: failed to parse JSON response - {e}')
+ logger.warning(f'[LLM:{self.node_id}] Failed to parse JSON: {e}')
result['parsed'] = None
result['parse_error'] = str(e)
@@ -354,7 +349,6 @@ class LLMCallNode(WorkflowNode):
if not self.ap:
raise RuntimeError('Application instance not available - cannot call LLM')
- remove_think = self.get_config('remove_think', False)
exception_handling = self.get_config('exception_handling', 'show-error')
failure_hint = self.get_config('failure_hint', 'Request failed.')
@@ -383,7 +377,7 @@ class LLMCallNode(WorkflowNode):
if max_tokens and int(max_tokens) > 0:
extra_args['max_tokens'] = int(max_tokens)
- logger.info(f'LLM call node {self.node_id}: streaming model {model_uuid}')
+ logger.info(f'[LLM:{self.node_id}] Streaming model {model_uuid}')
try:
# Try streaming first
@@ -396,6 +390,7 @@ class LLMCallNode(WorkflowNode):
)
full_response = ''
+ in_think_block = False
async for chunk in stream:
chunk_text = ''
if hasattr(chunk, 'content'):
@@ -409,16 +404,25 @@ class LLMCallNode(WorkflowNode):
chunk_text += elem
if chunk_text:
- if remove_think:
- chunk_text = self._remove_think_content(chunk_text)
- full_response += chunk_text
- yield chunk_text
+ # Filter blocks in streaming mode
+ if '' in chunk_text or '' in chunk_text:
+ in_think_block = True
+ if in_think_block:
+ if '' in chunk_text or '' in chunk_text:
+ in_think_block = False
+ chunk_text = chunk_text.split('')[-1].split('')[-1]
+ else:
+ chunk_text = ''
+
+ if chunk_text:
+ full_response += chunk_text
+ yield chunk_text
# Store in context for downstream nodes
context.variables['_last_llm_response'] = full_response
except Exception as e:
- logger.warning(f'LLM call node {self.node_id}: streaming failed, falling back - {e}')
+ logger.warning(f'[LLM:{self.node_id}] Streaming failed, falling back - {e}')
# Fallback to non-streaming
try:
result_message = await runtime_model.provider.invoke_llm(
@@ -429,12 +433,12 @@ class LLMCallNode(WorkflowNode):
extra_args=extra_args,
)
response_text = self._extract_response_text(result_message)
- if remove_think:
- response_text = self._remove_think_content(response_text)
+ # Always remove content in fallback
+ response_text = self._remove_think_content(response_text)
yield response_text
context.variables['_last_llm_response'] = response_text
except Exception as e2:
- logger.error(f'LLM call node {self.node_id}: fallback also failed - {e2}')
+ logger.error(f'[LLM:{self.node_id}] Fallback also failed - {e2}')
if exception_handling == 'show-hint':
yield failure_hint
elif exception_handling != 'hide':
diff --git a/src/langbot/pkg/workflow/nodes/loop.py b/src/langbot/pkg/workflow/nodes/loop.py
index f33ed72a..18cfcb2f 100644
--- a/src/langbot/pkg/workflow/nodes/loop.py
+++ b/src/langbot/pkg/workflow/nodes/loop.py
@@ -4,7 +4,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('loop')
diff --git a/src/langbot/pkg/workflow/nodes/mcp_tool.py b/src/langbot/pkg/workflow/nodes/mcp_tool.py
index 60e22734..ee4e3547 100644
--- a/src/langbot/pkg/workflow/nodes/mcp_tool.py
+++ b/src/langbot/pkg/workflow/nodes/mcp_tool.py
@@ -11,7 +11,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('mcp_tool')
diff --git a/src/langbot/pkg/workflow/nodes/memory_store.py b/src/langbot/pkg/workflow/nodes/memory_store.py
index 35edc658..8cc2b110 100644
--- a/src/langbot/pkg/workflow/nodes/memory_store.py
+++ b/src/langbot/pkg/workflow/nodes/memory_store.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
class MemoryHelper:
diff --git a/src/langbot/pkg/workflow/nodes/merge.py b/src/langbot/pkg/workflow/nodes/merge.py
index 83b677a3..483f3701 100644
--- a/src/langbot/pkg/workflow/nodes/merge.py
+++ b/src/langbot/pkg/workflow/nodes/merge.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('merge')
diff --git a/src/langbot/pkg/workflow/nodes/message_trigger.py b/src/langbot/pkg/workflow/nodes/message_trigger.py
index ee07cc2c..fe29b007 100644
--- a/src/langbot/pkg/workflow/nodes/message_trigger.py
+++ b/src/langbot/pkg/workflow/nodes/message_trigger.py
@@ -9,7 +9,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('message_trigger')
diff --git a/src/langbot/pkg/workflow/nodes/n8n_workflow.py b/src/langbot/pkg/workflow/nodes/n8n_workflow.py
index e7c5878b..2137b192 100644
--- a/src/langbot/pkg/workflow/nodes/n8n_workflow.py
+++ b/src/langbot/pkg/workflow/nodes/n8n_workflow.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('n8n_workflow')
diff --git a/src/langbot/pkg/workflow/nodes/opening_statement.py b/src/langbot/pkg/workflow/nodes/opening_statement.py
index 2cb8b30b..20e50008 100644
--- a/src/langbot/pkg/workflow/nodes/opening_statement.py
+++ b/src/langbot/pkg/workflow/nodes/opening_statement.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('opening_statement')
diff --git a/src/langbot/pkg/workflow/nodes/parallel.py b/src/langbot/pkg/workflow/nodes/parallel.py
index 3eb970ff..0cb2eeec 100644
--- a/src/langbot/pkg/workflow/nodes/parallel.py
+++ b/src/langbot/pkg/workflow/nodes/parallel.py
@@ -4,7 +4,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('parallel')
diff --git a/src/langbot/pkg/workflow/nodes/parameter_extractor.py b/src/langbot/pkg/workflow/nodes/parameter_extractor.py
index 4b383a58..75c3e6c4 100644
--- a/src/langbot/pkg/workflow/nodes/parameter_extractor.py
+++ b/src/langbot/pkg/workflow/nodes/parameter_extractor.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('parameter_extractor')
diff --git a/src/langbot/pkg/workflow/nodes/plugin_call.py b/src/langbot/pkg/workflow/nodes/plugin_call.py
index a420c60a..428f333a 100644
--- a/src/langbot/pkg/workflow/nodes/plugin_call.py
+++ b/src/langbot/pkg/workflow/nodes/plugin_call.py
@@ -7,7 +7,7 @@
# from typing import Any
-# from ..entities import ExecutionContext
+# from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
# from ..node import WorkflowNode, workflow_node
# @workflow_node('plugin_call')
diff --git a/src/langbot/pkg/workflow/nodes/question_classifier.py b/src/langbot/pkg/workflow/nodes/question_classifier.py
index fc3fe6f8..31a09df1 100644
--- a/src/langbot/pkg/workflow/nodes/question_classifier.py
+++ b/src/langbot/pkg/workflow/nodes/question_classifier.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('question_classifier')
diff --git a/src/langbot/pkg/workflow/nodes/redis_operation.py b/src/langbot/pkg/workflow/nodes/redis_operation.py
index 8c74510f..6e4b4eab 100644
--- a/src/langbot/pkg/workflow/nodes/redis_operation.py
+++ b/src/langbot/pkg/workflow/nodes/redis_operation.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('redis_operation')
diff --git a/src/langbot/pkg/workflow/nodes/reply_message.py b/src/langbot/pkg/workflow/nodes/reply_message.py
index 223fbc20..1633cfe6 100644
--- a/src/langbot/pkg/workflow/nodes/reply_message.py
+++ b/src/langbot/pkg/workflow/nodes/reply_message.py
@@ -8,7 +8,7 @@ from __future__ import annotations
import logging
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
logger = logging.getLogger(__name__)
@@ -20,13 +20,14 @@ class ReplyMessageNode(WorkflowNode):
category = 'action'
async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]:
- message = inputs.get('message')
- if message in (None, ''):
- message = inputs.get('input')
+ # Priority: content/response (from upstream nodes) > message (original input) > context
+ message = inputs.get('content')
if message in (None, ''):
message = inputs.get('response')
if message in (None, ''):
- message = inputs.get('content')
+ message = inputs.get('input')
+ if message in (None, ''):
+ message = inputs.get('message')
if message in (None, '') and context.message_context:
message = context.message_context.message_content
if message is None:
diff --git a/src/langbot/pkg/workflow/nodes/send_message.py b/src/langbot/pkg/workflow/nodes/send_message.py
index 3c2237a1..4989615f 100644
--- a/src/langbot/pkg/workflow/nodes/send_message.py
+++ b/src/langbot/pkg/workflow/nodes/send_message.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('send_message')
diff --git a/src/langbot/pkg/workflow/nodes/set_variable.py b/src/langbot/pkg/workflow/nodes/set_variable.py
index 50031656..bee36e36 100644
--- a/src/langbot/pkg/workflow/nodes/set_variable.py
+++ b/src/langbot/pkg/workflow/nodes/set_variable.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('set_variable')
diff --git a/src/langbot/pkg/workflow/nodes/store_data.py b/src/langbot/pkg/workflow/nodes/store_data.py
index 01d1ff90..7eb20e0b 100644
--- a/src/langbot/pkg/workflow/nodes/store_data.py
+++ b/src/langbot/pkg/workflow/nodes/store_data.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('store_data')
diff --git a/src/langbot/pkg/workflow/nodes/switch.py b/src/langbot/pkg/workflow/nodes/switch.py
index 22f0c674..8d196647 100644
--- a/src/langbot/pkg/workflow/nodes/switch.py
+++ b/src/langbot/pkg/workflow/nodes/switch.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('switch')
diff --git a/src/langbot/pkg/workflow/nodes/variable_aggregator.py b/src/langbot/pkg/workflow/nodes/variable_aggregator.py
index 5b5fe821..e9cfbce8 100644
--- a/src/langbot/pkg/workflow/nodes/variable_aggregator.py
+++ b/src/langbot/pkg/workflow/nodes/variable_aggregator.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('variable_aggregator')
diff --git a/src/langbot/pkg/workflow/nodes/wait.py b/src/langbot/pkg/workflow/nodes/wait.py
index 08844087..465add98 100644
--- a/src/langbot/pkg/workflow/nodes/wait.py
+++ b/src/langbot/pkg/workflow/nodes/wait.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('wait')
diff --git a/src/langbot/pkg/workflow/nodes/webhook_trigger.py b/src/langbot/pkg/workflow/nodes/webhook_trigger.py
index af95c598..64c04ef3 100644
--- a/src/langbot/pkg/workflow/nodes/webhook_trigger.py
+++ b/src/langbot/pkg/workflow/nodes/webhook_trigger.py
@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
-from ..entities import ExecutionContext
+from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('webhook_trigger')
diff --git a/src/langbot/pkg/workflow/variable_manager.py b/src/langbot/pkg/workflow/variable_manager.py
new file mode 100644
index 00000000..b005ccf3
--- /dev/null
+++ b/src/langbot/pkg/workflow/variable_manager.py
@@ -0,0 +1,185 @@
+"""Variable management for workflow execution.
+
+This module provides utilities for managing workflow variables, including:
+- Reserved variable protection
+- Variable namespace validation
+- Variable inheritance between nodes
+"""
+
+from typing import Set, Dict, Any
+
+# 保留变量列表 - 这些变量由系统管理,不能被用户覆盖
+RESERVED_VARIABLES = {
+ 'workflow_id',
+ 'execution_id',
+ 'node_id',
+ 'timestamp',
+ 'launcher_type',
+ 'trigger_type',
+ 'message_id',
+ 'execution_status',
+}
+
+
+def get_reserved_variables() -> Set[str]:
+ """获取保留变量列表
+
+ Returns:
+ 保留变量名称的集合
+ """
+ return RESERVED_VARIABLES.copy()
+
+
+def validate_variable_namespace(
+ variables: Dict[str, Any],
+ namespace: str = 'workflow'
+) -> bool:
+ """验证变量命名空间
+
+ 检查变量是否使用了保留的名称。允许不带前缀的变量,但建议使用命名空间前缀。
+
+ Args:
+ variables: 要验证的变量字典
+ namespace: 命名空间前缀(可选)
+
+ Returns:
+ True 如果验证通过
+
+ Raises:
+ ValueError: 如果变量名称与保留变量冲突
+ """
+ reserved = get_reserved_variables()
+
+ for var_name in variables.keys():
+ if var_name in reserved:
+ raise ValueError(
+ f"Variable '{var_name}' is reserved and cannot be used. "
+ f"Reserved variables: {', '.join(sorted(reserved))}"
+ )
+
+ # 检查命名空间前缀(可选建议)
+ if namespace and not var_name.startswith(f"{namespace}_"):
+ # 允许不带前缀的变量,但建议使用前缀
+ pass
+
+ return True
+
+
+def inherit_variables(
+ parent_variables: Dict[str, Any],
+ child_namespace: str
+) -> Dict[str, Any]:
+ """继承父节点变量到子节点
+
+ 将父节点的变量继承到子节点,跳过保留变量,并添加命名空间前缀。
+
+ Args:
+ parent_variables: 父节点的变量字典
+ child_namespace: 子节点的命名空间前缀
+
+ Returns:
+ 带有命名空间前缀的继承变量字典
+ """
+ inherited = {}
+
+ for key, value in parent_variables.items():
+ # 跳过保留变量
+ if key in RESERVED_VARIABLES:
+ continue
+
+ # 添加命名空间前缀
+ namespaced_key = f"{child_namespace}_{key}"
+ inherited[namespaced_key] = value
+
+ return inherited
+
+
+def merge_variables(
+ base_variables: Dict[str, Any],
+ override_variables: Dict[str, Any],
+ allow_reserved: bool = False
+) -> Dict[str, Any]:
+ """合并变量字典
+
+ 将override_variables合并到base_variables中。
+
+ Args:
+ base_variables: 基础变量字典
+ override_variables: 要合并的变量字典
+ allow_reserved: 是否允许覆盖保留变量(默认False)
+
+ Returns:
+ 合并后的变量字典
+
+ Raises:
+ ValueError: 如果尝试覆盖保留变量且allow_reserved=False
+ """
+ result = base_variables.copy()
+
+ for key, value in override_variables.items():
+ if key in RESERVED_VARIABLES and not allow_reserved:
+ raise ValueError(
+ f"Cannot override reserved variable '{key}'. "
+ f"Set allow_reserved=True to override."
+ )
+ result[key] = value
+
+ return result
+
+
+def extract_namespace_variables(
+ variables: Dict[str, Any],
+ namespace: str
+) -> Dict[str, Any]:
+ """提取特定命名空间的变量
+
+ 从变量字典中提取具有特定命名空间前缀的变量。
+
+ Args:
+ variables: 变量字典
+ namespace: 命名空间前缀
+
+ Returns:
+ 提取的变量字典(不包含命名空间前缀)
+ """
+ prefix = f"{namespace}_"
+ result = {}
+
+ for key, value in variables.items():
+ if key.startswith(prefix):
+ # 移除命名空间前缀
+ clean_key = key[len(prefix):]
+ result[clean_key] = value
+
+ return result
+
+
+def sanitize_variables(
+ variables: Dict[str, Any],
+ allowed_keys: Set[str] | None = None
+) -> Dict[str, Any]:
+ """清理变量字典
+
+ 移除保留变量和不在允许列表中的变量。
+
+ Args:
+ variables: 要清理的变量字典
+ allowed_keys: 允许的键集合(如果为None,则允许所有非保留键)
+
+ Returns:
+ 清理后的变量字典
+ """
+ result = {}
+
+ for key, value in variables.items():
+ # 跳过保留变量
+ if key in RESERVED_VARIABLES:
+ continue
+
+ # 检查允许列表
+ if allowed_keys is not None and key not in allowed_keys:
+ continue
+
+ result[key] = value
+
+ return result
diff --git a/uv.lock b/uv.lock
index fc56bbbc..72145a33 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1973,7 +1973,7 @@ requires-dist = [
{ name = "ebooklib", specifier = ">=0.18" },
{ name = "gewechat-client", specifier = ">=0.1.5" },
{ name = "html2text", specifier = ">=2024.2.26" },
- { name = "langbot-plugin", specifier = "==0.3.11" },
+ { name = "langbot-plugin", directory = "/home/typer/Desktop/langbot-plugin-sdk" },
{ name = "langchain", specifier = ">=0.2.0" },
{ name = "langchain-core", specifier = ">=1.2.28" },
{ name = "langchain-text-splitters", specifier = ">=1.1.2" },
@@ -2037,7 +2037,7 @@ dev = [
[[package]]
name = "langbot-plugin"
version = "0.3.11"
-source = { registry = "https://pypi.org/simple" }
+source = { directory = "/home/typer/Desktop/langbot-plugin-sdk" }
dependencies = [
{ name = "aiofiles" },
{ name = "dotenv" },
@@ -2054,9 +2054,31 @@ dependencies = [
{ name = "watchdog" },
{ name = "websockets" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/91/83/93b86bcdbfe51d820fa59232aaa73cc802d6ce614f67d8f8b33957419538/langbot_plugin-0.3.11.tar.gz", hash = "sha256:8d10c98c771b468b2d35cc007778439c39922a88265fcc16a5881234bc7c1b19", size = 190315, upload-time = "2026-05-12T15:45:24.262Z" }
-wheels = [
- { url = "https://files.pythonhosted.org/packages/8f/22/de7977a6a5cbf557b80043eb3ed39e5feff24033a5d6db4ab88d48ccb6ea/langbot_plugin-0.3.11-py3-none-any.whl", hash = "sha256:c1d2e84eda1584902d99efa316b850c08c1c04fcc199306ff4af1dca1431304a", size = 165574, upload-time = "2026-05-12T15:45:22.908Z" },
+
+[package.metadata]
+requires-dist = [
+ { name = "aiofiles", specifier = ">=24.1.0" },
+ { name = "dotenv", specifier = ">=0.9.9" },
+ { name = "httpx", specifier = ">=0.28.1" },
+ { name = "jinja2", specifier = ">=3.1.6" },
+ { name = "pip", specifier = ">=25.2" },
+ { name = "pydantic", specifier = ">=2.11.5" },
+ { name = "pydantic-settings", specifier = ">=2.10.1" },
+ { name = "pytest", specifier = ">=8.4.0" },
+ { name = "pyyaml", specifier = ">=6.0.2" },
+ { name = "textual", specifier = ">=3.2.0" },
+ { name = "types-aiofiles", specifier = ">=24.1.0.20250516" },
+ { name = "types-pyyaml", specifier = ">=6.0.12.20250516" },
+ { name = "watchdog", specifier = ">=6.0.0" },
+ { name = "websockets", specifier = ">=15.0.1" },
+]
+
+[package.metadata.requires-dev]
+dev = [
+ { name = "mypy", specifier = ">=1.16.0" },
+ { name = "pytest-asyncio", specifier = ">=1.3.0" },
+ { name = "pytest-cov", specifier = ">=7.0.0" },
+ { name = "ruff", specifier = ">=0.11.12" },
]
[[package]]
@@ -2751,7 +2773,7 @@ wheels = [
[[package]]
name = "moto"
version = "5.2.1"
-source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" }
+source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "boto3" },
{ name = "botocore" },
@@ -2761,9 +2783,9 @@ dependencies = [
{ name = "werkzeug" },
{ name = "xmltodict" },
]
-sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/f6/e9/c38202162db2e76623176be9f1dbc9aa41228ffa91ee8da2d3986082c3e3/moto-5.2.1.tar.gz", hash = "sha256:ccb2f3e1dfa82e50e054bda98b0be708d244d2668364dcc1d45e8d3de6091bde", size = 8634437, upload-time = "2026-05-10T19:11:57.286Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/f6/e9/c38202162db2e76623176be9f1dbc9aa41228ffa91ee8da2d3986082c3e3/moto-5.2.1.tar.gz", hash = "sha256:ccb2f3e1dfa82e50e054bda98b0be708d244d2668364dcc1d45e8d3de6091bde", size = 8634437, upload-time = "2026-05-10T19:11:57.286Z" }
wheels = [
- { url = "https://pypi.tuna.tsinghua.edu.cn/packages/15/79/8085b7c1ecd48d0535c3c8444a1d8df2926e457dce8e55fabc332a382c9c/moto-5.2.1-py3-none-any.whl", hash = "sha256:19d2fbd6e613aa5b4e364c52cd5d3cea371643a0f4210689a703227bd2924c5c", size = 6671379, upload-time = "2026-05-10T19:11:53.543Z" },
+ { url = "https://files.pythonhosted.org/packages/15/79/8085b7c1ecd48d0535c3c8444a1d8df2926e457dce8e55fabc332a382c9c/moto-5.2.1-py3-none-any.whl", hash = "sha256:19d2fbd6e613aa5b4e364c52cd5d3cea371643a0f4210689a703227bd2924c5c", size = 6671379, upload-time = "2026-05-10T19:11:53.543Z" },
]
[[package]]
@@ -4767,15 +4789,15 @@ wheels = [
[[package]]
name = "responses"
version = "0.26.0"
-source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" }
+source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pyyaml" },
{ name = "requests" },
{ name = "urllib3" },
]
-sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/9f/b4/b7e040379838cc71bf5aabdb26998dfbe5ee73904c92c1c161faf5de8866/responses-0.26.0.tar.gz", hash = "sha256:c7f6923e6343ef3682816ba421c006626777893cb0d5e1434f674b649bac9eb4", size = 81303, upload-time = "2026-02-19T14:38:05.574Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/9f/b4/b7e040379838cc71bf5aabdb26998dfbe5ee73904c92c1c161faf5de8866/responses-0.26.0.tar.gz", hash = "sha256:c7f6923e6343ef3682816ba421c006626777893cb0d5e1434f674b649bac9eb4", size = 81303, upload-time = "2026-02-19T14:38:05.574Z" }
wheels = [
- { url = "https://pypi.tuna.tsinghua.edu.cn/packages/ce/04/7f73d05b556da048923e31a0cc878f03be7c5425ed1f268082255c75d872/responses-0.26.0-py3-none-any.whl", hash = "sha256:03ec4409088cd5c66b71ecbbbd27fe2c58ddfad801c66203457b3e6a04868c37", size = 35099, upload-time = "2026-02-19T14:38:03.847Z" },
+ { url = "https://files.pythonhosted.org/packages/ce/04/7f73d05b556da048923e31a0cc878f03be7c5425ed1f268082255c75d872/responses-0.26.0-py3-none-any.whl", hash = "sha256:03ec4409088cd5c66b71ecbbbd27fe2c58ddfad801c66203457b3e6a04868c37", size = 35099, upload-time = "2026-02-19T14:38:03.847Z" },
]
[[package]]
@@ -6072,10 +6094,10 @@ wheels = [
[[package]]
name = "xmltodict"
version = "1.0.4"
-source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" }
-sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/19/70/80f3b7c10d2630aa66414bf23d210386700aa390547278c789afa994fd7e/xmltodict-1.0.4.tar.gz", hash = "sha256:6d94c9f834dd9e44514162799d344d815a3a4faec913717a9ecbfa5be1bb8e61", size = 26124, upload-time = "2026-02-22T02:21:22.074Z" }
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/19/70/80f3b7c10d2630aa66414bf23d210386700aa390547278c789afa994fd7e/xmltodict-1.0.4.tar.gz", hash = "sha256:6d94c9f834dd9e44514162799d344d815a3a4faec913717a9ecbfa5be1bb8e61", size = 26124, upload-time = "2026-02-22T02:21:22.074Z" }
wheels = [
- { url = "https://pypi.tuna.tsinghua.edu.cn/packages/38/34/98a2f52245f4d47be93b580dae5f9861ef58977d73a79eb47c58f1ad1f3a/xmltodict-1.0.4-py3-none-any.whl", hash = "sha256:a4a00d300b0e1c59fc2bfccb53d7b2e88c32f200df138a0dd2229f842497026a", size = 13580, upload-time = "2026-02-22T02:21:21.039Z" },
+ { url = "https://files.pythonhosted.org/packages/38/34/98a2f52245f4d47be93b580dae5f9861ef58977d73a79eb47c58f1ad1f3a/xmltodict-1.0.4-py3-none-any.whl", hash = "sha256:a4a00d300b0e1c59fc2bfccb53d7b2e88c32f200df138a0dd2229f842497026a", size = 13580, upload-time = "2026-02-22T02:21:21.039Z" },
]
[[package]]