From 6609bebeecb021e0f005b88699f00047011a748c Mon Sep 17 00:00:00 2001 From: Typer_Body Date: Thu, 4 Jun 2026 03:13:32 +0800 Subject: [PATCH] platfrom --- src/langbot/pkg/api/http/service/bot.py | 10 +- src/langbot/pkg/api/http/service/workflow.py | 35 +++++-- src/langbot/pkg/platform/botmgr.py | 99 +++++++++++++++++++ src/langbot/pkg/plugin/handler.py | 12 ++- .../pkg/provider/modelmgr/requester.py | 5 + src/langbot/pkg/workflow/entities.py | 26 +++-- src/langbot/pkg/workflow/executor.py | 1 - src/langbot/pkg/workflow/monitoring_helper.py | 25 +++-- src/langbot/pkg/workflow/nodes/llm_call.py | 42 +++++--- .../home/bots/components/bot-form/BotForm.tsx | 13 +-- 10 files changed, 223 insertions(+), 45 deletions(-) diff --git a/src/langbot/pkg/api/http/service/bot.py b/src/langbot/pkg/api/http/service/bot.py index 06898dac..f75dcfe3 100644 --- a/src/langbot/pkg/api/http/service/bot.py +++ b/src/langbot/pkg/api/http/service/bot.py @@ -135,7 +135,8 @@ class BotService: binding_type = bot_data.get('binding_type') # set use_pipeline_name (for backward compatibility with 'pipeline' binding_type) - if 'use_pipeline_uuid' in bot_data: + # Only validate pipeline when binding_type is 'pipeline' or not set (default to pipeline) + if 'use_pipeline_uuid' in bot_data and binding_type != 'workflow': result = await self.ap.persistence_mgr.execute_async( sqlalchemy.select(persistence_pipeline.LegacyPipeline).where( persistence_pipeline.LegacyPipeline.uuid == bot_data['use_pipeline_uuid'] @@ -149,7 +150,12 @@ class BotService: bot_data['binding_uuid'] = bot_data['use_pipeline_uuid'] bot_data['binding_type'] = 'pipeline' else: - raise Exception('Pipeline not found') + # Only raise error if binding_type is explicitly 'pipeline' or not set + if binding_type is None or binding_type == 'pipeline': + raise Exception('Pipeline not found') + # If binding_type is 'workflow', just clear the use_pipeline_uuid + bot_data['use_pipeline_uuid'] = None + bot_data['use_pipeline_name'] = None # If binding_uuid is set directly (for workflow), sync use_pipeline_uuid for backward compatibility if 'binding_uuid' in bot_data and binding_type == 'workflow': diff --git a/src/langbot/pkg/api/http/service/workflow.py b/src/langbot/pkg/api/http/service/workflow.py index 7ab1f8c2..2914a0d8 100644 --- a/src/langbot/pkg/api/http/service/workflow.py +++ b/src/langbot/pkg/api/http/service/workflow.py @@ -6,9 +6,7 @@ import asyncio import logging import uuid from datetime import datetime, timedelta -from typing import Optional - -logger = logging.getLogger(__name__) +from typing import Optional, TYPE_CHECKING import sqlalchemy @@ -22,10 +20,13 @@ from ....workflow.entities import ( Position, MessageContext, ) -from langbot_plugin.api.entities.builtin.workflow.enums import ExecutionStatus, NodeStatus +from langbot_plugin.api.entities.builtin.workflow.enums import ExecutionStatus, NodeStatus, TriggerType +from langbot_plugin.api.entities.builtin.workflow.query import WorkflowQuery +from langbot_plugin.api.entities.builtin.provider.session import LauncherTypes from ....workflow.executor import WorkflowExecutor from ....workflow.registry import NodeTypeRegistry +logger = logging.getLogger(__name__) class WorkflowExecutionFailedError(Exception): """Raised when a workflow execution finishes with failed status.""" @@ -399,8 +400,26 @@ class WorkflowService: ), raw_message=message_context_data.get('raw_message', {}), ) - # Set query from message_content for logging purposes - context.query = context.message_context.message_content + + # Determine launcher_type from is_group flag + is_group = message_context_data.get('is_group', False) + launcher_type = LauncherTypes.GROUP if is_group else LauncherTypes.PERSON + + # Create WorkflowQuery object with launcher_type for monitoring + context.query = WorkflowQuery( + workflow_uuid=workflow_uuid, + workflow_name=workflow_name, + execution_id=execution_uuid, + launcher_type=launcher_type, + launcher_id=message_context_data.get('sender_id', ''), + sender_id=message_context_data.get('sender_id', ''), + sender_name=message_context_data.get('sender_name', 'User'), + message_context=context.message_context, + bot_uuid=bot_id, + trigger_type=TriggerType.MESSAGE, + trigger_data=raw_trigger_data, + variables={}, + ) # Note: Frontend panel logging has been removed. # A new solution will be implemented separately. @@ -413,7 +432,9 @@ class WorkflowService: # Store launcher info for monitoring (used when query is a string) if message_context_data: - context.variables['_launcher_type'] = 'websocket' + # Determine launcher_type from is_group flag + is_group = message_context_data.get('is_group', False) + context.variables['_launcher_type'] = 'group' if is_group else 'person' context.variables['_launcher_id'] = message_context_data.get('sender_id', '') context.variables['_sender_name'] = message_context_data.get('sender_name', 'User') diff --git a/src/langbot/pkg/platform/botmgr.py b/src/langbot/pkg/platform/botmgr.py index d2ec3826..26781242 100644 --- a/src/langbot/pkg/platform/botmgr.py +++ b/src/langbot/pkg/platform/botmgr.py @@ -2,11 +2,14 @@ from __future__ import annotations import asyncio import json +import logging import traceback import sqlalchemy from ..core import app, entities as core_entities, taskmgr +logger = logging.getLogger(__name__) + from ..discover import engine from ..entity.persistence import bot as persistence_bot @@ -99,6 +102,74 @@ class RuntimeBot: return binding_uuid, False + async def _handle_workflow_message( + self, + event: platform_events.MessageEvent, + adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter, + workflow_uuid: str, + launcher_type: str, + launcher_id: str | int, + sender_id: str | int, + ) -> None: + """Handle message by executing the bound workflow directly.""" + message_content = str(event.message_chain) + message_chain_obj = event.message_chain + + # Build message context + sender_name = None + if hasattr(event, 'sender'): + sender = event.sender + if hasattr(sender, 'nickname'): + sender_name = sender.nickname + elif hasattr(sender, 'member_name'): + sender_name = sender.member_name + + is_group = launcher_type == 'group' + message_context = { + 'message_id': str(getattr(event, 'message_id', '')), + 'message_content': message_content, + 'sender_id': str(sender_id), + 'sender_name': sender_name or 'User', + 'platform': adapter.__class__.__name__, + 'conversation_id': str(launcher_id), + 'is_group': is_group, + 'group_id': str(launcher_id) if is_group else None, + 'mentions': [], + 'reply_to': None, + 'raw_message': { + 'message': message_chain_obj.model_dump() if hasattr(message_chain_obj, 'model_dump') else str(message_chain_obj), + 'launcher_id': launcher_id, + 'session_type': launcher_type, + }, + } + + trigger_data = { + 'message': message_content, + 'message_chain': message_chain_obj.model_dump() if hasattr(message_chain_obj, 'model_dump') else str(message_chain_obj), + 'session_type': launcher_type, + 'connection_id': str(launcher_id), + 'message_context': message_context, + } + + session_id = f'{launcher_type}_{launcher_id}' + logger.info(f'Processing workflow message from {session_id}: {message_content}') + + try: + from ..api.http.service.workflow import WorkflowExecutionFailedError + + execution_id = await self.ap.workflow_service.execute_workflow( + workflow_uuid=workflow_uuid, + trigger_type='message', + trigger_data=trigger_data, + session_id=session_id, + user_id=str(sender_id), + bot_id=self.bot_entity.uuid, + ) + except WorkflowExecutionFailedError as e: + await self.logger.error(f'Workflow execution failed: {e.message}') + except Exception as e: + await self.logger.error(f'Workflow execution error: {e}') + async def _record_discarded_message( self, launcher_type: provider_session.LauncherTypes, @@ -193,6 +264,20 @@ class RuntimeBot: message_text = str(event.message_chain) element_types = [comp.type for comp in event.message_chain] + binding_type, binding_uuid = self.get_binding_info() + + # Handle workflow binding separately from pipeline + if binding_type == 'workflow': + await self._handle_workflow_message( + event=event, + adapter=adapter, + workflow_uuid=binding_uuid, + launcher_type='person', + launcher_id=launcher_id, + sender_id=event.sender.id, + ) + return + pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid( 'person', launcher_id, message_text, element_types ) @@ -254,6 +339,20 @@ class RuntimeBot: message_text = str(event.message_chain) element_types = [comp.type for comp in event.message_chain] + binding_type, binding_uuid = self.get_binding_info() + + # Handle workflow binding separately from pipeline + if binding_type == 'workflow': + await self._handle_workflow_message( + event=event, + adapter=adapter, + workflow_uuid=binding_uuid, + launcher_type='group', + launcher_id=launcher_id, + sender_id=event.sender.id, + ) + return + pipeline_uuid, routed_by_rule = self.resolve_pipeline_uuid( 'group', launcher_id, message_text, element_types ) diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 60922003..bdce8d09 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -354,9 +354,19 @@ class RuntimeConnectionHandler(handler.Handler): extra_args=extra_args, ) + # invoke_llm returns (message, usage_info) tuple + if isinstance(result, tuple) and len(result) == 2: + msg, usage_info = result + msg_dump = msg.model_dump() + # Attach usage info to message dump + if usage_info: + msg_dump['usage'] = usage_info + else: + msg_dump = result.model_dump() + return handler.ActionResponse.success( data={ - 'message': result.model_dump(), + 'message': msg_dump, }, ) diff --git a/src/langbot/pkg/provider/modelmgr/requester.py b/src/langbot/pkg/provider/modelmgr/requester.py index 7b29fe52..eca1efdb 100644 --- a/src/langbot/pkg/provider/modelmgr/requester.py +++ b/src/langbot/pkg/provider/modelmgr/requester.py @@ -69,6 +69,11 @@ class RuntimeProvider: if usage_info: input_tokens = usage_info.get('input_tokens', 0) output_tokens = usage_info.get('output_tokens', 0) + # Attach usage info to message using object.__setattr__ to bypass pydantic validation + try: + object.__setattr__(msg, 'usage', usage_info) + except (AttributeError, TypeError): + pass # If we can't set it, just skip it return msg else: return result diff --git a/src/langbot/pkg/workflow/entities.py b/src/langbot/pkg/workflow/entities.py index 0b050f57..b9a12b35 100644 --- a/src/langbot/pkg/workflow/entities.py +++ b/src/langbot/pkg/workflow/entities.py @@ -11,19 +11,31 @@ from typing import Any, Optional import pydantic # Import SDK entities for standard workflow protocol types +# These are re-exported for use by other modules in the workflow package. from langbot_plugin.api.entities.builtin.workflow.entities import ( - ExecutionContext, - ExecutionStep, - MessageContext, + ExecutionContext as ExecutionContext, + ExecutionStep as ExecutionStep, + MessageContext as MessageContext, NodeDefinition, - NodeState, - PortDefinition, + NodeState as NodeState, + PortDefinition as PortDefinition, ) from langbot_plugin.api.entities.builtin.workflow.enums import ( - ExecutionStatus, - NodeStatus, + ExecutionStatus as ExecutionStatus, + NodeStatus as NodeStatus, ) +__all__ = [ + "ExecutionContext", + "ExecutionStep", + "MessageContext", + "NodeDefinition", + "NodeState", + "PortDefinition", + "ExecutionStatus", + "NodeStatus", +] + class Position(pydantic.BaseModel): """Node position on canvas""" diff --git a/src/langbot/pkg/workflow/executor.py b/src/langbot/pkg/workflow/executor.py index 3bab8bdd..6493d793 100644 --- a/src/langbot/pkg/workflow/executor.py +++ b/src/langbot/pkg/workflow/executor.py @@ -32,7 +32,6 @@ from .entities import ( ) from ..entity.persistence import workflow as persistence_workflow from .registry import NodeTypeRegistry -from . import monitor if TYPE_CHECKING: from ..core import app diff --git a/src/langbot/pkg/workflow/monitoring_helper.py b/src/langbot/pkg/workflow/monitoring_helper.py index 15ba21b2..09b120af 100644 --- a/src/langbot/pkg/workflow/monitoring_helper.py +++ b/src/langbot/pkg/workflow/monitoring_helper.py @@ -20,17 +20,24 @@ import json if typing.TYPE_CHECKING: from ..core import app - from langbot_plugin.api.entities.builtin.workflow.query import WorkflowQuery class WorkflowMonitoringHelper: """Helper class for workflow monitoring operations""" + @staticmethod + def _is_workflow_query(query) -> bool: + """Check if query is a WorkflowQuery object""" + if query is None or isinstance(query, str): + return False + # Check for WorkflowQuery attributes + return hasattr(query, 'launcher_type') or hasattr(query, 'workflow_uuid') + @staticmethod def _get_session_id(query, context_vars: dict | None = None) -> str: """Build session_id from query or context_vars""" - # Try to get from query first - if not isinstance(query, str) and query.launcher_type: + # Try to get from WorkflowQuery first + if WorkflowMonitoringHelper._is_workflow_query(query) and query.launcher_type: launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type) launcher_id = query.launcher_id or 'unknown' return f'{launcher_type}_{launcher_id}' @@ -44,17 +51,23 @@ class WorkflowMonitoringHelper: @staticmethod def _get_platform(query, context_vars: dict | None = None) -> str: """Get platform name from query or context_vars""" - if not isinstance(query, str) and query.launcher_type: + # Try WorkflowQuery first + if WorkflowMonitoringHelper._is_workflow_query(query) and query.launcher_type: if hasattr(query.launcher_type, 'value'): return query.launcher_type.value return str(query.launcher_type) + + # Fallback to context_vars for launcher_type (person/group) + if context_vars and context_vars.get('_launcher_type'): + return context_vars['_launcher_type'] + return 'workflow' @staticmethod def _get_sender_name(query, context_vars: dict | None = None) -> str | None: """Get sender name from query or context_vars""" - # Try query first - if not isinstance(query, str): + # Try WorkflowQuery first + if WorkflowMonitoringHelper._is_workflow_query(query): if query.sender_name: return query.sender_name if query.message_event and hasattr(query.message_event, 'sender'): diff --git a/src/langbot/pkg/workflow/nodes/llm_call.py b/src/langbot/pkg/workflow/nodes/llm_call.py index b07701b1..09b91994 100644 --- a/src/langbot/pkg/workflow/nodes/llm_call.py +++ b/src/langbot/pkg/workflow/nodes/llm_call.py @@ -234,19 +234,25 @@ Respond in the same language as the user's input. messages: list, funcs: list | None, extra_args: dict, - ) -> tuple[Any, Any]: - """Try non-streaming invocation with sequential fallback. Returns (message, model_used).""" + ) -> tuple[Any, Any, dict]: + """Try non-streaming invocation with sequential fallback. Returns (message, model_used, usage_info).""" last_error = None for model in candidates: try: - msg = await model.provider.invoke_llm( + result = await model.provider.invoke_llm( query=None, model=model, messages=messages, funcs=funcs if model.model_entity.abilities.__contains__('func_call') else [], extra_args=extra_args, ) - return msg, model + # invoke_llm returns (message, usage_info) tuple + if isinstance(result, tuple) and len(result) == 2: + msg, usage_info = result + else: + msg = result + usage_info = {} + return msg, model, usage_info except Exception as e: last_error = e logger.warning(f'[LLM:{self.node_id}] Model {model.model_entity.name} failed: {e}, trying next...') @@ -514,7 +520,7 @@ Respond in the same language as the user's input. # Invoke LLM with fallback try: - result_message, used_model = await self._invoke_with_fallback( + result_message, used_model, llm_usage = await self._invoke_with_fallback( candidates=candidates, messages=messages, funcs=None, @@ -579,25 +585,31 @@ Respond in the same language as the user's input. 'blocked_by_filter': True, } - # Extract usage info - if hasattr(result_message, 'usage') and result_message.usage: + # Extract usage info from LLM call result + # Priority: llm_usage (from _invoke_with_fallback) > result_message.usage > result_message.token_usage + if llm_usage: + usage = { + 'prompt_tokens': llm_usage.get('input_tokens', 0) or llm_usage.get('prompt_tokens', 0), + 'completion_tokens': llm_usage.get('output_tokens', 0) or llm_usage.get('completion_tokens', 0), + 'total_tokens': llm_usage.get('total_tokens', 0), + } + # Check result_message.usage (set by RuntimeProvider.invoke_llm) + elif hasattr(result_message, 'usage') and result_message.usage: u = result_message.usage - # Handle both object and dict usage if isinstance(u, dict): usage = { - 'prompt_tokens': u.get('prompt_tokens', 0) or 0, - 'completion_tokens': u.get('completion_tokens', 0) or 0, - 'total_tokens': u.get('total_tokens', 0) or 0, + 'prompt_tokens': u.get('input_tokens', 0) or u.get('prompt_tokens', 0), + 'completion_tokens': u.get('output_tokens', 0) or u.get('completion_tokens', 0), + 'total_tokens': u.get('total_tokens', 0), } else: usage = { - 'prompt_tokens': getattr(u, 'prompt_tokens', 0) or 0, - 'completion_tokens': getattr(u, 'completion_tokens', 0) or 0, - 'total_tokens': getattr(u, 'total_tokens', 0) or 0, + 'prompt_tokens': getattr(u, 'input_tokens', 0) or getattr(u, 'prompt_tokens', 0), + 'completion_tokens': getattr(u, 'output_tokens', 0) or getattr(u, 'completion_tokens', 0), + 'total_tokens': getattr(u, 'total_tokens', 0), } elif hasattr(result_message, 'token_usage') and result_message.token_usage: u = result_message.token_usage - # Handle both object and dict token_usage if isinstance(u, dict): usage = { 'prompt_tokens': u.get('prompt_tokens', 0) or 0, diff --git a/web/src/app/home/bots/components/bot-form/BotForm.tsx b/web/src/app/home/bots/components/bot-form/BotForm.tsx index 62bb157a..5e7ddfc7 100644 --- a/web/src/app/home/bots/components/bot-form/BotForm.tsx +++ b/web/src/app/home/bots/components/bot-form/BotForm.tsx @@ -320,6 +320,7 @@ export default function BotForm({ setIsLoading(true); if (initBotId) { const formValues = form.getValues(); + const bindingType = formValues.binding_type ?? 'pipeline'; const updateBot: Bot = { uuid: initBotId, name: formValues.name, @@ -327,13 +328,13 @@ export default function BotForm({ adapter: formValues.adapter, adapter_config: formValues.adapter_config, enable: formValues.enable, - binding_type: formValues.binding_type ?? 'pipeline', + binding_type: bindingType, binding_uuid: formValues.binding_uuid ?? '', - // Sync use_pipeline_uuid for backward compatibility when binding_type is 'pipeline' - use_pipeline_uuid: - formValues.binding_type === 'pipeline' - ? formValues.binding_uuid - : formValues.use_pipeline_uuid, + // Only send use_pipeline_uuid when binding_type is 'pipeline' + // For 'workflow' binding, we don't need this field + ...(bindingType === 'pipeline' && { + use_pipeline_uuid: formValues.binding_uuid, + }), }; httpClient .updateBot(initBotId, updateBot)