This commit is contained in:
Typer_Body
2026-06-04 03:13:32 +08:00
parent 192b69b0fb
commit 6609bebeec
10 changed files with 223 additions and 45 deletions

View File

@@ -135,7 +135,8 @@ class BotService:
binding_type = bot_data.get('binding_type')
# set use_pipeline_name (for backward compatibility with 'pipeline' binding_type)
if 'use_pipeline_uuid' in bot_data:
# Only validate pipeline when binding_type is 'pipeline' or not set (default to pipeline)
if 'use_pipeline_uuid' in bot_data and binding_type != 'workflow':
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(persistence_pipeline.LegacyPipeline).where(
persistence_pipeline.LegacyPipeline.uuid == bot_data['use_pipeline_uuid']
@@ -149,7 +150,12 @@ class BotService:
bot_data['binding_uuid'] = bot_data['use_pipeline_uuid']
bot_data['binding_type'] = 'pipeline'
else:
raise Exception('Pipeline not found')
# Only raise error if binding_type is explicitly 'pipeline' or not set
if binding_type is None or binding_type == 'pipeline':
raise Exception('Pipeline not found')
# If binding_type is 'workflow', just clear the use_pipeline_uuid
bot_data['use_pipeline_uuid'] = None
bot_data['use_pipeline_name'] = None
# If binding_uuid is set directly (for workflow), sync use_pipeline_uuid for backward compatibility
if 'binding_uuid' in bot_data and binding_type == 'workflow':

View File

@@ -6,9 +6,7 @@ import asyncio
import logging
import uuid
from datetime import datetime, timedelta
from typing import Optional
logger = logging.getLogger(__name__)
from typing import Optional, TYPE_CHECKING
import sqlalchemy
@@ -22,10 +20,13 @@ from ....workflow.entities import (
Position,
MessageContext,
)
from langbot_plugin.api.entities.builtin.workflow.enums import ExecutionStatus, NodeStatus
from langbot_plugin.api.entities.builtin.workflow.enums import ExecutionStatus, NodeStatus, TriggerType
from langbot_plugin.api.entities.builtin.workflow.query import WorkflowQuery
from langbot_plugin.api.entities.builtin.provider.session import LauncherTypes
from ....workflow.executor import WorkflowExecutor
from ....workflow.registry import NodeTypeRegistry
logger = logging.getLogger(__name__)
class WorkflowExecutionFailedError(Exception):
"""Raised when a workflow execution finishes with failed status."""
@@ -399,8 +400,26 @@ class WorkflowService:
),
raw_message=message_context_data.get('raw_message', {}),
)
# Set query from message_content for logging purposes
context.query = context.message_context.message_content
# Determine launcher_type from is_group flag
is_group = message_context_data.get('is_group', False)
launcher_type = LauncherTypes.GROUP if is_group else LauncherTypes.PERSON
# Create WorkflowQuery object with launcher_type for monitoring
context.query = WorkflowQuery(
workflow_uuid=workflow_uuid,
workflow_name=workflow_name,
execution_id=execution_uuid,
launcher_type=launcher_type,
launcher_id=message_context_data.get('sender_id', ''),
sender_id=message_context_data.get('sender_id', ''),
sender_name=message_context_data.get('sender_name', 'User'),
message_context=context.message_context,
bot_uuid=bot_id,
trigger_type=TriggerType.MESSAGE,
trigger_data=raw_trigger_data,
variables={},
)
# Note: Frontend panel logging has been removed.
# A new solution will be implemented separately.
@@ -413,7 +432,9 @@ class WorkflowService:
# Store launcher info for monitoring (used when query is a string)
if message_context_data:
context.variables['_launcher_type'] = 'websocket'
# Determine launcher_type from is_group flag
is_group = message_context_data.get('is_group', False)
context.variables['_launcher_type'] = 'group' if is_group else 'person'
context.variables['_launcher_id'] = message_context_data.get('sender_id', '')
context.variables['_sender_name'] = message_context_data.get('sender_name', 'User')

View File

