This commit is contained in:
Typer_Body
2026-05-23 02:58:17 +08:00
parent 127198675e
commit 8ebfcd963a
13 changed files with 728 additions and 173 deletions

View File

@@ -120,7 +120,7 @@ class WorkflowDefinition(pydantic.BaseModel):
uuid: str
name: str
description: str = ''
emoji: str = '🔄'
emoji: str = '💼'
version: int = 1
# Workflow graph

View File

@@ -30,6 +30,7 @@ from . import variable_aggregator
from . import send_message
from . import reply_message
from . import call_pipeline
from . import call_workflow
from . import store_data
from . import set_variable
from . import opening_statement
@@ -74,6 +75,7 @@ __all__ = [
'send_message',
'reply_message',
'call_pipeline',
'call_workflow',
'store_data',
'set_variable',
'opening_statement',

View File

@@ -0,0 +1,85 @@
"""Call Workflow Node - invoke an existing workflow
Node metadata is loaded from: ../../templates/metadata/nodes/call_workflow.yaml
"""
from __future__ import annotations
from typing import Any, Optional
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@workflow_node('call_workflow')
class CallWorkflowNode(WorkflowNode):
"""Call workflow node - invoke an existing workflow"""
category = 'action'
async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]:
if not self.ap:
raise RuntimeError('Application instance not available — cannot call workflow')
# Get workflow reference from config
workflow_ref = str(self.get_config('workflow_uuid', '') or '').strip()
if not workflow_ref:
raise ValueError('No workflow configured for call workflow node')
# Get workflow definition from service
workflow_data = await self.ap.workflow_service.get_workflow(workflow_ref)
if workflow_data is None:
raise ValueError(f'Workflow not found: {workflow_ref}')
workflow_uuid = str(workflow_data.get('uuid', '') or '')
if not workflow_uuid:
raise ValueError(f'Workflow UUID missing for: {workflow_ref}')
# Build variables to pass to the called workflow
variables = dict(inputs.get('variables', {}) or {})
# Inherit current workflow variables if configured
if self.get_config('inherit_variables', True):
for key, value in (context.variables or {}).items():
if key not in variables:
variables[key] = value
# Add context markers for debugging
variables['_called_from_workflow'] = True
variables['_parent_workflow_id'] = context.workflow_id
variables['_parent_execution_id'] = context.execution_id
# Execute the workflow
execution_id = await self.ap.workflow_service.execute_workflow(
workflow_uuid=workflow_uuid,
trigger_type='workflow_call',
trigger_data={
'variables': variables,
'parent_execution_id': context.execution_id,
},
session_id=context.session_id,
user_id=context.user_id,
bot_id=context.bot_id,
)
# Get execution result
execution = await self.ap.workflow_service.get_execution(execution_id)
if execution is None:
raise ValueError(f'Execution result not found: {execution_id}')
# Build result
result = {
'workflow_uuid': workflow_uuid,
'workflow_name': workflow_data.get('name', ''),
'execution_id': execution_id,
'status': execution.get('status', 'unknown'),
'variables': execution.get('variables', {}),
'error': execution.get('error'),
}
return {
'result': result,
'status': execution.get('status', 'unknown'),
'error': execution.get('error'),
}

View File

@@ -7,7 +7,6 @@ from __future__ import annotations
import ipaddress
import logging
import re
from typing import Any
from urllib.parse import urlparse

View File

