This commit is contained in:
Typer_Body
2026-05-22 02:07:48 +08:00
parent f99d3022e8
commit 253cc6cbea
42 changed files with 727 additions and 237 deletions

View File

@@ -312,6 +312,164 @@ class WorkflowsRouterGroup(group.RouterGroup):
except ValueError as e:
return self.http_status(404, -1, str(e))
# LLM Node Performance Test Endpoint
# Tests each step of LLM node execution with detailed timing
@self.route('/_/test/llm-node', methods=['POST'], auth_type=group.AuthType.USER_TOKEN_OR_API_KEY)
async def _() -> str:
"""Test LLM node performance with detailed step-by-step timing.
Request body:
{
"model_uuid": "uuid-of-model",
"system_prompt": "optional system prompt",
"user_prompt": "test message",
"enable_tools": false,
"tools": [],
"temperature": 0.7,
"max_tokens": 100
}
Response includes timing for each step:
- model_fetch: Time to get model from model_mgr
- tool_fetch: Time to load tools (if enabled)
- prompt_build: Time to build messages
- llm_call: Time for actual LLM invocation
- total: Total time
"""
import time
json_data = await quart.request.json
if not json_data:
return self.http_status(400, -1, 'Request body is required')
model_uuid = json_data.get('model_uuid', '')
if not model_uuid:
return self.http_status(400, -1, 'model_uuid is required')
user_prompt = json_data.get('user_prompt', 'test')
system_prompt = json_data.get('system_prompt', '')
enable_tools = json_data.get('enable_tools', False)
tools_config = json_data.get('tools', [])
temperature = json_data.get('temperature')
max_tokens = json_data.get('max_tokens', 0)
timings = {}
errors = []
# Step 1: Model fetch
t_start = time.perf_counter()
try:
runtime_model = await self.ap.model_mgr.get_model_by_uuid(model_uuid)
timings['model_fetch_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
timings['model_found'] = True
timings['model_name'] = runtime_model.model_entity.name if runtime_model else None
except Exception as e:
timings['model_fetch_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
timings['model_found'] = False
errors.append(f'Model fetch failed: {str(e)}')
return self.http_status(400, -1, {
'error': errors[0],
'timings': timings,
})
# Step 2: Tool fetch (if enabled)
timings['tool_fetch_ms'] = 0
timings['tools_loaded'] = 0
if enable_tools and tools_config:
t_start = time.perf_counter()
try:
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
all_tools = await self.ap.tool_mgr.get_tools()
tool_names = tools_config if isinstance(tools_config, list) else []
funcs = [t for t in all_tools if t.name in tool_names]
timings['tool_fetch_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
timings['tools_loaded'] = len(funcs)
timings['tool_names'] = [t.name for t in funcs]
except Exception as e:
timings['tool_fetch_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
errors.append(f'Tool fetch failed: {str(e)}')
# Step 3: Build messages
t_start = time.perf_counter()
import langbot_plugin.api.entities.builtin.provider.message as provider_message
messages = []
if system_prompt:
messages.append(provider_message.Message(role='system', content=system_prompt))
messages.append(provider_message.Message(role='user', content=user_prompt))
timings['prompt_build_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
# Step 4: Build extra args
extra_args = {}
if temperature is not None:
extra_args['temperature'] = float(temperature)
if max_tokens and int(max_tokens) > 0:
extra_args['max_tokens'] = int(max_tokens)
# Step 5: LLM call
t_start = time.perf_counter()
try:
result_message = await runtime_model.provider.invoke_llm(
query=None,
model=runtime_model,
messages=messages,
funcs=funcs if enable_tools else None,
extra_args=extra_args,
)
timings['llm_call_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
timings['llm_call_success'] = True
# Extract response text
response_text = ''
if isinstance(result_message.content, str):
response_text = result_message.content
elif isinstance(result_message.content, list):
for elem in result_message.content:
if hasattr(elem, 'text') and elem.text:
response_text += elem.text
elif isinstance(elem, str):
response_text += elem
timings['response_length'] = len(response_text)
timings['response_preview'] = response_text[:200]
# Extract usage
usage = {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}
if hasattr(result_message, 'usage') and result_message.usage:
u = result_message.usage
usage = {
'prompt_tokens': getattr(u, 'prompt_tokens', 0) or 0,
'completion_tokens': getattr(u, 'completion_tokens', 0) or 0,
'total_tokens': getattr(u, 'total_tokens', 0) or 0,
}
timings['usage'] = usage
except Exception as e:
timings['llm_call_ms'] = round((time.perf_counter() - t_start) * 1000, 2)
timings['llm_call_success'] = False
errors.append(f'LLM call failed: {str(e)}')
# Calculate total
timings['total_ms'] = round(sum([
timings.get('model_fetch_ms', 0),
timings.get('tool_fetch_ms', 0),
timings.get('prompt_build_ms', 0),
timings.get('llm_call_ms', 0),
]), 2)
# Add breakdown percentage
if timings['total_ms'] > 0:
timings['breakdown'] = {
'model_fetch_pct': round(timings.get('model_fetch_ms', 0) / timings['total_ms'] * 100, 1),
'tool_fetch_pct': round(timings.get('tool_fetch_ms', 0) / timings['total_ms'] * 100, 1),
'prompt_build_pct': round(timings.get('prompt_build_ms', 0) / timings['total_ms'] * 100, 1),
'llm_call_pct': round(timings.get('llm_call_ms', 0) / timings['total_ms'] * 100, 1),
}
if errors:
timings['errors'] = errors
return self.success(data={'test_result': timings})
@group.group_class('executions', '/api/v1/executions')
class ExecutionsRouterGroup(group.RouterGroup):

View File

@@ -0,0 +1,204 @@
"""Workflow-Pipeline通信适配器
这个模块提供了Workflow和Pipeline之间的通信适配使用SDK标准的MessageEnvelope格式。
"""
from __future__ import annotations
import logging
from typing import Any, Optional
logger = logging.getLogger(__name__)
class _WorkflowPipelineCaptureAdapter:
"""Workflow-Pipeline通信适配器
用于在Workflow节点和Pipeline之间进行标准化的消息传递。
支持MessageEnvelope格式的双向转换。
"""
def __init__(self, context: Any):
"""初始化适配器
Args:
context: ExecutionContext - Workflow执行上下文
"""
self.context = context
self.responses: list[dict[str, Any]] = []
self.bot_account_id: Optional[str] = None
self._logger = logging.getLogger(__name__)
async def call_pipeline_with_envelope(
self,
envelope: Any,
pipeline_executor: Any
) -> Any:
"""使用MessageEnvelope调用Pipeline
Args:
envelope: MessageEnvelope - 标准消息信封
pipeline_executor: Pipeline执行器实例
Returns:
MessageEnvelope - 执行结果信封
"""
try:
# 动态导入以避免循环依赖
from langbot_plugin_sdk.workflow import envelope_to_query, query_to_envelope
# 1. 转换为Query
query = envelope_to_query(envelope)
# 2. 调用Pipeline
result_query = await pipeline_executor.execute(query)
# 3. 转换回Envelope
result_envelope = query_to_envelope(result_query, envelope)
self._logger.debug(
f'Pipeline execution completed for workflow {envelope.workflow_id}',
extra={
'workflow_id': envelope.workflow_id,
'execution_id': envelope.execution_id,
'node_id': envelope.node_id,
}
)
return result_envelope
except Exception as e:
self._logger.error(
f'Pipeline execution failed: {e}',
exc_info=True,
extra={
'workflow_id': envelope.workflow_id,
'execution_id': envelope.execution_id,
'node_id': envelope.node_id,
}
)
raise
def validate_envelope(self, envelope: Any) -> bool:
"""验证MessageEnvelope的有效性
Args:
envelope: MessageEnvelope - 要验证的消息信封
Returns:
bool - 验证是否通过
"""
required_fields = [
'message_id',
'workflow_id',
'node_id',
'execution_id',
'payload',
'launcher_type',
]
for field in required_fields:
if not hasattr(envelope, field):
self._logger.warning(
f'MessageEnvelope missing required field: {field}'
)
return False
return True
def get_responses(self) -> list[dict[str, Any]]:
"""获取所有响应
Returns:
list - 响应列表
"""
return self.responses.copy()
def add_response(self, response: dict[str, Any]) -> None:
"""添加响应
Args:
response: dict - 响应数据
"""
self.responses.append(response)
def get_last_text_response(self) -> str:
"""获取最后一个文本响应
Returns:
str - 最后一个响应的文本内容
"""
if not self.responses:
return ''
last_response = self.responses[-1]
return str(last_response.get('content', '') or '')
def clear_responses(self) -> None:
"""清空所有响应"""
self.responses.clear()
class WorkflowPipelineCompatibilityLayer:
"""Workflow-Pipeline兼容性层
提供向后兼容性支持旧的Pipeline Query格式和新的MessageEnvelope格式。
"""
def __init__(self):
"""初始化兼容性层"""
self._logger = logging.getLogger(__name__)
def is_workflow_context(self, query: Any) -> bool:
"""检查Query是否包含Workflow上下文
Args:
query: Query - Pipeline Query对象
Returns:
bool - 是否来自Workflow
"""
if hasattr(query, 'is_from_workflow'):
return query.is_from_workflow()
if hasattr(query, 'get_workflow_context'):
context = query.get_workflow_context()
return bool(context and context.get('workflow_id'))
return False
def get_workflow_id(self, query: Any) -> Optional[str]:
"""从Query获取Workflow ID
Args:
query: Query - Pipeline Query对象
Returns:
str - Workflow ID如果不存在则返回None
"""
if hasattr(query, 'get_workflow_id'):
return query.get_workflow_id()
if hasattr(query, 'get_workflow_context'):
context = query.get_workflow_context()
return context.get('workflow_id') if context else None
return None
def get_execution_id(self, query: Any) -> Optional[str]:
"""从Query获取执行ID
Args:
query: Query - Pipeline Query对象
Returns:
str - 执行ID如果不存在则返回None
"""
if hasattr(query, 'get_execution_id'):
return query.get_execution_id()
if hasattr(query, 'get_workflow_context'):
context = query.get_workflow_context()
return context.get('execution_id') if context else None
return None

View File

@@ -1,12 +1,27 @@
"""Workflow entities and data models"""
"""Workflow entities and data models
This module defines workflow entities using SDK standard entities where available,
and local-specific entities for LangBot_copy-specific functionality.
"""
from __future__ import annotations
import enum
from datetime import datetime
from typing import Any, Optional
import pydantic
# Import SDK entities for standard workflow protocol types
from langbot_plugin.api.entities.builtin.workflow import (
ExecutionContext,
ExecutionStep,
ExecutionStatus,
MessageContext,
NodeDefinition,
NodeState,
NodeStatus,
PortDefinition,
)
class Position(pydantic.BaseModel):
"""Node position on canvas"""
@@ -15,31 +30,6 @@ class Position(pydantic.BaseModel):
y: float = 0
class PortDefinition(pydantic.BaseModel):
"""Node port definition"""
name: str
type: str = 'any' # any, string, number, boolean, object, array
description: str = ''
required: bool = True
class NodeDefinition(pydantic.BaseModel):
"""Workflow node definition"""
id: str
type: str
name: str = ''
position: Position = Position()
config: dict[str, Any] = {}
inputs: list[PortDefinition] = []
outputs: list[PortDefinition] = []
# UI metadata
description: str = ''
comment: str = '' # User comment/annotation
class EdgeDefinition(pydantic.BaseModel):
"""Workflow edge definition (connection between nodes)"""
@@ -161,125 +151,3 @@ class WorkflowDefinition(pydantic.BaseModel):
# Source tracking (for imported workflows)
source: Optional[str] = None # dify, n8n, langflow, etc.
source_id: Optional[str] = None
class ExecutionStatus(enum.Enum):
"""Workflow execution status"""
PENDING = 'pending'
RUNNING = 'running'
WAITING = 'waiting'
COMPLETED = 'completed'
FAILED = 'failed'
CANCELLED = 'cancelled'
class NodeStatus(enum.Enum):
"""Node execution status"""
PENDING = 'pending'
RUNNING = 'running'
COMPLETED = 'completed'
FAILED = 'failed'
SKIPPED = 'skipped'
class NodeState(pydantic.BaseModel):
"""Runtime state of a node during execution"""
node_id: str
status: NodeStatus = NodeStatus.PENDING
inputs: dict[str, Any] = {}
outputs: dict[str, Any] = {}
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
error: Optional[str] = None
retry_count: int = 0
class MessageContext(pydantic.BaseModel):
"""Message context for message-triggered workflows"""
message_id: str
message_content: str
sender_id: str
sender_name: str = ''
platform: str = ''
conversation_id: str = ''
is_group: bool = False
group_id: Optional[str] = None
mentions: list[str] = []
reply_to: Optional[str] = None
raw_message: dict[str, Any] = {}
class ExecutionStep(pydantic.BaseModel):
"""Execution history step"""
timestamp: datetime
node_id: str
node_type: str
status: str
inputs: dict[str, Any] = {}
outputs: dict[str, Any] = {}
duration_ms: int = 0
error: Optional[str] = None
class ExecutionContext(pydantic.BaseModel):
"""Workflow execution context"""
execution_id: str
workflow_id: str
workflow_version: int = 1
status: ExecutionStatus = ExecutionStatus.PENDING
# Runtime data
variables: dict[str, Any] = {}
conversation_variables: dict[str, Any] = {} # Session-level persistent variables
node_states: dict[str, NodeState] = {}
memory: dict[str, Any] = {} # Workflow memory for storing/retrieving data
# Timing
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
# Error
error: Optional[str] = None
# Message context (if triggered by message)
message_context: Optional[MessageContext] = None
# Trigger info
trigger_type: Optional[str] = None
trigger_data: dict[str, Any] = {}
# Execution history
history: list[ExecutionStep] = []
# Session info
session_id: Optional[str] = None
user_id: Optional[str] = None
bot_id: Optional[str] = None
def get_node_output(self, node_id: str, output_name: str = 'output') -> Any:
"""Get output from a specific node"""
if node_id in self.node_states:
return self.node_states[node_id].outputs.get(output_name)
return None
def set_variable(self, name: str, value: Any):
"""Set a workflow variable"""
self.variables[name] = value
def get_variable(self, name: str, default: Any = None) -> Any:
"""Get a workflow variable"""
return self.variables.get(name, default)
def set_conversation_variable(self, name: str, value: Any):
"""Set a conversation-level variable (persisted across executions)"""
self.conversation_variables[name] = value
def get_conversation_variable(self, name: str, default: Any = None) -> Any:
"""Get a conversation-level variable"""
return self.conversation_variables.get(name, default)

View File

@@ -178,7 +178,7 @@ class WorkflowExecutor:
# Initialize node states
for node in workflow.nodes:
if node.id not in context.node_states:
context.node_states[node.id] = NodeState(node_id=node.id)
context.node_states[node.id] = NodeState(node_id=node.id, node_type=node.type, status=NodeStatus.PENDING)
# Find start node(s)
if start_node_id:
@@ -662,14 +662,15 @@ class WorkflowExecutor:
duration_ms = int((node_state.end_time - node_state.start_time).total_seconds() * 1000)
step = ExecutionStep(
step_id=f"step_{uuid.uuid4().hex[:8]}",
timestamp=datetime.now(),
node_id=node.id,
node_type=node.type,
status=node_state.status.value,
inputs=node_state.inputs,
outputs=node_state.outputs,
status=node_state.status,
duration_ms=duration_ms,
error=node_state.error,
inputs=node_state.inputs,
outputs=node_state.outputs,
)
context.history.append(step)
@@ -801,7 +802,7 @@ class LoopExecutor:
for node in loop_body:
# Reset node state for this iteration
context.node_states[node.id] = NodeState(node_id=node.id)
context.node_states[node.id] = NodeState(node_id=node.id, node_type=node.type, status=NodeStatus.PENDING)
await self.executor._execute_node(node, context, max_retries=3)
@@ -823,5 +824,3 @@ class LoopExecutor:
context.variables.pop('loop_is_last', None)
return results

View File

@@ -5,18 +5,61 @@ Node metadata is loaded from: ../../templates/metadata/nodes/call_pipeline.yaml
from __future__ import annotations
from typing import Any
from typing import Any, Optional
import pydantic
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_event_logger
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.provider.session as provider_session
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
class _NoOpEventLogger(abstract_event_logger.AbstractEventLogger):
"""No-op event logger for workflow pipeline adapter."""
async def info(
self,
text: str,
images: Optional[list[platform_message.Image]] = None,
message_session_id: Optional[str] = None,
no_throw: bool = True,
):
pass
async def debug(
self,
text: str,
images: Optional[list[platform_message.Image]] = None,
message_session_id: Optional[str] = None,
no_throw: bool = True,
):
pass
async def warning(
self,
text: str,
images: Optional[list[platform_message.Image]] = None,
message_session_id: Optional[str] = None,
no_throw: bool = True,
):
pass
async def error(
self,
text: str,
images: Optional[list[platform_message.Image]] = None,
message_session_id: Optional[str] = None,
no_throw: bool = True,
):
pass
@workflow_node('call_pipeline')
class CallPipelineNode(WorkflowNode):
"""Call pipeline node - invoke an existing pipeline"""
@@ -139,10 +182,16 @@ class CallPipelineNode(WorkflowNode):
)
class _WorkflowPipelineCaptureAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
"""Adapter to capture pipeline responses for workflow execution."""
class Config:
arbitrary_types_allowed = True
responses: list[dict[str, Any]] = []
context: ExecutionContext = pydantic.Field(exclude=True)
def __init__(self, context: ExecutionContext):
super().__init__(config={}, logger=None)
super().__init__(config={}, logger=_NoOpEventLogger())
self.context = context
self.responses = []

View File

@@ -9,7 +9,7 @@ import json
import re
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('code_executor')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
from ..safe_eval import safe_eval_with_vars

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('coze_bot')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('cron_trigger')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
from ..safe_eval import safe_eval_with_vars

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('database_query')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('dify_knowledge_query')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('dify_workflow')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('end')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('event_trigger')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('http_request')

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('iterator')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('knowledge_retrieval')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('langflow_flow')

View File

@@ -10,7 +10,7 @@ from typing import Any, AsyncGenerator
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
logger = logging.getLogger(__name__)
@@ -181,7 +181,6 @@ class LLMCallNode(WorkflowNode):
# Get error handling config
exception_handling = self.get_config('exception_handling', 'show-error')
failure_hint = self.get_config('failure_hint', 'Request failed.')
remove_think = self.get_config('remove_think', False)
track_function_calls = self.get_config('track_function_calls', False)
# Get output format and json_schema config
@@ -192,11 +191,10 @@ class LLMCallNode(WorkflowNode):
enable_tools = self.get_config('enable_tools', False)
tools_config = self.get_config('tools', [])
# Resolve prompts - handle null user_prompt_template
# Resolve prompts
system_prompt = self._resolve_template(self.get_config('system_prompt') or '', inputs, context)
user_prompt_template = self.get_config('user_prompt_template')
if user_prompt_template is None:
# Default to input if not set
user_prompt_template = '{{input}}'
user_prompt = self._resolve_template(user_prompt_template, inputs, context)
@@ -229,14 +227,11 @@ class LLMCallNode(WorkflowNode):
if tool_names:
all_tools = await self.ap.tool_mgr.get_tools()
funcs = [t for t in all_tools if t.name in tool_names]
if funcs:
logger.info(f'LLM call node {self.node_id}: using tools: {[t.name for t in funcs]}')
except Exception as e:
logger.warning(f'LLM call node {self.node_id}: failed to load tools - {e}')
logger.warning(f'[LLM:{self.node_id}] Failed to load tools: {e}')
funcs = None
# Invoke LLM with error handling
logger.info(f'LLM call node {self.node_id}: invoking model {model_uuid}')
# Invoke LLM
try:
result_message = await runtime_model.provider.invoke_llm(
query=None,
@@ -246,7 +241,7 @@ class LLMCallNode(WorkflowNode):
extra_args=extra_args,
)
except Exception as e:
logger.warning(f'LLM call node {self.node_id}: request failed - {e}')
logger.warning(f'[LLM:{self.node_id}] LLM call failed: {e}')
# Handle based on exception handling strategy
if exception_handling == 'show-error':
@@ -284,21 +279,20 @@ class LLMCallNode(WorkflowNode):
elif isinstance(elem, str):
response_text += elem
# Remove CoT (Chain of Thought) content if configured
if remove_think:
response_text = self._remove_think_content(response_text)
# Remove CoT content (always remove to avoid leaking internal reasoning)
response_text = self._remove_think_content(response_text)
# Apply content safety filter
response_text, is_blocked, filter_notice = self._apply_content_filter(response_text)
if is_blocked:
logger.warning(f'LLM call node {self.node_id}: response blocked by content filter - {filter_notice}')
logger.warning(f'[LLM:{self.node_id}] Response blocked by content filter: {filter_notice}')
return {
'response': filter_notice,
'usage': usage,
'blocked_by_filter': True,
}
# Extract usage info if available
# Extract usage info
usage = {
'prompt_tokens': 0,
'completion_tokens': 0,
@@ -319,6 +313,7 @@ class LLMCallNode(WorkflowNode):
'total_tokens': getattr(u, 'total_tokens', 0) or 0,
}
# Build result
result: dict[str, Any] = {
'response': response_text,
'usage': usage,
@@ -329,7 +324,7 @@ class LLMCallNode(WorkflowNode):
try:
result['parsed'] = json.loads(response_text)
except json.JSONDecodeError as e:
logger.warning(f'LLM call node {self.node_id}: failed to parse JSON response - {e}')
logger.warning(f'[LLM:{self.node_id}] Failed to parse JSON: {e}')
result['parsed'] = None
result['parse_error'] = str(e)
@@ -354,7 +349,6 @@ class LLMCallNode(WorkflowNode):
if not self.ap:
raise RuntimeError('Application instance not available - cannot call LLM')
remove_think = self.get_config('remove_think', False)
exception_handling = self.get_config('exception_handling', 'show-error')
failure_hint = self.get_config('failure_hint', 'Request failed.')
@@ -383,7 +377,7 @@ class LLMCallNode(WorkflowNode):
if max_tokens and int(max_tokens) > 0:
extra_args['max_tokens'] = int(max_tokens)
logger.info(f'LLM call node {self.node_id}: streaming model {model_uuid}')
logger.info(f'[LLM:{self.node_id}] Streaming model {model_uuid}')
try:
# Try streaming first
@@ -396,6 +390,7 @@ class LLMCallNode(WorkflowNode):
)
full_response = ''
in_think_block = False
async for chunk in stream:
chunk_text = ''
if hasattr(chunk, 'content'):
@@ -409,16 +404,25 @@ class LLMCallNode(WorkflowNode):
chunk_text += elem
if chunk_text:
if remove_think:
chunk_text = self._remove_think_content(chunk_text)
full_response += chunk_text
yield chunk_text
# Filter <think> blocks in streaming mode
if '<think>' in chunk_text or '<thought>' in chunk_text:
in_think_block = True
if in_think_block:
if '</think>' in chunk_text or '</thought>' in chunk_text:
in_think_block = False
chunk_text = chunk_text.split('</think>')[-1].split('</thought>')[-1]
else:
chunk_text = ''
if chunk_text:
full_response += chunk_text
yield chunk_text
# Store in context for downstream nodes
context.variables['_last_llm_response'] = full_response
except Exception as e:
logger.warning(f'LLM call node {self.node_id}: streaming failed, falling back - {e}')
logger.warning(f'[LLM:{self.node_id}] Streaming failed, falling back - {e}')
# Fallback to non-streaming
try:
result_message = await runtime_model.provider.invoke_llm(
@@ -429,12 +433,12 @@ class LLMCallNode(WorkflowNode):
extra_args=extra_args,
)
response_text = self._extract_response_text(result_message)
if remove_think:
response_text = self._remove_think_content(response_text)
# Always remove <think> content in fallback
response_text = self._remove_think_content(response_text)
yield response_text
context.variables['_last_llm_response'] = response_text
except Exception as e2:
logger.error(f'LLM call node {self.node_id}: fallback also failed - {e2}')
logger.error(f'[LLM:{self.node_id}] Fallback also failed - {e2}')
if exception_handling == 'show-hint':
yield failure_hint
elif exception_handling != 'hide':

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('loop')

View File

@@ -11,7 +11,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('mcp_tool')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
class MemoryHelper:

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('merge')

View File

@@ -9,7 +9,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('message_trigger')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('n8n_workflow')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('opening_statement')

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('parallel')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('parameter_extractor')

View File

@@ -7,7 +7,7 @@
# from typing import Any
# from ..entities import ExecutionContext
# from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
# from ..node import WorkflowNode, workflow_node
# @workflow_node('plugin_call')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('question_classifier')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('redis_operation')

View File

@@ -8,7 +8,7 @@ from __future__ import annotations
import logging
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
logger = logging.getLogger(__name__)
@@ -20,13 +20,14 @@ class ReplyMessageNode(WorkflowNode):
category = 'action'
async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]:
message = inputs.get('message')
if message in (None, ''):
message = inputs.get('input')
# Priority: content/response (from upstream nodes) > message (original input) > context
message = inputs.get('content')
if message in (None, ''):
message = inputs.get('response')
if message in (None, ''):
message = inputs.get('content')
message = inputs.get('input')
if message in (None, ''):
message = inputs.get('message')
if message in (None, '') and context.message_context:
message = context.message_context.message_content
if message is None:

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('send_message')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('set_variable')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('store_data')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('switch')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('variable_aggregator')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('wait')

View File

@@ -7,7 +7,7 @@ from __future__ import annotations
from typing import Any
from ..entities import ExecutionContext
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('webhook_trigger')

View File

@@ -0,0 +1,185 @@
"""Variable management for workflow execution.
This module provides utilities for managing workflow variables, including:
- Reserved variable protection
- Variable namespace validation
- Variable inheritance between nodes
"""
from typing import Set, Dict, Any
# 保留变量列表 - 这些变量由系统管理,不能被用户覆盖
RESERVED_VARIABLES = {
'workflow_id',
'execution_id',
'node_id',
'timestamp',
'launcher_type',
'trigger_type',
'message_id',
'execution_status',
}
def get_reserved_variables() -> Set[str]:
"""获取保留变量列表
Returns:
保留变量名称的集合
"""
return RESERVED_VARIABLES.copy()
def validate_variable_namespace(
variables: Dict[str, Any],
namespace: str = 'workflow'
) -> bool:
"""验证变量命名空间
检查变量是否使用了保留的名称。允许不带前缀的变量,但建议使用命名空间前缀。
Args:
variables: 要验证的变量字典
namespace: 命名空间前缀(可选)
Returns:
True 如果验证通过
Raises:
ValueError: 如果变量名称与保留变量冲突
"""
reserved = get_reserved_variables()
for var_name in variables.keys():
if var_name in reserved:
raise ValueError(
f"Variable '{var_name}' is reserved and cannot be used. "
f"Reserved variables: {', '.join(sorted(reserved))}"
)
# 检查命名空间前缀(可选建议)
if namespace and not var_name.startswith(f"{namespace}_"):
# 允许不带前缀的变量,但建议使用前缀
pass
return True
def inherit_variables(
parent_variables: Dict[str, Any],
child_namespace: str
) -> Dict[str, Any]:
"""继承父节点变量到子节点
将父节点的变量继承到子节点,跳过保留变量,并添加命名空间前缀。
Args:
parent_variables: 父节点的变量字典
child_namespace: 子节点的命名空间前缀
Returns:
带有命名空间前缀的继承变量字典
"""
inherited = {}
for key, value in parent_variables.items():
# 跳过保留变量
if key in RESERVED_VARIABLES:
continue
# 添加命名空间前缀
namespaced_key = f"{child_namespace}_{key}"
inherited[namespaced_key] = value
return inherited
def merge_variables(
base_variables: Dict[str, Any],
override_variables: Dict[str, Any],
allow_reserved: bool = False
) -> Dict[str, Any]:
"""合并变量字典
将override_variables合并到base_variables中。
Args:
base_variables: 基础变量字典
override_variables: 要合并的变量字典
allow_reserved: 是否允许覆盖保留变量默认False
Returns:
合并后的变量字典
Raises:
ValueError: 如果尝试覆盖保留变量且allow_reserved=False
"""
result = base_variables.copy()
for key, value in override_variables.items():
if key in RESERVED_VARIABLES and not allow_reserved:
raise ValueError(
f"Cannot override reserved variable '{key}'. "
f"Set allow_reserved=True to override."
)
result[key] = value
return result
def extract_namespace_variables(
variables: Dict[str, Any],
namespace: str
) -> Dict[str, Any]:
"""提取特定命名空间的变量
从变量字典中提取具有特定命名空间前缀的变量。
Args:
variables: 变量字典
namespace: 命名空间前缀
Returns:
提取的变量字典(不包含命名空间前缀)
"""
prefix = f"{namespace}_"
result = {}
for key, value in variables.items():
if key.startswith(prefix):
# 移除命名空间前缀
clean_key = key[len(prefix):]
result[clean_key] = value
return result
def sanitize_variables(
variables: Dict[str, Any],
allowed_keys: Set[str] | None = None
) -> Dict[str, Any]:
"""清理变量字典
移除保留变量和不在允许列表中的变量。
Args:
variables: 要清理的变量字典
allowed_keys: 允许的键集合如果为None则允许所有非保留键
Returns:
清理后的变量字典
"""
result = {}
for key, value in variables.items():
# 跳过保留变量
if key in RESERVED_VARIABLES:
continue
# 检查允许列表
if allowed_keys is not None and key not in allowed_keys:
continue
result[key] = value
return result

50
uv.lock generated
View File

@@ -1973,7 +1973,7 @@ requires-dist = [
{ name = "ebooklib", specifier = ">=0.18" },
{ name = "gewechat-client", specifier = ">=0.1.5" },
{ name = "html2text", specifier = ">=2024.2.26" },
{ name = "langbot-plugin", specifier = "==0.3.11" },
{ name = "langbot-plugin", directory = "/home/typer/Desktop/langbot-plugin-sdk" },
{ name = "langchain", specifier = ">=0.2.0" },
{ name = "langchain-core", specifier = ">=1.2.28" },
{ name = "langchain-text-splitters", specifier = ">=1.1.2" },
@@ -2037,7 +2037,7 @@ dev = [
[[package]]
name = "langbot-plugin"
version = "0.3.11"
source = { registry = "https://pypi.org/simple" }
source = { directory = "/home/typer/Desktop/langbot-plugin-sdk" }
dependencies = [
{ name = "aiofiles" },
{ name = "dotenv" },
@@ -2054,9 +2054,31 @@ dependencies = [
{ name = "watchdog" },
{ name = "websockets" },
]
sdist = { url = "https://files.pythonhosted.org/packages/91/83/93b86bcdbfe51d820fa59232aaa73cc802d6ce614f67d8f8b33957419538/langbot_plugin-0.3.11.tar.gz", hash = "sha256:8d10c98c771b468b2d35cc007778439c39922a88265fcc16a5881234bc7c1b19", size = 190315, upload-time = "2026-05-12T15:45:24.262Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/8f/22/de7977a6a5cbf557b80043eb3ed39e5feff24033a5d6db4ab88d48ccb6ea/langbot_plugin-0.3.11-py3-none-any.whl", hash = "sha256:c1d2e84eda1584902d99efa316b850c08c1c04fcc199306ff4af1dca1431304a", size = 165574, upload-time = "2026-05-12T15:45:22.908Z" },
[package.metadata]
requires-dist = [
{ name = "aiofiles", specifier = ">=24.1.0" },
{ name = "dotenv", specifier = ">=0.9.9" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "jinja2", specifier = ">=3.1.6" },
{ name = "pip", specifier = ">=25.2" },
{ name = "pydantic", specifier = ">=2.11.5" },
{ name = "pydantic-settings", specifier = ">=2.10.1" },
{ name = "pytest", specifier = ">=8.4.0" },
{ name = "pyyaml", specifier = ">=6.0.2" },
{ name = "textual", specifier = ">=3.2.0" },
{ name = "types-aiofiles", specifier = ">=24.1.0.20250516" },
{ name = "types-pyyaml", specifier = ">=6.0.12.20250516" },
{ name = "watchdog", specifier = ">=6.0.0" },
{ name = "websockets", specifier = ">=15.0.1" },
]
[package.metadata.requires-dev]
dev = [
{ name = "mypy", specifier = ">=1.16.0" },
{ name = "pytest-asyncio", specifier = ">=1.3.0" },
{ name = "pytest-cov", specifier = ">=7.0.0" },
{ name = "ruff", specifier = ">=0.11.12" },
]
[[package]]
@@ -2751,7 +2773,7 @@ wheels = [
[[package]]
name = "moto"
version = "5.2.1"
source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" }
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "boto3" },
{ name = "botocore" },
@@ -2761,9 +2783,9 @@ dependencies = [
{ name = "werkzeug" },
{ name = "xmltodict" },
]
sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/f6/e9/c38202162db2e76623176be9f1dbc9aa41228ffa91ee8da2d3986082c3e3/moto-5.2.1.tar.gz", hash = "sha256:ccb2f3e1dfa82e50e054bda98b0be708d244d2668364dcc1d45e8d3de6091bde", size = 8634437, upload-time = "2026-05-10T19:11:57.286Z" }
sdist = { url = "https://files.pythonhosted.org/packages/f6/e9/c38202162db2e76623176be9f1dbc9aa41228ffa91ee8da2d3986082c3e3/moto-5.2.1.tar.gz", hash = "sha256:ccb2f3e1dfa82e50e054bda98b0be708d244d2668364dcc1d45e8d3de6091bde", size = 8634437, upload-time = "2026-05-10T19:11:57.286Z" }
wheels = [
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/15/79/8085b7c1ecd48d0535c3c8444a1d8df2926e457dce8e55fabc332a382c9c/moto-5.2.1-py3-none-any.whl", hash = "sha256:19d2fbd6e613aa5b4e364c52cd5d3cea371643a0f4210689a703227bd2924c5c", size = 6671379, upload-time = "2026-05-10T19:11:53.543Z" },
{ url = "https://files.pythonhosted.org/packages/15/79/8085b7c1ecd48d0535c3c8444a1d8df2926e457dce8e55fabc332a382c9c/moto-5.2.1-py3-none-any.whl", hash = "sha256:19d2fbd6e613aa5b4e364c52cd5d3cea371643a0f4210689a703227bd2924c5c", size = 6671379, upload-time = "2026-05-10T19:11:53.543Z" },
]
[[package]]
@@ -4767,15 +4789,15 @@ wheels = [
[[package]]
name = "responses"
version = "0.26.0"
source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" }
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pyyaml" },
{ name = "requests" },
{ name = "urllib3" },
]
sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/9f/b4/b7e040379838cc71bf5aabdb26998dfbe5ee73904c92c1c161faf5de8866/responses-0.26.0.tar.gz", hash = "sha256:c7f6923e6343ef3682816ba421c006626777893cb0d5e1434f674b649bac9eb4", size = 81303, upload-time = "2026-02-19T14:38:05.574Z" }
sdist = { url = "https://files.pythonhosted.org/packages/9f/b4/b7e040379838cc71bf5aabdb26998dfbe5ee73904c92c1c161faf5de8866/responses-0.26.0.tar.gz", hash = "sha256:c7f6923e6343ef3682816ba421c006626777893cb0d5e1434f674b649bac9eb4", size = 81303, upload-time = "2026-02-19T14:38:05.574Z" }
wheels = [
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/ce/04/7f73d05b556da048923e31a0cc878f03be7c5425ed1f268082255c75d872/responses-0.26.0-py3-none-any.whl", hash = "sha256:03ec4409088cd5c66b71ecbbbd27fe2c58ddfad801c66203457b3e6a04868c37", size = 35099, upload-time = "2026-02-19T14:38:03.847Z" },
{ url = "https://files.pythonhosted.org/packages/ce/04/7f73d05b556da048923e31a0cc878f03be7c5425ed1f268082255c75d872/responses-0.26.0-py3-none-any.whl", hash = "sha256:03ec4409088cd5c66b71ecbbbd27fe2c58ddfad801c66203457b3e6a04868c37", size = 35099, upload-time = "2026-02-19T14:38:03.847Z" },
]
[[package]]
@@ -6072,10 +6094,10 @@ wheels = [
[[package]]
name = "xmltodict"
version = "1.0.4"
source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" }
sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/19/70/80f3b7c10d2630aa66414bf23d210386700aa390547278c789afa994fd7e/xmltodict-1.0.4.tar.gz", hash = "sha256:6d94c9f834dd9e44514162799d344d815a3a4faec913717a9ecbfa5be1bb8e61", size = 26124, upload-time = "2026-02-22T02:21:22.074Z" }
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/19/70/80f3b7c10d2630aa66414bf23d210386700aa390547278c789afa994fd7e/xmltodict-1.0.4.tar.gz", hash = "sha256:6d94c9f834dd9e44514162799d344d815a3a4faec913717a9ecbfa5be1bb8e61", size = 26124, upload-time = "2026-02-22T02:21:22.074Z" }
wheels = [
{ url = "https://pypi.tuna.tsinghua.edu.cn/packages/38/34/98a2f52245f4d47be93b580dae5f9861ef58977d73a79eb47c58f1ad1f3a/xmltodict-1.0.4-py3-none-any.whl", hash = "sha256:a4a00d300b0e1c59fc2bfccb53d7b2e88c32f200df138a0dd2229f842497026a", size = 13580, upload-time = "2026-02-22T02:21:21.039Z" },
{ url = "https://files.pythonhosted.org/packages/38/34/98a2f52245f4d47be93b580dae5f9861ef58977d73a79eb47c58f1ad1f3a/xmltodict-1.0.4-py3-none-any.whl", hash = "sha256:a4a00d300b0e1c59fc2bfccb53d7b2e88c32f200df138a0dd2229f842497026a", size = 13580, upload-time = "2026-02-22T02:21:21.039Z" },
]
[[package]]