@@ -2,11 +2,14 @@ from __future__ import annotations
import asyncio
import json
import logging
import traceback
import sqlalchemy
from ..core import app, entities as core_entities, taskmgr
logger = logging.getLogger(__name__)
from ..discover import engine
from ..entity.persistence import bot as persistence_bot
@@ -99,6 +102,74 @@ class RuntimeBot:
return binding_uuid, False
async def _handle_workflow_message(
self,
event: platform_events.MessageEvent,
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
workflow_uuid: str,
launcher_type: str,
launcher_id: str | int,
sender_id: str | int,
) -> None:
"""Handle message by executing the bound workflow directly."""
message_content = str(event.message_chain)
message_chain_obj = event.message_chain
# Build message context
sender_name = None
if hasattr(event, 'sender'):
sender = event.sender
if hasattr(sender, 'nickname'):
sender_name = sender.nickname
elif hasattr(sender, 'member_name'):
sender_name = sender.member_name
is_group = launcher_type == 'group'
message_context = {
'message_id': str(getattr(event, 'message_id', '')),
'message_content': message_content,
'sender_id': str(sender_id),
'sender_name': sender_name or 'User',
'platform': adapter.__class__.__name__,
'conversation_id': str(launcher_id),
'is_group': is_group,
'group_id': str(launcher_id) if is_group else None,
'mentions': [],
'reply_to': None,
'raw_message': {
'message': message_chain_obj.model_dump() if hasattr(message_chain_obj, 'model_dump') else str(message_chain_obj),
'launcher_id': launcher_id,
'session_type': launcher_type,
},
}
trigger_data = {
'message': message_content,
'message_chain': message_chain_obj.model_dump() if hasattr(message_chain_obj, 'model_dump') else str(message_chain_obj),
'session_type': launcher_type,
'connection_id': str(launcher_id),
'message_context': message_context,
}
session_id = f'{launcher_type}_{launcher_id}'
logger.info(f'Processing workflow message from {session_id}: {message_content}')
try:
from ..api.http.service.workflow import WorkflowExecutionFailedError
execution_id = await self.ap.workflow_service.execute_workflow(
workflow_uuid=workflow_uuid,
trigger_type='message',
trigger_data=trigger_data,
session_id=session_id,
user_id=str(sender_id),
bot_id=self.bot_entity.uuid,
)
except WorkflowExecutionFailedError as e:
await self.logger.error(f'Workflow execution failed: {e.message}')
except Exception as e:
await self.logger.error(f'Workflow execution error: {e}')
async def _record_discarded_message(
self,
launcher_type: provider_session.LauncherTypes,
@@ -193,6 +264,20 @@ class RuntimeBot:
message_text = str(event.message_chain)
element_types = [comp.type for comp in event.message_chain]
binding_type, binding_uuid = self.get_binding_info()
# Handle workflow binding separately from pipeline
if binding_type == 'workflow':
await self._handle_workflow_message(
event=event,
adapter=adapter,
workflow_uuid=binding_uuid,
launcher_type='person',
launcher_id=launcher_id,
sender_id=event.sender.id,
)
return
pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid(
'person', launcher_id, message_text, element_types
)
@@ -254,6 +339,20 @@ class RuntimeBot:
message_text = str(event.message_chain)
element_types = [comp.type for comp in event.message_chain]
binding_type, binding_uuid = self.get_binding_info()
# Handle workflow binding separately from pipeline
if binding_type == 'workflow':
await self._handle_workflow_message(
event=event,
adapter=adapter,
workflow_uuid=binding_uuid,
launcher_type='group',
launcher_id=launcher_id,
sender_id=event.sender.id,
)
return
pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid(
'group', launcher_id, message_text, element_types
)

View File

@@ -354,9 +354,19 @@ class RuntimeConnectionHandler(handler.Handler):
extra_args=extra_args,
)
# invoke_llm returns (message, usage_info) tuple
if isinstance(result, tuple) and len(result) == 2:
msg, usage_info = result
msg_dump = msg.model_dump()
# Attach usage info to message dump
if usage_info:
msg_dump['usage'] = usage_info
else:
msg_dump = result.model_dump()
return handler.ActionResponse.success(
data={
'message': result.model_dump(),
'message': msg_dump,
},
)

View File

