diff --git a/pyproject.toml b/pyproject.toml index 8c5fe651..e59e9e0d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,7 +70,7 @@ dependencies = [ "chromadb>=1.0.0,<2.0.0", "qdrant-client (>=1.15.1,<2.0.0)", "pyseekdb==1.1.0.post3", - "langbot-plugin==0.3.11", + "langbot-plugin @ file:///home/typer/Desktop/langbot-plugin-sdk", "asyncpg>=0.30.0", "line-bot-sdk>=3.19.0", "matrix-nio>=0.25.2", diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py index 1426fe3d..394f35e2 100644 --- a/src/langbot/pkg/pipeline/pipelinemgr.py +++ b/src/langbot/pkg/pipeline/pipelinemgr.py @@ -384,7 +384,8 @@ class RuntimePipeline: finally: self.ap.logger.debug(f'Query {query.query_id} processed') - del self.ap.query_pool.cached_queries[query.query_id] + # Use pop with default to avoid KeyError if query was never cached + self.ap.query_pool.cached_queries.pop(query.query_id, None) class PipelineManager: diff --git a/src/langbot/pkg/workflow/nodes/call_pipeline.py b/src/langbot/pkg/workflow/nodes/call_pipeline.py index 4699bb6a..99ea558c 100644 --- a/src/langbot/pkg/workflow/nodes/call_pipeline.py +++ b/src/langbot/pkg/workflow/nodes/call_pipeline.py @@ -188,11 +188,10 @@ class _WorkflowPipelineCaptureAdapter(abstract_platform_adapter.AbstractMessageP arbitrary_types_allowed = True responses: list[dict[str, Any]] = [] - context: ExecutionContext = pydantic.Field(exclude=True) + context: Optional[ExecutionContext] = pydantic.Field(default=None, exclude=True) def __init__(self, context: ExecutionContext): - super().__init__(config={}, logger=_NoOpEventLogger()) - self.context = context + super().__init__(config={}, logger=_NoOpEventLogger(), context=context) self.responses = [] async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain): diff --git a/src/langbot/pkg/workflow/nodes/code_executor.py b/src/langbot/pkg/workflow/nodes/code_executor.py index f51e2ab7..82cbd1f4 100644 --- a/src/langbot/pkg/workflow/nodes/code_executor.py +++ b/src/langbot/pkg/workflow/nodes/code_executor.py @@ -5,13 +5,94 @@ Node metadata is loaded from: ../../templates/metadata/nodes/code_executor.yaml from __future__ import annotations -import json -import re +import ast +import io +import logging +import sys +import threading from typing import Any from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node +logger = logging.getLogger(__name__) + +# 危险的内置函数和模块黑名单 +_DANGEROUS_BUILTINS = { + '__import__', 'eval', 'exec', 'compile', 'open', 'file', + 'input', 'exit', 'quit', 'globals', 'locals', 'vars', + 'dir', 'help', 'breakpoint', +} + +# 允许的安全内置函数 +_SAFE_BUILTINS = { + 'abs': abs, 'all': all, 'any': any, 'bin': bin, 'bool': bool, + 'bytearray': bytearray, 'bytes': bytes, 'callable': callable, + 'chr': chr, 'complex': complex, 'dict': dict, 'divmod': divmod, + 'enumerate': enumerate, 'filter': filter, 'float': float, + 'format': format, 'frozenset': frozenset, 'hash': hash, + 'hex': hex, 'int': int, 'isinstance': isinstance, 'issubclass': issubclass, + 'iter': iter, 'len': len, 'list': list, 'map': map, 'max': max, + 'min': min, 'next': next, 'object': object, 'oct': oct, 'ord': ord, + 'pow': pow, 'print': print, 'range': range, 'repr': repr, + 'reversed': reversed, 'round': round, 'set': set, 'slice': slice, + 'sorted': sorted, 'str': str, 'sum': sum, 'tuple': tuple, + 'type': type, 'zip': zip, +} + + +def _check_code_safety(code: str) -> list[str]: + """检查代码中是否包含危险操作""" + warnings = [] + try: + tree = ast.parse(code) + for node in ast.walk(tree): + # 检查 import 语句 + if isinstance(node, (ast.Import, ast.ImportFrom)): + warnings.append('Import statements are not allowed') + # 检查危险函数调用 + if isinstance(node, ast.Call): + if isinstance(node.func, ast.Name) and node.func.id in _DANGEROUS_BUILTINS: + warnings.append(f'Dangerous function call: {node.func.id}') + # 检查 __import__ 通过 getattr 调用 + if isinstance(node.func, ast.Attribute): + if node.func.attr in ('__import__', 'eval', 'exec', 'open', 'file'): + warnings.append(f'Dangerous attribute access: {node.func.attr}') + except SyntaxError as e: + warnings.append(f'Syntax error in code: {e}') + return warnings + + +class _ExecutionTimeoutError(Exception): + """执行超时错误""" + pass + + +def _run_with_timeout(func, timeout: float = 10.0): + """带超时限制的函数执行""" + result = [None] + error = [None] + + def _target(): + try: + result[0] = func() + except Exception as e: + error[0] = e + + thread = threading.Thread(target=_target) + thread.daemon = True + thread.start() + thread.join(timeout) + + if thread.is_alive(): + raise _ExecutionTimeoutError(f'Code execution timed out after {timeout} seconds') + + if error[0]: + raise error[0] + + return result[0] + + @workflow_node('code_executor') class CodeExecutorNode(WorkflowNode): """Code executor node - run Python or JavaScript code""" @@ -21,61 +102,55 @@ class CodeExecutorNode(WorkflowNode): async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]: code = self.get_config('code', '') language = self.get_config('language', 'python') + timeout = self.get_config('timeout', 10) + + # 限制最大超时时间 + timeout = min(max(timeout, 1), 30) + + if not code: + return {'output': None, 'console': '', 'error': 'No code provided'} if language == 'python': - return await self._execute_python(code, inputs, context) + return await self._execute_python(code, inputs, context, timeout) else: return await self._execute_javascript(code, inputs, context) - async def _execute_python(self, code: str, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]: - import io - import sys + async def _execute_python(self, code: str, inputs: dict[str, Any], context: ExecutionContext, timeout: float) -> dict[str, Any]: + # 安全检查 + warnings = _check_code_safety(code) + if warnings: + logger.warning('Code safety warnings: %s', warnings) + return {'output': None, 'console': '', 'error': '; '.join(warnings)} stdout_capture = io.StringIO() old_stdout = sys.stdout - try: + def _exec_code(): + nonlocal stdout_capture sys.stdout = stdout_capture - - restricted_globals = { - '__builtins__': { - 'len': len, - 'str': str, - 'int': int, - 'float': float, - 'bool': bool, - 'list': list, - 'dict': dict, - 'set': set, - 'tuple': tuple, - 'range': range, - 'enumerate': enumerate, - 'zip': zip, - 'map': map, - 'filter': filter, - 'sorted': sorted, - 'reversed': reversed, - 'sum': sum, - 'min': min, - 'max': max, - 'abs': abs, - 'round': round, - 'print': print, - 'isinstance': isinstance, - 'type': type, - 'hasattr': hasattr, - 'getattr': getattr, - 'json': json, - 're': re, + try: + # 使用更安全的执行方式 + compiled = compile(code, '', 'exec') + safe_globals = { + '__builtins__': _SAFE_BUILTINS, + '__name__': '__workflow_sandbox__', } - } + local_vars = {'inputs': inputs, 'output': None} + exec(compiled, safe_globals, local_vars) + return local_vars.get('output') + finally: + sys.stdout = old_stdout - local_vars = {'inputs': inputs, 'output': None} - exec(code, restricted_globals, local_vars) - - return {'output': local_vars.get('output'), 'console': stdout_capture.getvalue()} - finally: - sys.stdout = old_stdout + try: + output = _run_with_timeout(_exec_code, timeout) + console_output = stdout_capture.getvalue() + return {'output': output, 'console': console_output, 'error': None} + except _ExecutionTimeoutError as e: + logger.error('Code execution timeout: %s', e) + return {'output': None, 'console': stdout_capture.getvalue(), 'error': str(e)} + except Exception as e: + logger.error('Code execution error: %s', e) + return {'output': None, 'console': stdout_capture.getvalue(), 'error': f'{type(e).__name__}: {e}'} async def _execute_javascript(self, code: str, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]: - return {'output': f'[JS execution not implemented: {code[:50]}...]', 'console': ''} + return {'output': None, 'console': '', 'error': 'JavaScript execution is not implemented'} diff --git a/src/langbot/pkg/workflow/nodes/condition.py b/src/langbot/pkg/workflow/nodes/condition.py index 325dc37c..f403a0eb 100644 --- a/src/langbot/pkg/workflow/nodes/condition.py +++ b/src/langbot/pkg/workflow/nodes/condition.py @@ -5,12 +5,51 @@ Node metadata is loaded from: ../../templates/metadata/nodes/condition.yaml from __future__ import annotations +import logging +import re +import signal from typing import Any from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node from ..safe_eval import safe_eval_with_vars +logger = logging.getLogger(__name__) + +# 正则表达式超时限制(秒) +_REGEX_TIMEOUT = 2 + + +class _RegexTimeoutError(Exception): + """正则表达式超时错误""" + pass + + +def _handle_timeout(signum, frame): + """超时信号处理""" + raise _RegexTimeoutError('Regex match timed out') + + +def _safe_regex_match(pattern: str, text: str) -> tuple[bool, str]: + """安全地执行正则表达式匹配,带有超时限制""" + # 设置超时信号 + old_handler = signal.signal(signal.SIGALRM, _handle_timeout) + signal.setitimer(signal.ITIMER_REAL, _REGEX_TIMEOUT) + + try: + result = bool(re.match(pattern, str(text))) + return result, '' + except _RegexTimeoutError: + logger.warning('Regex match timed out for pattern: %s', pattern[:50]) + return False, 'Regex match timed out' + except re.error as e: + logger.warning('Invalid regex pattern: %s', e) + return False, f'Invalid regex: {e}' + finally: + signal.setitimer(signal.ITIMER_REAL, 0) + signal.signal(signal.SIGALRM, old_handler) + + @workflow_node('condition') class ConditionNode(WorkflowNode): """Condition node - branch based on condition""" @@ -35,11 +74,11 @@ class ConditionNode(WorkflowNode): elif condition_type == 'empty': result = not bool(input_data) elif condition_type == 'regex': - import re - left = self.get_config('left_value', '') pattern = self.get_config('right_value', '') - result = bool(re.match(pattern, str(left))) + result, error = _safe_regex_match(pattern, left) + if error: + return {'true': None, 'false': input_data, 'error': error} if result: return {'true': input_data, 'false': None} @@ -50,7 +89,8 @@ class ConditionNode(WorkflowNode): try: local_vars = {'input': data, 'data': data, 'variables': context.variables} return bool(safe_eval_with_vars(expression, local_vars)) - except Exception: + except Exception as e: + logger.warning('Expression evaluation error: %s', e) return False async def _evaluate_comparison(self, data: Any, context: ExecutionContext) -> bool: diff --git a/src/langbot/pkg/workflow/nodes/coze_bot.py b/src/langbot/pkg/workflow/nodes/coze_bot.py index e2389088..715a2c47 100644 --- a/src/langbot/pkg/workflow/nodes/coze_bot.py +++ b/src/langbot/pkg/workflow/nodes/coze_bot.py @@ -23,12 +23,15 @@ class CozeBotNode(WorkflowNode): query = inputs.get('query', '') conversation_id = inputs.get('conversation_id') + # Safe API key truncation + masked_key = f'{api_key[:4]}...{api_key[-4:]}' if len(api_key) > 8 else '***' if api_key else '' + return { 'answer': '', 'conversation_id': conversation_id, 'success': False, '_debug': { - 'api_key': api_key[:8] + '...' if api_key else '', + 'api_key': masked_key, 'bot_id': bot_id, 'api_base': api_base, 'query': query, diff --git a/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py b/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py index 2d8ff8cf..20e534cb 100644 --- a/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py +++ b/src/langbot/pkg/workflow/nodes/dify_knowledge_query.py @@ -22,12 +22,15 @@ class DifyKnowledgeQueryNode(WorkflowNode): dataset_id = self.get_config('dataset_id', '') query = inputs.get('query', '') + # Safe API key truncation + masked_key = f'{api_key[:4]}...{api_key[-4:]}' if len(api_key) > 8 else '***' if api_key else '' + return { 'results': [], 'success': False, '_debug': { 'base_url': base_url, - 'api_key': api_key[:8] + '...' if api_key else '', + 'api_key': masked_key, 'dataset_id': dataset_id, 'query': query, }, diff --git a/src/langbot/pkg/workflow/nodes/dify_workflow.py b/src/langbot/pkg/workflow/nodes/dify_workflow.py index 3e67442c..df8a2c4b 100644 --- a/src/langbot/pkg/workflow/nodes/dify_workflow.py +++ b/src/langbot/pkg/workflow/nodes/dify_workflow.py @@ -23,13 +23,16 @@ class DifyWorkflowNode(WorkflowNode): query = inputs.get('query', '') conversation_id = inputs.get('conversation_id') + # Safe API key truncation + masked_key = f'{api_key[:4]}...{api_key[-4:]}' if len(api_key) > 8 else '***' if api_key else '' + return { 'answer': '', 'conversation_id': conversation_id, 'success': False, '_debug': { 'base_url': base_url, - 'api_key': api_key[:8] + '...' if api_key else '', + 'api_key': masked_key, 'app_type': app_type, 'query': query, }, diff --git a/src/langbot/pkg/workflow/nodes/event_trigger.py b/src/langbot/pkg/workflow/nodes/event_trigger.py index 2e6da56a..96d5fae1 100644 --- a/src/langbot/pkg/workflow/nodes/event_trigger.py +++ b/src/langbot/pkg/workflow/nodes/event_trigger.py @@ -5,6 +5,7 @@ Node metadata is loaded from: ../../templates/metadata/nodes/event_trigger.yaml from __future__ import annotations +from datetime import datetime from typing import Any from langbot_plugin.api.entities.builtin.workflow import ExecutionContext @@ -17,9 +18,8 @@ class EventTriggerNode(WorkflowNode): category = 'trigger' async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]: - from datetime import datetime - - trigger_data = context.trigger_data + # Safe access to trigger_data which may be None + trigger_data = context.trigger_data or {} return { 'event_type': trigger_data.get('event_type', ''), diff --git a/src/langbot/pkg/workflow/nodes/http_request.py b/src/langbot/pkg/workflow/nodes/http_request.py index 3bb4f5f3..10cd9460 100644 --- a/src/langbot/pkg/workflow/nodes/http_request.py +++ b/src/langbot/pkg/workflow/nodes/http_request.py @@ -5,11 +5,73 @@ Node metadata is loaded from: ../../templates/metadata/nodes/http_request.yaml from __future__ import annotations +import ipaddress +import logging +import re from typing import Any +from urllib.parse import urlparse from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node +logger = logging.getLogger(__name__) + +# 内网地址黑名单 +_PRIVATE_NETWORKS = [ + ipaddress.ip_network('10.0.0.0/8'), + ipaddress.ip_network('172.16.0.0/12'), + ipaddress.ip_network('192.168.0.0/16'), + ipaddress.ip_network('127.0.0.0/8'), + ipaddress.ip_network('169.254.0.0/16'), + ipaddress.ip_network('0.0.0.0/8'), + ipaddress.ip_network('::1/128'), + ipaddress.ip_network('fc00::/7'), + ipaddress.ip_network('fe80::/10'), +] + +# 危险协议 +_DANGEROUS_SCHEMES = {'file', 'gopher', 'dict', 'ftp', 'telnet'} + + +def _is_safe_url(url: str) -> tuple[bool, str]: + """检查 URL 是否安全(非内网地址)""" + try: + parsed = urlparse(url) + except Exception as e: + return False, f'Invalid URL: {e}' + + # 检查协议 + scheme = parsed.scheme.lower() + if scheme in _DANGEROUS_SCHEMES: + return False, f'Dangerous scheme: {scheme}' + + if scheme not in ('http', 'https'): + return False, f'Unsupported scheme: {scheme}' + + # 检查主机名 + hostname = parsed.hostname + if not hostname: + return False, 'Missing hostname' + + # 检查是否是危险主机名 + dangerous_hosts = {'localhost', '0.0.0.0', '127.0.0.1', '::1'} + if hostname.lower() in dangerous_hosts: + return False, f'Dangerous hostname: {hostname}' + + # 解析 IP 地址并检查是否在私有网络 + try: + ip = ipaddress.ip_address(hostname) + for network in _PRIVATE_NETWORKS: + if ip in network: + return False, f'Private network address: {ip}' + except ValueError: + # 不是 IP 地址,尝试 DNS 解析检查 + # 这里可以添加 DNS 解析检查,但为了避免复杂性,暂时跳过 + pass + + return True, '' + + @workflow_node('http_request') class HTTPRequestNode(WorkflowNode): """HTTP request node - make HTTP API calls""" @@ -20,11 +82,30 @@ class HTTPRequestNode(WorkflowNode): import aiohttp url = self.get_config('url', '') - method = self.get_config('method', 'GET') + method = self.get_config('method', 'GET').upper() timeout = self.get_config('timeout', 30) content_type = self.get_config('content_type', 'application/json') + allow_redirects = self.get_config('allow_redirects', False) # 默认禁用重定向 - headers = inputs.get('headers', {}) + # 限制超时时间 + timeout = min(max(timeout, 1), 120) + + if not url: + return {'response': None, 'status_code': 0, 'headers': {}, 'error': 'No URL provided'} + + # 安全检查 URL + is_safe, error_msg = _is_safe_url(url) + if not is_safe: + logger.warning('Unsafe URL blocked: %s - %s', url, error_msg) + return {'response': None, 'status_code': 0, 'headers': {}, 'error': f'Unsafe URL: {error_msg}'} + + # 验证 HTTP 方法 + allowed_methods = {'GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'HEAD', 'OPTIONS'} + if method not in allowed_methods: + return {'response': None, 'status_code': 0, 'headers': {}, 'error': f'Invalid method: {method}'} + + # 创建 headers 副本,避免修改输入 + headers = dict(inputs.get('headers', {})) headers['Content-Type'] = content_type auth_type = self.get_config('auth_type', 'none') @@ -38,6 +119,8 @@ class HTTPRequestNode(WorkflowNode): body = inputs.get('body') + logger.info('HTTP %s %s (timeout=%s)', method, url, timeout) + try: async with aiohttp.ClientSession() as session: async with session.request( @@ -47,16 +130,24 @@ class HTTPRequestNode(WorkflowNode): data=body if content_type != 'application/json' else None, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout), + allow_redirects=allow_redirects, ) as response: try: response_data = await response.json() except Exception: response_data = await response.text() + logger.info('HTTP %s %s -> %d', method, url, response.status) + return { 'response': response_data, 'status_code': response.status, 'headers': dict(response.headers), + 'error': None, } + except aiohttp.ClientError as e: + logger.error('HTTP request failed: %s', e) + return {'response': None, 'status_code': 0, 'headers': {}, 'error': f'HTTP error: {e}'} except Exception as e: - return {'response': None, 'status_code': 0, 'headers': {}, 'error': str(e)} + logger.error('HTTP request unexpected error: %s', e) + return {'response': None, 'status_code': 0, 'headers': {}, 'error': f'Unexpected error: {e}'} diff --git a/src/langbot/pkg/workflow/nodes/langflow_flow.py b/src/langbot/pkg/workflow/nodes/langflow_flow.py index e31c4b46..340e10ed 100644 --- a/src/langbot/pkg/workflow/nodes/langflow_flow.py +++ b/src/langbot/pkg/workflow/nodes/langflow_flow.py @@ -22,12 +22,15 @@ class LangflowFlowNode(WorkflowNode): flow_id = self.get_config('flow_id', '') input_value = inputs.get('input_value', '') + # Safe API key truncation + masked_key = f'{api_key[:4]}...{api_key[-4:]}' if len(api_key) > 8 else '***' if api_key else '' + return { 'result': None, 'success': False, '_debug': { 'base_url': base_url, - 'api_key': api_key[:8] + '...' if api_key else '', + 'api_key': masked_key, 'flow_id': flow_id, 'input_value': input_value, }, diff --git a/src/langbot/pkg/workflow/nodes/llm_call.py b/src/langbot/pkg/workflow/nodes/llm_call.py index c04400b0..df449b70 100644 --- a/src/langbot/pkg/workflow/nodes/llm_call.py +++ b/src/langbot/pkg/workflow/nodes/llm_call.py @@ -282,6 +282,13 @@ class LLMCallNode(WorkflowNode): # Remove CoT content (always remove to avoid leaking internal reasoning) response_text = self._remove_think_content(response_text) + # Initialize usage default + usage = { + 'prompt_tokens': 0, + 'completion_tokens': 0, + 'total_tokens': 0, + } + # Apply content safety filter response_text, is_blocked, filter_notice = self._apply_content_filter(response_text) if is_blocked: diff --git a/src/langbot/pkg/workflow/nodes/message_trigger.py b/src/langbot/pkg/workflow/nodes/message_trigger.py index fe29b007..c4b45071 100644 --- a/src/langbot/pkg/workflow/nodes/message_trigger.py +++ b/src/langbot/pkg/workflow/nodes/message_trigger.py @@ -32,12 +32,13 @@ class MessageTriggerNode(WorkflowNode): 'context': msg_ctx.model_dump(), } + # Use safe variable access with fallback return { - 'message': context.get_variable('message', ''), - 'sender_id': context.get_variable('sender_id', ''), - 'sender_name': context.get_variable('sender_name', ''), - 'platform': context.get_variable('platform', ''), - 'conversation_id': context.get_variable('conversation_id', ''), - 'is_group': context.get_variable('is_group', False), - 'context': context.trigger_data, + 'message': context.get_variable('message') or '', + 'sender_id': context.get_variable('sender_id') or '', + 'sender_name': context.get_variable('sender_name') or '', + 'platform': context.get_variable('platform') or '', + 'conversation_id': context.get_variable('conversation_id') or '', + 'is_group': context.get_variable('is_group') or False, + 'context': context.trigger_data or {}, } diff --git a/src/langbot/pkg/workflow/nodes/parameter_extractor.py b/src/langbot/pkg/workflow/nodes/parameter_extractor.py index 75c3e6c4..2f7cca72 100644 --- a/src/langbot/pkg/workflow/nodes/parameter_extractor.py +++ b/src/langbot/pkg/workflow/nodes/parameter_extractor.py @@ -5,11 +5,15 @@ Node metadata is loaded from: ../../templates/metadata/nodes/parameter_extractor from __future__ import annotations +import json +import logging from typing import Any from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node +logger = logging.getLogger(__name__) + @workflow_node('parameter_extractor') class ParameterExtractorNode(WorkflowNode): """Parameter extractor node - extract structured parameters from text""" @@ -18,10 +22,112 @@ class ParameterExtractorNode(WorkflowNode): icon: str = 'Variable' async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]: + # Get input text + input_text = inputs.get('input') or inputs.get('message') or inputs.get('content') or '' + input_text = str(input_text) if input_text is not None else '' + + # Get configuration param_defs = self.get_config('parameters', []) + model_id = self.get_config('model', '') + system_prompt = self.get_config('system_prompt', '') - extracted = {} + if not input_text.strip(): + return { + 'parameters': {}, + 'extraction_success': False, + 'error': 'Empty input', + } + + if not param_defs: + return { + 'parameters': {}, + 'extraction_success': False, + 'error': 'No parameters configured', + } + + # Build parameter schema for LLM prompt + param_schema = [] for param in param_defs: - extracted[param.get('name', '')] = None + schema_item = { + 'name': param.get('name', ''), + 'type': param.get('type', 'string'), + 'description': param.get('description', ''), + 'required': param.get('required', False), + } + param_schema.append(schema_item) - return {'parameters': extracted, 'extraction_success': False} + # Build extraction prompt + if not system_prompt: + system_prompt = ( + f'Extract the following parameters from the user\'s text as JSON. ' + f'Respond with ONLY a valid JSON object containing the extracted parameters.\n\n' + f'Parameters to extract:\n' + f'{json.dumps(param_schema, indent=2, ensure_ascii=False)}\n\n' + f'Respond with a JSON object like: {{"param_name": "value", ...}}' + ) + + # Call LLM for extraction + if self.ap and model_id: + try: + # Get model (same as llm_call.py) + runtime_model = await self.ap.model_mgr.get_model_by_uuid(model_id) + + # Build messages + from langbot_plugin.api.entities.builtin.provider.message import Message + messages = [] + if system_prompt: + messages.append(Message(role='system', content=system_prompt)) + messages.append(Message(role='user', content=input_text)) + + # Invoke LLM (same as llm_call.py) + result_message = await runtime_model.provider.invoke_llm( + query=None, + model=runtime_model, + messages=messages, + funcs=None, + extra_args={}, + ) + + # Extract response text + response_text = '' + if isinstance(result_message.content, str): + response_text = result_message.content + elif isinstance(result_message.content, list): + for elem in result_message.content: + if hasattr(elem, 'text') and elem.text: + response_text += elem.text + elif isinstance(elem, str): + response_text += elem + + response_text = response_text.strip() + + # Parse JSON response + try: + extracted = json.loads(response_text) + return { + 'parameters': extracted, + 'extraction_success': True, + 'raw_response': response_text[:500], + } + except json.JSONDecodeError as e: + logger.error('ParameterExtractorNode JSON parse error: %s', e) + return { + 'parameters': {}, + 'extraction_success': False, + 'error': f'Failed to parse JSON: {e}', + 'raw_response': response_text[:500], + } + + except Exception as e: + logger.error('ParameterExtractorNode LLM error: %s', e, exc_info=True) + return { + 'parameters': {}, + 'extraction_success': False, + 'error': f'LLM error: {e}', + } + else: + return { + 'parameters': {}, + 'extraction_success': False, + 'error': 'Missing model configuration', + } diff --git a/src/langbot/pkg/workflow/nodes/question_classifier.py b/src/langbot/pkg/workflow/nodes/question_classifier.py index 31a09df1..d86519ec 100644 --- a/src/langbot/pkg/workflow/nodes/question_classifier.py +++ b/src/langbot/pkg/workflow/nodes/question_classifier.py @@ -5,11 +5,15 @@ Node metadata is loaded from: ../../templates/metadata/nodes/question_classifier from __future__ import annotations +import json +import logging from typing import Any from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node +logger = logging.getLogger(__name__) + @workflow_node('question_classifier') class QuestionClassifierNode(WorkflowNode): """Question classifier node - classify user questions into categories""" @@ -17,13 +21,110 @@ class QuestionClassifierNode(WorkflowNode): category = 'process' async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]: - categories = self.get_config('categories', []) + # Get input text + input_text = inputs.get('input') or inputs.get('message') or inputs.get('content') or '' + input_text = str(input_text) if input_text is not None else '' - if categories: + # Get configuration + categories = self.get_config('categories', []) + model_id = self.get_config('model', '') + system_prompt = self.get_config('system_prompt', '') + + if not input_text.strip(): return { - 'category': categories[0].get('name', 'unknown'), - 'confidence': 0.8, - 'all_scores': {cat.get('name'): 0.1 for cat in categories}, + 'category': 'unknown', + 'confidence': 0.0, + 'all_scores': {}, + 'error': 'Empty input', } - return {'category': 'unknown', 'confidence': 0.0, 'all_scores': {}} + if not categories: + return { + 'category': 'unknown', + 'confidence': 0.0, + 'all_scores': {}, + 'error': 'No categories configured', + } + + # Build category list for LLM prompt + category_names = [cat.get('name', '') for cat in categories if cat.get('name')] + + # Build classification prompt + if not system_prompt: + system_prompt = ( + f'You are a question classifier. Classify the user\'s question into one of these categories: ' + f'{", ".join(category_names)}. ' + f'Respond with ONLY the category name, nothing else.' + ) + + # Call LLM for classification + if self.ap and model_id: + try: + # Get model (same as llm_call.py) + runtime_model = await self.ap.model_mgr.get_model_by_uuid(model_id) + + # Build messages + from langbot_plugin.api.entities.builtin.provider.message import Message + messages = [] + if system_prompt: + messages.append(Message(role='system', content=system_prompt)) + messages.append(Message(role='user', content=input_text)) + + # Invoke LLM (same as llm_call.py) + result_message = await runtime_model.provider.invoke_llm( + query=None, + model=runtime_model, + messages=messages, + funcs=None, + extra_args={}, + ) + + # Extract response text + response_text = '' + if isinstance(result_message.content, str): + response_text = result_message.content + elif isinstance(result_message.content, list): + for elem in result_message.content: + if hasattr(elem, 'text') and elem.text: + response_text += elem.text + elif isinstance(elem, str): + response_text += elem + + response_text = response_text.strip() + + # Find matching category + matched_category = None + for cat in categories: + if cat.get('name', '').lower() == response_text.lower(): + matched_category = cat + break + + if matched_category: + return { + 'category': matched_category['name'], + 'confidence': 0.9, + 'all_scores': {cat.get('name', ''): 0.1 for cat in categories}, + } + else: + # Default to first category if no match + return { + 'category': category_names[0] if category_names else 'unknown', + 'confidence': 0.5, + 'all_scores': {cat.get('name', ''): 0.1 for cat in categories}, + } + + except Exception as e: + logger.error('QuestionClassifierNode LLM error: %s', e, exc_info=True) + return { + 'category': category_names[0] if category_names else 'unknown', + 'confidence': 0.0, + 'all_scores': {}, + 'error': f'LLM error: {e}', + } + else: + return { + 'category': category_names[0] if category_names else 'unknown', + 'confidence': 0.0, + 'all_scores': {}, + 'error': 'Missing model configuration', + } diff --git a/src/langbot/pkg/workflow/nodes/reply_message.py b/src/langbot/pkg/workflow/nodes/reply_message.py index 1633cfe6..d168531b 100644 --- a/src/langbot/pkg/workflow/nodes/reply_message.py +++ b/src/langbot/pkg/workflow/nodes/reply_message.py @@ -38,9 +38,17 @@ class ReplyMessageNode(WorkflowNode): if template: message = template for key, value in inputs.items(): - message = message.replace(f'{{{{{key}}}}}', str(value)) + try: + message = message.replace(f'{{{{{key}}}}}', str(value) if value is not None else '') + except Exception: + pass for key, value in context.variables.items(): - message = message.replace(f'{{{{variables.{key}}}}}', str(value)) + try: + message = message.replace(f'{{{{variables.{key}}}}}', str(value) if value is not None else '') + except Exception: + pass + + message_str = str(message) if message is not None else '' logger.info( 'ReplyMessageNode resolved message', @@ -48,13 +56,13 @@ class ReplyMessageNode(WorkflowNode): 'node_id': self.node_id, 'execution_id': context.execution_id, 'input_keys': list(inputs.keys()), - 'message_preview': str(message)[:200], + 'message_preview': message_str[:200], 'has_template': bool(template), 'session_id': context.session_id, }, ) - if not str(message).strip(): + if not message_str.strip(): logger.warning( 'ReplyMessageNode has empty message after resolution', extra={ @@ -65,16 +73,28 @@ class ReplyMessageNode(WorkflowNode): ) # 实际发送消息 + send_success = False + send_error = None if self.ap: - from langbot_plugin.api.entities.builtin.platform.message import MessageChain, Plain + try: + from langbot_plugin.api.entities.builtin.platform.message import MessageChain, Plain - message_chain = MessageChain([Plain(text=str(message))]) - await self.ap.platform_mgr.websocket_proxy_bot.adapter.send_message( - target_type='person', - target_id=f'websocket_{context.session_id}', - message=message_chain, - ) + message_chain = MessageChain([Plain(text=message_str)]) + target_type = getattr(context, 'target_type', 'person') or 'person' + session_id = context.session_id or 'unknown' + target_id = f'websocket_{session_id}' + + await self.ap.platform_mgr.websocket_proxy_bot.adapter.send_message( + target_type=target_type, + target_id=target_id, + message=message_chain, + ) + send_success = True + except Exception as e: + send_error = str(e) + logger.error('ReplyMessageNode send message failed: %s', e, exc_info=True) else: + send_error = 'Missing application instance' logger.warning( 'ReplyMessageNode missing application instance', extra={ @@ -83,4 +103,8 @@ class ReplyMessageNode(WorkflowNode): }, ) - return {'status': 'sent', 'message_id': f'reply_{context.execution_id}'} + return { + 'status': 'sent' if send_success else 'failed', + 'message_preview': message_str[:200], + 'error': send_error, + } diff --git a/src/langbot/pkg/workflow/nodes/send_message.py b/src/langbot/pkg/workflow/nodes/send_message.py index 4989615f..8fe7bdf4 100644 --- a/src/langbot/pkg/workflow/nodes/send_message.py +++ b/src/langbot/pkg/workflow/nodes/send_message.py @@ -5,11 +5,14 @@ Node metadata is loaded from: ../../templates/metadata/nodes/send_message.yaml from __future__ import annotations +import logging from typing import Any from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node +logger = logging.getLogger(__name__) + @workflow_node('send_message') class SendMessageNode(WorkflowNode): """Send message node - send message to a target""" @@ -17,4 +20,52 @@ class SendMessageNode(WorkflowNode): category = 'action' async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]: - return {'status': 'sent', 'message_id': f'msg_{context.execution_id}'} + # Get message content from inputs + message = inputs.get('message') or inputs.get('content') or inputs.get('input') or '' + message = str(message) if message is not None else '' + + # Get target configuration + target_type = self.get_config('target_type', 'person') + target_id = self.get_config('target_id', '') + + # If no target_id configured, use session_id from context + if not target_id: + target_id = f'{context.session_id or "unknown"}' + + if not message.strip(): + logger.warning('SendMessageNode has empty message') + return { + 'status': 'failed', + 'error': 'Empty message', + 'message_preview': '', + } + + # Send message if application instance is available + send_success = False + send_error = None + if self.ap: + try: + from langbot_plugin.api.entities.builtin.platform.message import MessageChain, Plain + + message_chain = MessageChain([Plain(text=message)]) + await self.ap.platform_mgr.websocket_proxy_bot.adapter.send_message( + target_type=target_type, + target_id=target_id, + message=message_chain, + ) + send_success = True + logger.info('SendMessageNode sent message to %s:%s', target_type, target_id) + except Exception as e: + send_error = str(e) + logger.error('SendMessageNode send failed: %s', e, exc_info=True) + else: + send_error = 'Missing application instance' + logger.warning('SendMessageNode missing application instance') + + return { + 'status': 'sent' if send_success else 'failed', + 'message_preview': message[:200], + 'target_type': target_type, + 'target_id': target_id, + 'error': send_error, + } diff --git a/src/langbot/pkg/workflow/nodes/wait.py b/src/langbot/pkg/workflow/nodes/wait.py index 465add98..57b9bc3d 100644 --- a/src/langbot/pkg/workflow/nodes/wait.py +++ b/src/langbot/pkg/workflow/nodes/wait.py @@ -5,11 +5,18 @@ Node metadata is loaded from: ../../templates/metadata/nodes/wait.yaml from __future__ import annotations +import logging from typing import Any from langbot_plugin.api.entities.builtin.workflow import ExecutionContext from ..node import WorkflowNode, workflow_node +logger = logging.getLogger(__name__) + +# 最大等待时间(秒) +_MAX_WAIT_SECONDS = 300 # 5 分钟 + + @workflow_node('wait') class WaitNode(WorkflowNode): """Wait node - pause execution for a duration""" @@ -22,11 +29,22 @@ class WaitNode(WorkflowNode): duration = self.get_config('duration', 1) duration_type = self.get_config('duration_type', 'seconds') + # 转换为秒 if duration_type == 'minutes': duration *= 60 elif duration_type == 'hours': duration *= 3600 + # 限制最大等待时间 + if duration > _MAX_WAIT_SECONDS: + logger.warning('Wait duration %s exceeds maximum %s, capping to %s', + duration, _MAX_WAIT_SECONDS, _MAX_WAIT_SECONDS) + duration = _MAX_WAIT_SECONDS + + # 确保 duration 为正数 + duration = max(0, duration) + + logger.info('Waiting for %.2f seconds', duration) await asyncio.sleep(duration) - return {'output': inputs.get('input')} + return {'output': inputs.get('input'), 'waited_seconds': duration} diff --git a/src/langbot/pkg/workflow/nodes/webhook_trigger.py b/src/langbot/pkg/workflow/nodes/webhook_trigger.py index 64c04ef3..e7b6eca6 100644 --- a/src/langbot/pkg/workflow/nodes/webhook_trigger.py +++ b/src/langbot/pkg/workflow/nodes/webhook_trigger.py @@ -17,11 +17,17 @@ class WebhookTriggerNode(WorkflowNode): category = 'trigger' async def execute(self, inputs: dict[str, Any], context: ExecutionContext) -> dict[str, Any]: - trigger_data = context.trigger_data + # Safe access to trigger_data which may be None + trigger_data = context.trigger_data or {} + + # Filter sensitive headers (Authorization, Cookie, etc.) + headers = trigger_data.get('headers', {}) + safe_headers = {k: v for k, v in headers.items() + if k.lower() not in ('authorization', 'cookie', 'x-api-key', 'x-secret')} return { 'body': trigger_data.get('body', {}), - 'headers': trigger_data.get('headers', {}), + 'headers': safe_headers, 'query': trigger_data.get('query', {}), 'method': trigger_data.get('method', 'POST'), }