@@ -1,4 +1,11 @@
"""LLM Call Node - invoke large language model."""
"""LLM Call Node - invoke large language model with Agent capabilities.
Supports:
- Primary model with fallback models
- Knowledge base retrieval with reranking
- Max round context control
- Streaming output
"""
from __future__ import annotations
@@ -8,7 +15,7 @@ import re
from typing import Any, AsyncGenerator
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
import langbot_plugin.api.entities.builtin.rag.context as rag_context
from langbot_plugin.api.entities.builtin.workflow import ExecutionContext
from ..node import WorkflowNode, workflow_node
@@ -133,29 +140,20 @@ class LLMCallNode(WorkflowNode):
return text, False, ''
def _parse_tools_config(self, tools_config: Any) -> list[dict]:
"""Parse tools configuration from YAML config format."""
if not tools_config:
return []
# RAG combined prompt template (same as localagent.py)
RAG_COMBINED_PROMPT_TEMPLATE = """
The following are relevant context entries retrieved from the knowledge base.
Please use them to answer the user's message.
Respond in the same language as the user's input.
# If it's already a list, return as-is
if isinstance(tools_config, list):
return tools_config
<context>
{rag_context}
</context>
# If it's a string, try to parse as JSON
if isinstance(tools_config, str):
tools_config = tools_config.strip()
if not tools_config:
return []
try:
parsed = json.loads(tools_config)
if isinstance(parsed, list):
return parsed
except json.JSONDecodeError:
logger.warning(f'Failed to parse tools config as JSON: {tools_config}')
return []
return []
<user_message>
{user_message}
</user_message>
"""
def _build_system_prompt_with_format(self, base_prompt: str, output_format: str, json_schema: str) -> str:
"""Build system prompt with output format instructions."""
@@ -170,6 +168,220 @@ class LLMCallNode(WorkflowNode):
return prompt
def _build_messages_from_prompt_array(
self,
prompt_array: list[dict],
inputs: dict[str, Any],
context: ExecutionContext,
output_format: str,
json_schema: str,
) -> list[provider_message.Message]:
"""Build messages list from prompt array (same format as pipeline).
Each item in prompt_array is {role: str, content: str}.
Resolves template variables in content.
"""
messages: list[provider_message.Message] = []
for item in prompt_array:
role = item.get('role', 'user')
content = item.get('content', '')
# Resolve template variables in content
resolved_content = self._resolve_template(content, inputs, context)
# Apply format instructions to system prompt
if role == 'system':
resolved_content = self._build_system_prompt_with_format(
resolved_content, output_format, json_schema
)
messages.append(provider_message.Message(role=role, content=resolved_content))
return messages
async def _get_model_candidates(self, model_uuid: str, fallback_models: list) -> list:
"""Build ordered list of models to try: primary model + fallback models."""
candidates = []
# Primary model
if model_uuid:
try:
primary = await self.ap.model_mgr.get_model_by_uuid(model_uuid)
candidates.append(primary)
except ValueError:
logger.warning(f'[LLM:{self.node_id}] Primary model {model_uuid} not found')
# Fallback models
for fb_uuid in fallback_models:
try:
fb_model = await self.ap.model_mgr.get_model_by_uuid(fb_uuid)
candidates.append(fb_model)
except ValueError:
logger.warning(f'[LLM:{self.node_id}] Fallback model {fb_uuid} not found, skipping')
return candidates
async def _invoke_with_fallback(
self,
candidates: list,
messages: list,
funcs: list | None,
extra_args: dict,
) -> tuple[Any, Any]:
"""Try non-streaming invocation with sequential fallback. Returns (message, model_used)."""
last_error = None
for model in candidates:
try:
msg = await model.provider.invoke_llm(
query=None,
model=model,
messages=messages,
funcs=funcs if model.model_entity.abilities.__contains__('func_call') else [],
extra_args=extra_args,
)
return msg, model
except Exception as e:
last_error = e
logger.warning(f'[LLM:{self.node_id}] Model {model.model_entity.name} failed: {e}, trying next...')
raise last_error or RuntimeError('No model candidates available')
async def _retrieve_knowledge(
self,
user_message_text: str,
knowledge_bases: list[str],
rerank_model_uuid: str,
rerank_top_k: int,
) -> str:
"""Retrieve from knowledge bases and optionally rerank results.
Returns the enhanced user message text with RAG context, or original text if no results.
"""
if not knowledge_bases or not user_message_text:
return user_message_text
all_results: list[rag_context.RetrievalResultEntry] = []
# Retrieve from each knowledge base
for kb_uuid in knowledge_bases:
try:
kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
if not kb:
logger.warning(f'[LLM:{self.node_id}] Knowledge base {kb_uuid} not found, skipping')
continue
result = await kb.retrieve(user_message_text, settings={})
if result:
all_results.extend(result)
except Exception as e:
logger.warning(f'[LLM:{self.node_id}] Failed to retrieve from KB {kb_uuid}: {e}')
# Rerank step: re-score results using a rerank model if configured
if all_results and rerank_model_uuid:
try:
rerank_model = await self.ap.model_mgr.get_rerank_model_by_uuid(rerank_model_uuid)
doc_texts = []
for entry in all_results:
text = ' '.join(c.text for c in entry.content if c.type == 'text' and c.text)
doc_texts.append(text)
doc_texts_capped = doc_texts[:64] # Cap for reranker input
scores = await rerank_model.provider.invoke_rerank(
model=rerank_model,
query=user_message_text,
documents=doc_texts_capped,
)
scored = sorted(scores, key=lambda x: x.get('relevance_score', 0), reverse=True)
top_indices = [s['index'] for s in scored[:rerank_top_k] if s['index'] < len(all_results)]
all_results = [all_results[i] for i in top_indices]
logger.info(
f'[LLM:{self.node_id}] Rerank complete: {len(doc_texts)} docs -> top {len(all_results)} kept (top_k={rerank_top_k})'
)
except ValueError:
logger.warning(f'[LLM:{self.node_id}] Rerank model {rerank_model_uuid} not found, skipping rerank')
except Exception as e:
logger.warning(f'[LLM:{self.node_id}] Rerank failed, using original order: {e}')
# Build RAG context text
if all_results:
texts = []
idx = 1
for entry in all_results:
for content in entry.content:
if content.type == 'text' and content.text is not None:
texts.append(f'[{idx}] {content.text}')
idx += 1
rag_context_text = '\n\n'.join(texts)
return self.RAG_COMBINED_PROMPT_TEMPLATE.format(
rag_context=rag_context_text,
user_message=user_message_text,
)
return user_message_text
def _build_messages_with_history(
self,
system_prompt: str,
user_message_text: str,
context: ExecutionContext,
max_round: int,
) -> list[provider_message.Message]:
"""Build messages list with conversation history up to max_round."""
messages: list[provider_message.Message] = []
# Add system prompt
if system_prompt:
messages.append(provider_message.Message(role='system', content=system_prompt))
# Get conversation history from context
conversation_history = context.variables.get('_conversation_history', [])
# Apply max_round limit (each round = 1 user + 1 assistant message)
if max_round > 0 and conversation_history:
# Keep only the last max_round * 2 messages (user + assistant pairs)
max_messages = max_round * 2
if len(conversation_history) > max_messages:
conversation_history = conversation_history[-max_messages:]
# Add conversation history
for msg in conversation_history:
if isinstance(msg, dict):
role = msg.get('role', 'user')
content = msg.get('content', '')
messages.append(provider_message.Message(role=role, content=content))
elif hasattr(msg, 'role') and hasattr(msg, 'content'):
messages.append(provider_message.Message(role=msg.role, content=msg.content))
# Add current user message
messages.append(provider_message.Message(role='user', content=user_message_text))
return messages
def _save_to_conversation_history(
self,
context: ExecutionContext,
user_message_text: str,
response_text: str,
max_round: int,
) -> None:
"""Save the exchange to conversation history."""
if max_round <= 0:
return
history = context.variables.get('_conversation_history', [])
history.append({'role': 'user', 'content': user_message_text})
history.append({'role': 'assistant', 'content': response_text})
# Enforce max_round limit
max_messages = max_round * 2
if len(history) > max_messages:
history = history[-max_messages:]
context.variables['_conversation_history'] = history
async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]:
model_uuid = self.get_config('model', '')
if not model_uuid:
@@ -187,28 +399,90 @@ class LLMCallNode(WorkflowNode):
output_format = self.get_config('output_format', 'text')
json_schema = self.get_config('json_schema', '')
# Get tools config
enable_tools = self.get_config('enable_tools', False)
tools_config = self.get_config('tools', [])
# Agent config: fallback models, knowledge bases, rerank, max_round
fallback_models = self.get_config('fallback_models', [])
knowledge_bases = self.get_config('knowledge_bases', [])
rerank_model = self.get_config('rerank_model', '')
rerank_top_k = self.get_config('rerank_top_k', 5)
max_round = self.get_config('max_round', 10)
# Resolve prompts
system_prompt = self._resolve_template(self.get_config('system_prompt') or '', inputs, context)
user_prompt_template = self.get_config('user_prompt_template')
if user_prompt_template is None:
user_prompt_template = '{{input}}'
user_prompt = self._resolve_template(user_prompt_template, inputs, context)
# Resolve prompts - support both new prompt array format and legacy format
prompt_array = self.get_config('prompt')
user_prompt = '' # Initialize for later use in _save_to_conversation_history
if prompt_array and isinstance(prompt_array, list):
# New format: prompt array like pipeline
messages = self._build_messages_from_prompt_array(
prompt_array, inputs, context, output_format, json_schema
)
# Get user input text for knowledge retrieval
user_input = inputs.get('input', '')
# Knowledge retrieval: enhance user input with RAG context
user_input = await self._retrieve_knowledge(
user_message_text=user_input,
knowledge_bases=knowledge_bases,
rerank_model_uuid=rerank_model,
rerank_top_k=rerank_top_k,
)
# Track user_prompt for conversation history
user_prompt = user_input
# Add user input as last message
if user_input:
messages.append(provider_message.Message(role='user', content=user_input))
# Apply max_round to conversation history
conversation_history = context.variables.get('_conversation_history', [])
if max_round > 0 and conversation_history:
max_messages = max_round * 2
if len(conversation_history) > max_messages:
conversation_history = conversation_history[-max_messages:]
# Insert conversation history before user input
history_messages = []
for msg in conversation_history:
if isinstance(msg, dict):
role = msg.get('role', 'user')
content = msg.get('content', '')
history_messages.append(provider_message.Message(role=role, content=content))
elif hasattr(msg, 'role') and hasattr(msg, 'content'):
history_messages.append(provider_message.Message(role=msg.role, content=msg.content))
# Insert history before user message
if history_messages and len(messages) > 0:
messages = messages[:-1] + history_messages + [messages[-1]]
else:
# Legacy format: separate system_prompt and user_prompt_template
system_prompt = self._resolve_template(self.get_config('system_prompt') or '', inputs, context)
user_prompt_template = self.get_config('user_prompt_template')
if user_prompt_template is None:
user_prompt_template = '{{input}}'
user_prompt = self._resolve_template(user_prompt_template, inputs, context)
# Build system prompt with format instructions
system_prompt = self._build_system_prompt_with_format(system_prompt, output_format, json_schema)
# Build system prompt with format instructions
system_prompt = self._build_system_prompt_with_format(system_prompt, output_format, json_schema)
# Build messages
messages: list[provider_message.Message] = []
if system_prompt:
messages.append(provider_message.Message(role='system', content=system_prompt))
messages.append(provider_message.Message(role='user', content=user_prompt))
# Knowledge retrieval: enhance user prompt with RAG context
user_prompt = await self._retrieve_knowledge(
user_message_text=user_prompt,
knowledge_bases=knowledge_bases,
rerank_model_uuid=rerank_model,
rerank_top_k=rerank_top_k,
)
# Get model
runtime_model = await self.ap.model_mgr.get_model_by_uuid(model_uuid)
# Build messages with conversation history
messages = self._build_messages_with_history(
system_prompt=system_prompt,
user_message_text=user_prompt,
context=context,
max_round=max_round,
)
# Get model candidates (primary + fallbacks)
candidates = await self._get_model_candidates(model_uuid, fallback_models)
if not candidates:
raise ValueError('No valid model candidates available')
# Build extra args from config
extra_args: dict[str, Any] = {}
@@ -219,25 +493,12 @@ class LLMCallNode(WorkflowNode):
if max_tokens and int(max_tokens) > 0:
extra_args['max_tokens'] = int(max_tokens)
# Build tools list if enabled
funcs: list[resource_tool.LLMTool] | None = None
if enable_tools and tools_config:
try:
tool_names = self._parse_tools_config(tools_config)
if tool_names:
all_tools = await self.ap.tool_mgr.get_tools()
funcs = [t for t in all_tools if t.name in tool_names]
except Exception as e:
logger.warning(f'[LLM:{self.node_id}] Failed to load tools: {e}')
funcs = None
# Invoke LLM
# Invoke LLM with fallback
try:
result_message = await runtime_model.provider.invoke_llm(
query=None,
model=runtime_model,
result_message, used_model = await self._invoke_with_fallback(
candidates=candidates,
messages=messages,
funcs=funcs,
funcs=None,
extra_args=extra_args,
)
except Exception as e:
@@ -300,11 +561,6 @@ class LLMCallNode(WorkflowNode):
}
# Extract usage info
usage = {
'prompt_tokens': 0,
'completion_tokens': 0,
'total_tokens': 0,
}
if hasattr(result_message, 'usage') and result_message.usage:
u = result_message.usage
usage = {
@@ -320,10 +576,20 @@ class LLMCallNode(WorkflowNode):
'total_tokens': getattr(u, 'total_tokens', 0) or 0,
}
# Save to conversation history
self._save_to_conversation_history(
context=context,
user_message_text=user_prompt,
response_text=response_text,
max_round=max_round,
)
# Build result
result: dict[str, Any] = {
'response': response_text,
'usage': usage,
'model_used': used_model.model_entity.name if used_model else None,
'model_uuid': used_model.model_entity.uuid if used_model else None,
}
# Parse JSON output if format is json
@@ -359,18 +625,31 @@ class LLMCallNode(WorkflowNode):
exception_handling = self.get_config('exception_handling', 'show-error')
failure_hint = self.get_config('failure_hint', 'Request failed.')
# Resolve prompts
system_prompt = self._resolve_template(self.get_config('system_prompt') or '', inputs, context)
user_prompt_template = self.get_config('user_prompt_template')
if user_prompt_template is None:
user_prompt_template = '{{input}}'
user_prompt = self._resolve_template(user_prompt_template, inputs, context)
# Resolve prompts - support both new prompt array format and legacy format
prompt_array = self.get_config('prompt')
if prompt_array and isinstance(prompt_array, list):
# New format: prompt array like pipeline
messages = self._build_messages_from_prompt_array(
prompt_array, inputs, context, 'text', '' # No format instructions for streaming
)
# Add user input
user_input = inputs.get('input', '')
if user_input:
messages.append(provider_message.Message(role='user', content=user_input))
else:
# Legacy format
system_prompt = self._resolve_template(self.get_config('system_prompt') or '', inputs, context)
user_prompt_template = self.get_config('user_prompt_template')
if user_prompt_template is None:
user_prompt_template = '{{input}}'
user_prompt = self._resolve_template(user_prompt_template, inputs, context)
# Build messages
messages: list[provider_message.Message] = []
if system_prompt:
messages.append(provider_message.Message(role='system', content=system_prompt))
messages.append(provider_message.Message(role='user', content=user_prompt))
# Build messages
messages = []
if system_prompt:
messages.append(provider_message.Message(role='system', content=system_prompt))
messages.append(provider_message.Message(role='user', content=user_prompt))
# Get model
runtime_model = await self.ap.model_mgr.get_model_by_uuid(model_uuid)