@@ -69,6 +69,11 @@ class RuntimeProvider:
if usage_info:
input_tokens = usage_info.get('input_tokens', 0)
output_tokens = usage_info.get('output_tokens', 0)
# Attach usage info to message using object.__setattr__ to bypass pydantic validation
try:
object.__setattr__(msg, 'usage', usage_info)
except (AttributeError, TypeError):
pass # If we can't set it, just skip it
return msg
else:
return result

View File

@@ -11,19 +11,31 @@ from typing import Any, Optional
import pydantic
# Import SDK entities for standard workflow protocol types
# These are re-exported for use by other modules in the workflow package.
from langbot_plugin.api.entities.builtin.workflow.entities import (
ExecutionContext,
ExecutionStep,
MessageContext,
ExecutionContext as ExecutionContext,
ExecutionStep as ExecutionStep,
MessageContext as MessageContext,
NodeDefinition,
NodeState,
PortDefinition,
NodeState as NodeState,
PortDefinition as PortDefinition,
)
from langbot_plugin.api.entities.builtin.workflow.enums import (
ExecutionStatus,
NodeStatus,
ExecutionStatus as ExecutionStatus,
NodeStatus as NodeStatus,
)
__all__ = [
"ExecutionContext",
"ExecutionStep",
"MessageContext",
"NodeDefinition",
"NodeState",
"PortDefinition",
"ExecutionStatus",
"NodeStatus",
]
class Position(pydantic.BaseModel):
"""Node position on canvas"""

View File

@@ -32,7 +32,6 @@ from .entities import (
)
from ..entity.persistence import workflow as persistence_workflow
from .registry import NodeTypeRegistry
from . import monitor
if TYPE_CHECKING:
from ..core import app

View File

@@ -20,17 +20,24 @@ import json
if typing.TYPE_CHECKING:
from ..core import app
from langbot_plugin.api.entities.builtin.workflow.query import WorkflowQuery
class WorkflowMonitoringHelper:
"""Helper class for workflow monitoring operations"""
@staticmethod
def _is_workflow_query(query) -> bool:
"""Check if query is a WorkflowQuery object"""
if query is None or isinstance(query, str):
return False
# Check for WorkflowQuery attributes
return hasattr(query, 'launcher_type') or hasattr(query, 'workflow_uuid')
@staticmethod
def _get_session_id(query, context_vars: dict | None = None) -> str:
"""Build session_id from query or context_vars"""
# Try to get from query first
if not isinstance(query, str) and query.launcher_type:
# Try to get from WorkflowQuery first
if WorkflowMonitoringHelper._is_workflow_query(query) and query.launcher_type:
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
launcher_id = query.launcher_id or 'unknown'
return f'{launcher_type}_{launcher_id}'
@@ -44,17 +51,23 @@ class WorkflowMonitoringHelper:
@staticmethod
def _get_platform(query, context_vars: dict | None = None) -> str:
"""Get platform name from query or context_vars"""
if not isinstance(query, str) and query.launcher_type:
# Try WorkflowQuery first
if WorkflowMonitoringHelper._is_workflow_query(query) and query.launcher_type:
if hasattr(query.launcher_type, 'value'):
return query.launcher_type.value
return str(query.launcher_type)
# Fallback to context_vars for launcher_type (person/group)
if context_vars and context_vars.get('_launcher_type'):
return context_vars['_launcher_type']
return 'workflow'
@staticmethod
def _get_sender_name(query, context_vars: dict | None = None) -> str | None:
"""Get sender name from query or context_vars"""
# Try query first
if not isinstance(query, str):
# Try WorkflowQuery first
if WorkflowMonitoringHelper._is_workflow_query(query):
if query.sender_name:
return query.sender_name
if query.message_event and hasattr(query.message_event, 'sender'):

View File

