From f17b06767e23f19d87780699dbd78c2dd9724f78 Mon Sep 17 00:00:00 2001 From: whw174660897 <174660897@qq.com> Date: Fri, 30 May 2025 22:23:57 +0800 Subject: [PATCH] Feature add n8 n (#1468) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(n8n): 添加n8n工作流API支持 添加n8n工作流API作为新的运行器类型,支持通过webhook调用n8n工作流,并提供多种认证方式(Basic、JWT、Header)。新增N8nAuthFormComponent用于处理n8n认证表单联动,并更新相关配置文件和测试用例。 * chore: remove pip mirror url * perf: simplify ret def of pipeline metadata * feat(n8n): raise exc instead of ret as normal msg * perf: add var `user_message_text` * chore(n8n): migration and default config * chore: required database version --------- Co-authored-by: hengwei.wang <@> Co-authored-by: Junyan Qin --- .../dbm002_combine_quote_msg_config.py | 7 +- .../migrations/dbm003_n8n_config.py | 49 +++++ pkg/provider/runners/n8nsvapi.py | 159 ++++++++++++++ pkg/utils/constants.py | 2 +- pyproject.toml | 1 + templates/default-pipeline-config.json | 12 ++ templates/metadata/pipeline/ai.yaml | 128 +++++++++++ .../dynamic-form/N8nAuthFormComponent.tsx | 200 ++++++++++++++++++ .../pipeline-form/PipelineFormComponent.tsx | 32 +++ web/src/app/infra/entities/api/index.ts | 75 +------ 10 files changed, 592 insertions(+), 73 deletions(-) create mode 100644 pkg/persistence/migrations/dbm003_n8n_config.py create mode 100644 pkg/provider/runners/n8nsvapi.py create mode 100644 web/src/app/home/components/dynamic-form/N8nAuthFormComponent.tsx diff --git a/pkg/persistence/migrations/dbm002_combine_quote_msg_config.py b/pkg/persistence/migrations/dbm002_combine_quote_msg_config.py index 2bb665ea..cebf403b 100644 --- a/pkg/persistence/migrations/dbm002_combine_quote_msg_config.py +++ b/pkg/persistence/migrations/dbm002_combine_quote_msg_config.py @@ -28,7 +28,12 @@ class DBMigrateCombineQuoteMsgConfig(migration.DBMigration): await self.ap.persistence_mgr.execute_async( sqlalchemy.update(persistence_pipeline.LegacyPipeline) .where(persistence_pipeline.LegacyPipeline.uuid == serialized_pipeline['uuid']) - .values({'config': config, 'for_version': self.ap.ver_mgr.get_current_version()}) + .values( + { + 'config': config, + 'for_version': self.ap.ver_mgr.get_current_version(), + } + ) ) async def downgrade(self): diff --git a/pkg/persistence/migrations/dbm003_n8n_config.py b/pkg/persistence/migrations/dbm003_n8n_config.py new file mode 100644 index 00000000..8705040b --- /dev/null +++ b/pkg/persistence/migrations/dbm003_n8n_config.py @@ -0,0 +1,49 @@ +from .. import migration + +import sqlalchemy + +from ...entity.persistence import pipeline as persistence_pipeline + + +@migration.migration_class(3) +class DBMigrateN8nConfig(migration.DBMigration): + """N8n配置""" + + async def upgrade(self): + """升级""" + # read all pipelines + pipelines = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(persistence_pipeline.LegacyPipeline)) + + for pipeline in pipelines: + serialized_pipeline = self.ap.persistence_mgr.serialize_model(persistence_pipeline.LegacyPipeline, pipeline) + + config = serialized_pipeline['config'] + + if 'n8n-service-api' not in config['ai']: + config['ai']['n8n-service-api'] = { + 'webhook-url': 'http://your-n8n-webhook-url', + 'auth-type': 'none', + 'basic-username': '', + 'basic-password': '', + 'jwt-secret': '', + 'jwt-algorithm': 'HS256', + 'header-name': '', + 'header-value': '', + 'timeout': 120, + 'output-key': 'response', + } + + await self.ap.persistence_mgr.execute_async( + sqlalchemy.update(persistence_pipeline.LegacyPipeline) + .where(persistence_pipeline.LegacyPipeline.uuid == serialized_pipeline['uuid']) + .values( + { + 'config': config, + 'for_version': self.ap.ver_mgr.get_current_version(), + } + ) + ) + + async def downgrade(self): + """降级""" + pass diff --git a/pkg/provider/runners/n8nsvapi.py b/pkg/provider/runners/n8nsvapi.py new file mode 100644 index 00000000..7044cce1 --- /dev/null +++ b/pkg/provider/runners/n8nsvapi.py @@ -0,0 +1,159 @@ +from __future__ import annotations + +import typing +import json +import uuid +import aiohttp + +from .. import runner +from ...core import app, entities as core_entities +from .. import entities as llm_entities + + +class N8nAPIError(Exception): + """N8n API 请求失败""" + + def __init__(self, message: str): + self.message = message + super().__init__(self.message) + + +@runner.runner_class('n8n-service-api') +class N8nServiceAPIRunner(runner.RequestRunner): + """N8n Service API 工作流请求器""" + + def __init__(self, ap: app.Application, pipeline_config: dict): + self.ap = ap + self.pipeline_config = pipeline_config + + # 获取webhook URL + self.webhook_url = self.pipeline_config['ai']['n8n-service-api']['webhook-url'] + + # 获取超时设置,默认为120秒 + self.timeout = self.pipeline_config['ai']['n8n-service-api'].get('timeout', 120) + + # 获取输出键名,默认为response + self.output_key = self.pipeline_config['ai']['n8n-service-api'].get('output-key', 'response') + + # 获取认证类型,默认为none + self.auth_type = self.pipeline_config['ai']['n8n-service-api'].get('auth-type', 'none') + + # 根据认证类型获取相应的认证信息 + if self.auth_type == 'basic': + self.basic_username = self.pipeline_config['ai']['n8n-service-api'].get('basic-username', '') + self.basic_password = self.pipeline_config['ai']['n8n-service-api'].get('basic-password', '') + elif self.auth_type == 'jwt': + self.jwt_secret = self.pipeline_config['ai']['n8n-service-api'].get('jwt-secret', '') + self.jwt_algorithm = self.pipeline_config['ai']['n8n-service-api'].get('jwt-algorithm', 'HS256') + elif self.auth_type == 'header': + self.header_name = self.pipeline_config['ai']['n8n-service-api'].get('header-name', '') + self.header_value = self.pipeline_config['ai']['n8n-service-api'].get('header-value', '') + + async def _preprocess_user_message(self, query: core_entities.Query) -> str: + """预处理用户消息,提取纯文本 + + Returns: + str: 纯文本消息 + """ + plain_text = '' + + if isinstance(query.user_message.content, list): + for ce in query.user_message.content: + if ce.type == 'text': + plain_text += ce.text + # 注意:n8n webhook目前不支持直接处理图片,如需支持可在此扩展 + elif isinstance(query.user_message.content, str): + plain_text = query.user_message.content + + return plain_text + + async def _call_webhook(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: + """调用n8n webhook""" + # 生成会话ID(如果不存在) + if not query.session.using_conversation.uuid: + query.session.using_conversation.uuid = str(uuid.uuid4()) + + # 预处理用户消息 + plain_text = await self._preprocess_user_message(query) + + # 准备请求数据 + payload = { + # 基本消息内容 + 'message': plain_text, + 'user_message_text': plain_text, + 'conversation_id': query.session.using_conversation.uuid, + 'session_id': query.variables.get('session_id', ''), + 'user_id': f'{query.session.launcher_type.value}_{query.session.launcher_id}', + 'msg_create_time': query.variables.get('msg_create_time', ''), + } + + # 添加所有变量到payload + payload.update(query.variables) + + try: + # 准备请求头和认证信息 + headers = {} + auth = None + + # 根据认证类型设置相应的认证信息 + if self.auth_type == 'basic': + # 使用Basic认证 + auth = aiohttp.BasicAuth(self.basic_username, self.basic_password) + self.ap.logger.debug(f'using basic auth: {self.basic_username}') + elif self.auth_type == 'jwt': + # 使用JWT认证 + import jwt + import time + + # 创建JWT令牌 + payload_jwt = { + 'exp': int(time.time()) + 3600, # 1小时过期 + 'iat': int(time.time()), + 'sub': 'n8n-webhook', + } + token = jwt.encode(payload_jwt, self.jwt_secret, algorithm=self.jwt_algorithm) + + # 添加到Authorization头 + headers['Authorization'] = f'Bearer {token}' + self.ap.logger.debug('using jwt auth') + elif self.auth_type == 'header': + # 使用自定义请求头认证 + headers[self.header_name] = self.header_value + self.ap.logger.debug(f'using header auth: {self.header_name}') + else: + self.ap.logger.debug('no auth') + + # 调用webhook + async with aiohttp.ClientSession() as session: + async with session.post( + self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout + ) as response: + if response.status != 200: + error_text = await response.text() + self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}') + raise Exception(f'n8n webhook call failed: {response.status}, {error_text}') + + # 解析响应 + response_data = await response.json() + self.ap.logger.debug(f'n8n webhook response: {response_data}') + + # 从响应中提取输出 + if self.output_key in response_data: + output_content = response_data[self.output_key] + else: + # 如果没有指定的输出键,则使用整个响应 + output_content = json.dumps(response_data, ensure_ascii=False) + + # 返回消息 + yield llm_entities.Message( + role='assistant', + content=output_content, + ) + except Exception as e: + self.ap.logger.error(f'n8n webhook call exception: {str(e)}') + raise N8nAPIError(f'n8n webhook call exception: {str(e)}') + + async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: + """运行请求""" + async for msg in self._call_webhook(query): + yield msg diff --git a/pkg/utils/constants.py b/pkg/utils/constants.py index 88aad5b9..b11cc60a 100644 --- a/pkg/utils/constants.py +++ b/pkg/utils/constants.py @@ -1,6 +1,6 @@ semantic_version = 'v4.0.4' -required_database_version = 2 +required_database_version = 3 """标记本版本所需要的数据库结构版本,用于判断数据库迁移""" debug_mode = False diff --git a/pyproject.toml b/pyproject.toml index a802f330..998dc891 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -181,3 +181,4 @@ skip-magic-trailing-comma = false # Like Black, automatically detect the appropriate line ending. line-ending = "auto" + diff --git a/templates/default-pipeline-config.json b/templates/default-pipeline-config.json index 3b718726..796c6356 100644 --- a/templates/default-pipeline-config.json +++ b/templates/default-pipeline-config.json @@ -58,6 +58,18 @@ "api-key": "your-api-key", "app-id": "your-app-id", "references-quote": "参考资料来自:" + }, + "n8n-service-api": { + "webhook-url": "http://your-n8n-webhook-url", + "auth-type": "none", + "basic-username": "", + "basic-password": "", + "jwt-secret": "", + "jwt-algorithm": "HS256", + "header-name": "", + "header-value": "", + "timeout": 120, + "output-key": "response" } }, "output": { diff --git a/templates/metadata/pipeline/ai.yaml b/templates/metadata/pipeline/ai.yaml index 4e5585bb..90732dc8 100644 --- a/templates/metadata/pipeline/ai.yaml +++ b/templates/metadata/pipeline/ai.yaml @@ -31,6 +31,10 @@ stages: label: en_US: Aliyun Dashscope App API zh_Hans: 阿里云百炼平台 API + - name: n8n-service-api + label: + en_US: n8n Workflow API + zh_Hans: n8n 工作流 API - name: local-agent label: en_US: Local Agent @@ -170,3 +174,127 @@ stages: type: string required: false default: '参考资料来自:' + - name: n8n-service-api + label: + en_US: n8n Workflow API + zh_Hans: n8n 工作流 API + description: + en_US: Configure the n8n workflow API of the pipeline + zh_Hans: 配置 n8n 工作流 API + config: + - name: webhook-url + label: + en_US: Webhook URL + zh_Hans: Webhook URL + description: + en_US: The webhook URL of the n8n workflow + zh_Hans: n8n 工作流的 webhook URL + type: string + required: true + - name: auth-type + label: + en_US: Authentication Type + zh_Hans: 认证类型 + description: + en_US: The authentication type for the webhook call + zh_Hans: webhook 调用的认证类型 + type: select + required: true + default: 'none' + options: + - name: 'none' + label: + en_US: None + zh_Hans: 无认证 + - name: 'basic' + label: + en_US: Basic Auth + zh_Hans: 基本认证 + - name: 'jwt' + label: + en_US: JWT + zh_Hans: JWT认证 + - name: 'header' + label: + en_US: Header Auth + zh_Hans: 请求头认证 + - name: basic-username + label: + en_US: Username + zh_Hans: 用户名 + description: + en_US: The username for Basic Auth + zh_Hans: 基本认证的用户名 + type: string + required: false + default: '' + - name: basic-password + label: + en_US: Password + zh_Hans: 密码 + description: + en_US: The password for Basic Auth + zh_Hans: 基本认证的密码 + type: string + required: false + default: '' + - name: jwt-secret + label: + en_US: Secret + zh_Hans: 密钥 + description: + en_US: The secret for JWT authentication + zh_Hans: JWT认证的密钥 + type: string + required: false + default: '' + - name: jwt-algorithm + label: + en_US: Algorithm + zh_Hans: 算法 + description: + en_US: The algorithm for JWT authentication + zh_Hans: JWT认证的算法 + type: string + required: false + default: 'HS256' + - name: header-name + label: + en_US: Header Name + zh_Hans: 请求头名称 + description: + en_US: The header name for Header Auth + zh_Hans: 请求头认证的名称 + type: string + required: false + default: '' + - name: header-value + label: + en_US: Header Value + zh_Hans: 请求头值 + description: + en_US: The header value for Header Auth + zh_Hans: 请求头认证的值 + type: string + required: false + default: '' + - name: timeout + label: + en_US: Timeout + zh_Hans: 超时时间 + description: + en_US: The timeout in seconds for the webhook call + zh_Hans: webhook 调用的超时时间(秒) + type: integer + required: false + default: 120 + - name: output-key + label: + en_US: Output Key + zh_Hans: 输出键名 + description: + en_US: The key name of the output in the webhook response + zh_Hans: webhook 响应中输出内容的键名 + type: string + required: false + default: 'response' diff --git a/web/src/app/home/components/dynamic-form/N8nAuthFormComponent.tsx b/web/src/app/home/components/dynamic-form/N8nAuthFormComponent.tsx new file mode 100644 index 00000000..f595fc7a --- /dev/null +++ b/web/src/app/home/components/dynamic-form/N8nAuthFormComponent.tsx @@ -0,0 +1,200 @@ +import { useEffect, useState } from 'react'; +import { useForm } from 'react-hook-form'; +import { zodResolver } from '@hookform/resolvers/zod'; +import { z } from 'zod'; +import { + Form, + FormControl, + FormField, + FormItem, + FormLabel, + FormMessage, +} from '@/components/ui/form'; +import { IDynamicFormItemSchema } from '@/app/infra/entities/form/dynamic'; +import DynamicFormItemComponent from '@/app/home/components/dynamic-form/DynamicFormItemComponent'; +import { i18nObj } from '@/i18n/I18nProvider'; + +/** + * N8n认证表单组件 + * 根据选择的认证类型动态显示相应的表单项 + */ +export default function N8nAuthFormComponent({ + itemConfigList, + onSubmit, + initialValues, +}: { + itemConfigList: IDynamicFormItemSchema[]; + onSubmit?: (val: object) => unknown; + initialValues?: Record; +}) { + // 当前选择的认证类型 + const [authType, setAuthType] = useState( + initialValues?.['auth-type'] || 'none' + ); + + // 根据 itemConfigList 动态生成 zod schema + const formSchema = z.object( + itemConfigList.reduce( + (acc, item) => { + let fieldSchema; + switch (item.type) { + case 'integer': + fieldSchema = z.number(); + break; + case 'float': + fieldSchema = z.number(); + break; + case 'boolean': + fieldSchema = z.boolean(); + break; + case 'string': + fieldSchema = z.string(); + break; + case 'array[string]': + fieldSchema = z.array(z.string()); + break; + case 'select': + fieldSchema = z.string(); + break; + case 'llm-model-selector': + fieldSchema = z.string(); + break; + case 'prompt-editor': + fieldSchema = z.array( + z.object({ + content: z.string(), + role: z.string(), + }), + ); + break; + default: + fieldSchema = z.string(); + } + + if ( + item.required && + (fieldSchema instanceof z.ZodString || + fieldSchema instanceof z.ZodArray) + ) { + fieldSchema = fieldSchema.min(1, { message: '此字段为必填项' }); + } + + return { + ...acc, + [item.name]: fieldSchema, + }; + }, + {} as Record, + ), + ); + + type FormValues = z.infer; + + const form = useForm({ + resolver: zodResolver(formSchema), + defaultValues: itemConfigList.reduce((acc, item) => { + // 优先使用 initialValues,如果没有则使用默认值 + const value = initialValues?.[item.name] ?? item.default; + return { + ...acc, + [item.name]: value, + }; + }, {} as FormValues), + }); + + // 当 initialValues 变化时更新表单值 + useEffect(() => { + if (initialValues) { + // 合并默认值和初始值 + const mergedValues = itemConfigList.reduce( + (acc, item) => { + acc[item.name] = initialValues[item.name] ?? item.default; + return acc; + }, + {} as Record, + ); + + Object.entries(mergedValues).forEach(([key, value]) => { + form.setValue(key as keyof FormValues, value); + }); + + // 更新认证类型 + setAuthType(mergedValues['auth-type'] as string || 'none'); + } + }, [initialValues, form, itemConfigList]); + + // 监听表单值变化 + useEffect(() => { + const subscription = form.watch((value, { name }) => { + // 如果认证类型变化,更新状态 + if (name === 'auth-type') { + setAuthType(value['auth-type'] as string); + } + + // 获取完整的表单值,确保包含所有默认值 + const formValues = form.getValues(); + const finalValues = itemConfigList.reduce( + (acc, item) => { + acc[item.name] = formValues[item.name] ?? item.default; + return acc; + }, + {} as Record, + ); + + onSubmit?.(finalValues); + }); + return () => subscription.unsubscribe(); + }, [form, onSubmit, itemConfigList]); + + // 根据认证类型过滤表单项 + const filteredConfigList = itemConfigList.filter((config) => { + // 始终显示webhook-url、auth-type、timeout和output-key + if (['webhook-url', 'auth-type', 'timeout', 'output-key'].includes(config.name)) { + return true; + } + + // 根据认证类型显示相应的表单项 + if (authType === 'basic' && config.name.startsWith('basic-')) { + return true; + } + if (authType === 'jwt' && config.name.startsWith('jwt-')) { + return true; + } + if (authType === 'header' && config.name.startsWith('header-')) { + return true; + } + + return false; + }); + + return ( +
+
+ {filteredConfigList.map((config) => ( + ( + + + {i18nObj(config.label)}{' '} + {config.required && *} + + + + + {config.description && ( +

+ {i18nObj(config.description)} +

+ )} + +
+ )} + /> + ))} +
+
+ ); +} \ No newline at end of file diff --git a/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx b/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx index fe450c0a..7497f64a 100644 --- a/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx +++ b/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx @@ -8,6 +8,7 @@ import { } from '@/app/infra/entities/pipeline'; import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs'; import DynamicFormComponent from '@/app/home/components/dynamic-form/DynamicFormComponent'; +import N8nAuthFormComponent from '@/app/home/components/dynamic-form/N8nAuthFormComponent'; import { Button } from '@/components/ui/button'; import { useForm } from 'react-hook-form'; import { zodResolver } from '@hookform/resolvers/zod'; @@ -244,6 +245,37 @@ export default function PipelineFormComponent({ if (stage.name !== currentRunner) { return null; } + + // 对于n8n-service-api配置,使用N8nAuthFormComponent处理表单联动 + if (stage.name === 'n8n-service-api') { + return ( +
+
{i18nObj(stage.label)}
+ {stage.description && ( +
+ {i18nObj(stage.description)} +
+ )} + )?.[stage.name] || + {} + } + onSubmit={(values) => { + const currentValues = + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (form.getValues(formName) as Record) || {}; + form.setValue(formName, { + ...currentValues, + [stage.name]: values, + }); + }} + /> +
+ ); + } } return ( diff --git a/web/src/app/infra/entities/api/index.ts b/web/src/app/infra/entities/api/index.ts index 1efdd130..eb746272 100644 --- a/web/src/app/infra/entities/api/index.ts +++ b/web/src/app/infra/entities/api/index.ts @@ -203,77 +203,10 @@ export interface MarketPluginResponse { } interface GetPipelineConfig { - ai: { - 'dashscope-app-api': { - 'api-key': string; - 'app-id': string; - 'app-type': 'agent' | 'workflow'; - 'references-quote'?: string; - }; - 'dify-service-api': { - 'api-key': string; - 'app-type': 'chat' | 'agent' | 'workflow'; - 'base-url': string; - 'thinking-convert': 'plain' | 'original' | 'remove'; - timeout?: number; - }; - 'local-agent': { - 'max-round': number; - model: string; - prompt: Array<{ - content: string; - role: string; - }>; - }; - runner: { - runner: 'local-agent' | 'dify-service-api' | 'dashscope-app-api'; - }; - }; - output: { - 'force-delay': { - max: number; - min: number; - }; - 'long-text-processing': { - 'font-path': string; - strategy: 'forward' | 'image'; - threshold: number; - }; - misc: { - 'at-sender': boolean; - 'hide-exception': boolean; - 'quote-origin': boolean; - 'track-function-calls': boolean; - }; - }; - safety: { - 'content-filter': { - 'check-sensitive-words': boolean; - scope: 'all' | 'income-msg' | 'output-msg'; - }; - 'rate-limit': { - limitation: number; - strategy: 'drop' | 'wait'; - 'window-length': number; - }; - }; - trigger: { - 'access-control': { - blacklist: string[]; - mode: 'blacklist' | 'whitelist'; - whitelist: string[]; - }; - 'group-respond-rules': { - at: boolean; - prefix: string[]; - random: number; - regexp: string[]; - }; - 'ignore-rules': { - prefix: string[]; - regexp: string[]; - }; - }; + ai: {}; + output: {}; + safety: {}; + trigger: {}; } interface GetPipeline {