@@ -234,19 +234,25 @@ Respond in the same language as the user's input.
messages: list,
funcs: list | None,
extra_args: dict,
) -> tuple[Any, Any]:
"""Try non-streaming invocation with sequential fallback. Returns (message, model_used)."""
) -> tuple[Any, Any, dict]:
"""Try non-streaming invocation with sequential fallback. Returns (message, model_used, usage_info)."""
last_error = None
for model in candidates:
try:
msg = await model.provider.invoke_llm(
result = await model.provider.invoke_llm(
query=None,
model=model,
messages=messages,
funcs=funcs if model.model_entity.abilities.__contains__('func_call') else [],
extra_args=extra_args,
)
return msg, model
# invoke_llm returns (message, usage_info) tuple
if isinstance(result, tuple) and len(result) == 2:
msg, usage_info = result
else:
msg = result
usage_info = {}
return msg, model, usage_info
except Exception as e:
last_error = e
logger.warning(f'[LLM:{self.node_id}] Model {model.model_entity.name} failed: {e}, trying next...')
@@ -514,7 +520,7 @@ Respond in the same language as the user's input.
# Invoke LLM with fallback
try:
result_message, used_model = await self._invoke_with_fallback(
result_message, used_model, llm_usage = await self._invoke_with_fallback(
candidates=candidates,
messages=messages,
funcs=None,
@@ -579,25 +585,31 @@ Respond in the same language as the user's input.
'blocked_by_filter': True,
}
# Extract usage info
if hasattr(result_message, 'usage') and result_message.usage:
# Extract usage info from LLM call result
# Priority: llm_usage (from _invoke_with_fallback) > result_message.usage > result_message.token_usage
if llm_usage:
usage = {
'prompt_tokens': llm_usage.get('input_tokens', 0) or llm_usage.get('prompt_tokens', 0),
'completion_tokens': llm_usage.get('output_tokens', 0) or llm_usage.get('completion_tokens', 0),
'total_tokens': llm_usage.get('total_tokens', 0),
}
# Check result_message.usage (set by RuntimeProvider.invoke_llm)
elif hasattr(result_message, 'usage') and result_message.usage:
u = result_message.usage
# Handle both object and dict usage
if isinstance(u, dict):
usage = {
'prompt_tokens': u.get('prompt_tokens', 0) or 0,
'completion_tokens': u.get('completion_tokens', 0) or 0,
'total_tokens': u.get('total_tokens', 0) or 0,
'prompt_tokens': u.get('input_tokens', 0) or u.get('prompt_tokens', 0),
'completion_tokens': u.get('output_tokens', 0) or u.get('completion_tokens', 0),
'total_tokens': u.get('total_tokens', 0),
}
else:
usage = {
'prompt_tokens': getattr(u, 'prompt_tokens', 0) or 0,
'completion_tokens': getattr(u, 'completion_tokens', 0) or 0,
'total_tokens': getattr(u, 'total_tokens', 0) or 0,
'prompt_tokens': getattr(u, 'input_tokens', 0) or getattr(u, 'prompt_tokens', 0),
'completion_tokens': getattr(u, 'output_tokens', 0) or getattr(u, 'completion_tokens', 0),
'total_tokens': getattr(u, 'total_tokens', 0),
}
elif hasattr(result_message, 'token_usage') and result_message.token_usage:
u = result_message.token_usage
# Handle both object and dict token_usage
if isinstance(u, dict):
usage = {
'prompt_tokens': u.get('prompt_tokens', 0) or 0,

View File

@@ -320,6 +320,7 @@ export default function BotForm({
setIsLoading(true);
if (initBotId) {
const formValues = form.getValues();
const bindingType = formValues.binding_type ?? 'pipeline';
const updateBot: Bot = {
uuid: initBotId,
name: formValues.name,
@@ -327,13 +328,13 @@ export default function BotForm({
adapter: formValues.adapter,
adapter_config: formValues.adapter_config,
enable: formValues.enable,
binding_type: formValues.binding_type ?? 'pipeline',
binding_type: bindingType,
binding_uuid: formValues.binding_uuid ?? '',
// Sync use_pipeline_uuid for backward compatibility when binding_type is 'pipeline'
use_pipeline_uuid:
formValues.binding_type === 'pipeline'
? formValues.binding_uuid
: formValues.use_pipeline_uuid,
// Only send use_pipeline_uuid when binding_type is 'pipeline'
// For 'workflow' binding, we don't need this field
...(bindingType === 'pipeline' && {
use_pipeline_uuid: formValues.binding_uuid,
}),
};
httpClient
.updateBot(initBotId, updateBot)