Feature add n8 n (#1468)

* feat(n8n): 添加n8n工作流API支持

添加n8n工作流API作为新的运行器类型,支持通过webhook调用n8n工作流,并提供多种认证方式(Basic、JWT、Header)。新增N8nAuthFormComponent用于处理n8n认证表单联动,并更新相关配置文件和测试用例。

* chore: remove pip mirror url

* perf: simplify ret def of pipeline metadata

* feat(n8n): raise exc instead of ret as normal msg

* perf: add var `user_message_text`

* chore(n8n): migration and default config

* chore: required database version

---------

Co-authored-by: hengwei.wang <@>
Co-authored-by: Junyan Qin <rockchinq@gmail.com>
This commit is contained in:
whw174660897
2025-05-30 22:23:57 +08:00
committed by GitHub
parent 70a29fc623
commit f17b06767e
10 changed files with 592 additions and 73 deletions

View File

@@ -28,7 +28,12 @@ class DBMigrateCombineQuoteMsgConfig(migration.DBMigration):
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_pipeline.LegacyPipeline)
.where(persistence_pipeline.LegacyPipeline.uuid == serialized_pipeline['uuid'])
.values({'config': config, 'for_version': self.ap.ver_mgr.get_current_version()})
.values(
{
'config': config,
'for_version': self.ap.ver_mgr.get_current_version(),
}
)
)
async def downgrade(self):

View File

@@ -0,0 +1,49 @@
from .. import migration
import sqlalchemy
from ...entity.persistence import pipeline as persistence_pipeline
@migration.migration_class(3)
class DBMigrateN8nConfig(migration.DBMigration):
"""N8n配置"""
async def upgrade(self):
"""升级"""
# read all pipelines
pipelines = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(persistence_pipeline.LegacyPipeline))
for pipeline in pipelines:
serialized_pipeline = self.ap.persistence_mgr.serialize_model(persistence_pipeline.LegacyPipeline, pipeline)
config = serialized_pipeline['config']
if 'n8n-service-api' not in config['ai']:
config['ai']['n8n-service-api'] = {
'webhook-url': 'http://your-n8n-webhook-url',
'auth-type': 'none',
'basic-username': '',
'basic-password': '',
'jwt-secret': '',
'jwt-algorithm': 'HS256',
'header-name': '',
'header-value': '',
'timeout': 120,
'output-key': 'response',
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_pipeline.LegacyPipeline)
.where(persistence_pipeline.LegacyPipeline.uuid == serialized_pipeline['uuid'])
.values(
{
'config': config,
'for_version': self.ap.ver_mgr.get_current_version(),
}
)
)
async def downgrade(self):
"""降级"""
pass

View File

@@ -0,0 +1,159 @@
from __future__ import annotations
import typing
import json
import uuid
import aiohttp
from .. import runner
from ...core import app, entities as core_entities
from .. import entities as llm_entities
class N8nAPIError(Exception):
"""N8n API 请求失败"""
def __init__(self, message: str):
self.message = message
super().__init__(self.message)
@runner.runner_class('n8n-service-api')
class N8nServiceAPIRunner(runner.RequestRunner):
"""N8n Service API 工作流请求器"""
def __init__(self, ap: app.Application, pipeline_config: dict):
self.ap = ap
self.pipeline_config = pipeline_config
# 获取webhook URL
self.webhook_url = self.pipeline_config['ai']['n8n-service-api']['webhook-url']
# 获取超时设置默认为120秒
self.timeout = self.pipeline_config['ai']['n8n-service-api'].get('timeout', 120)
# 获取输出键名默认为response
self.output_key = self.pipeline_config['ai']['n8n-service-api'].get('output-key', 'response')
# 获取认证类型默认为none
self.auth_type = self.pipeline_config['ai']['n8n-service-api'].get('auth-type', 'none')
# 根据认证类型获取相应的认证信息
if self.auth_type == 'basic':
self.basic_username = self.pipeline_config['ai']['n8n-service-api'].get('basic-username', '')
self.basic_password = self.pipeline_config['ai']['n8n-service-api'].get('basic-password', '')
elif self.auth_type == 'jwt':
self.jwt_secret = self.pipeline_config['ai']['n8n-service-api'].get('jwt-secret', '')
self.jwt_algorithm = self.pipeline_config['ai']['n8n-service-api'].get('jwt-algorithm', 'HS256')
elif self.auth_type == 'header':
self.header_name = self.pipeline_config['ai']['n8n-service-api'].get('header-name', '')
self.header_value = self.pipeline_config['ai']['n8n-service-api'].get('header-value', '')
async def _preprocess_user_message(self, query: core_entities.Query) -> str:
"""预处理用户消息,提取纯文本
Returns:
str: 纯文本消息
"""
plain_text = ''
if isinstance(query.user_message.content, list):
for ce in query.user_message.content:
if ce.type == 'text':
plain_text += ce.text
# 注意n8n webhook目前不支持直接处理图片如需支持可在此扩展
elif isinstance(query.user_message.content, str):
plain_text = query.user_message.content
return plain_text
async def _call_webhook(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]:
"""调用n8n webhook"""
# 生成会话ID如果不存在
if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4())
# 预处理用户消息
plain_text = await self._preprocess_user_message(query)
# 准备请求数据
payload = {
# 基本消息内容
'message': plain_text,
'user_message_text': plain_text,
'conversation_id': query.session.using_conversation.uuid,
'session_id': query.variables.get('session_id', ''),
'user_id': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
'msg_create_time': query.variables.get('msg_create_time', ''),
}
# 添加所有变量到payload
payload.update(query.variables)
try:
# 准备请求头和认证信息
headers = {}
auth = None
# 根据认证类型设置相应的认证信息
if self.auth_type == 'basic':
# 使用Basic认证
auth = aiohttp.BasicAuth(self.basic_username, self.basic_password)
self.ap.logger.debug(f'using basic auth: {self.basic_username}')
elif self.auth_type == 'jwt':
# 使用JWT认证
import jwt
import time
# 创建JWT令牌
payload_jwt = {
'exp': int(time.time()) + 3600, # 1小时过期
'iat': int(time.time()),
'sub': 'n8n-webhook',
}
token = jwt.encode(payload_jwt, self.jwt_secret, algorithm=self.jwt_algorithm)
# 添加到Authorization头
headers['Authorization'] = f'Bearer {token}'
self.ap.logger.debug('using jwt auth')
elif self.auth_type == 'header':
# 使用自定义请求头认证
headers[self.header_name] = self.header_value
self.ap.logger.debug(f'using header auth: {self.header_name}')
else:
self.ap.logger.debug('no auth')
# 调用webhook
async with aiohttp.ClientSession() as session:
async with session.post(
self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout
) as response:
if response.status != 200:
error_text = await response.text()
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
# 解析响应
response_data = await response.json()
self.ap.logger.debug(f'n8n webhook response: {response_data}')
# 从响应中提取输出
if self.output_key in response_data:
output_content = response_data[self.output_key]
else:
# 如果没有指定的输出键,则使用整个响应
output_content = json.dumps(response_data, ensure_ascii=False)
# 返回消息
yield llm_entities.Message(
role='assistant',
content=output_content,
)
except Exception as e:
self.ap.logger.error(f'n8n webhook call exception: {str(e)}')
raise N8nAPIError(f'n8n webhook call exception: {str(e)}')
async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]:
"""运行请求"""
async for msg in self._call_webhook(query):
yield msg

View File

@@ -1,6 +1,6 @@
semantic_version = 'v4.0.4'
required_database_version = 2
required_database_version = 3
"""标记本版本所需要的数据库结构版本,用于判断数据库迁移"""
debug_mode = False

View File

@@ -181,3 +181,4 @@ skip-magic-trailing-comma = false
# Like Black, automatically detect the appropriate line ending.
line-ending = "auto"

View File

@@ -58,6 +58,18 @@
"api-key": "your-api-key",
"app-id": "your-app-id",
"references-quote": "参考资料来自:"
},
"n8n-service-api": {
"webhook-url": "http://your-n8n-webhook-url",
"auth-type": "none",
"basic-username": "",
"basic-password": "",
"jwt-secret": "",
"jwt-algorithm": "HS256",
"header-name": "",
"header-value": "",
"timeout": 120,
"output-key": "response"
}
},
"output": {

View File

@@ -31,6 +31,10 @@ stages:
label:
en_US: Aliyun Dashscope App API
zh_Hans: 阿里云百炼平台 API
- name: n8n-service-api
label:
en_US: n8n Workflow API
zh_Hans: n8n 工作流 API
- name: local-agent
label:
en_US: Local Agent
@@ -170,3 +174,127 @@ stages:
type: string
required: false
default: '参考资料来自:'
- name: n8n-service-api
label:
en_US: n8n Workflow API
zh_Hans: n8n 工作流 API
description:
en_US: Configure the n8n workflow API of the pipeline
zh_Hans: 配置 n8n 工作流 API
config:
- name: webhook-url
label:
en_US: Webhook URL
zh_Hans: Webhook URL
description:
en_US: The webhook URL of the n8n workflow
zh_Hans: n8n 工作流的 webhook URL
type: string
required: true
- name: auth-type
label:
en_US: Authentication Type
zh_Hans: 认证类型
description:
en_US: The authentication type for the webhook call
zh_Hans: webhook 调用的认证类型
type: select
required: true
default: 'none'
options:
- name: 'none'
label:
en_US: None
zh_Hans: 无认证
- name: 'basic'
label:
en_US: Basic Auth
zh_Hans: 基本认证
- name: 'jwt'
label:
en_US: JWT
zh_Hans: JWT认证
- name: 'header'
label:
en_US: Header Auth
zh_Hans: 请求头认证
- name: basic-username
label:
en_US: Username
zh_Hans: 用户名
description:
en_US: The username for Basic Auth
zh_Hans: 基本认证的用户名
type: string
required: false
default: ''
- name: basic-password
label:
en_US: Password
zh_Hans: 密码
description:
en_US: The password for Basic Auth
zh_Hans: 基本认证的密码
type: string
required: false
default: ''
- name: jwt-secret
label:
en_US: Secret
zh_Hans: 密钥
description:
en_US: The secret for JWT authentication
zh_Hans: JWT认证的密钥
type: string
required: false
default: ''
- name: jwt-algorithm
label:
en_US: Algorithm
zh_Hans: 算法
description:
en_US: The algorithm for JWT authentication
zh_Hans: JWT认证的算法
type: string
required: false
default: 'HS256'
- name: header-name
label:
en_US: Header Name
zh_Hans: 请求头名称
description:
en_US: The header name for Header Auth
zh_Hans: 请求头认证的名称
type: string
required: false
default: ''
- name: header-value
label:
en_US: Header Value
zh_Hans: 请求头值
description:
en_US: The header value for Header Auth
zh_Hans: 请求头认证的值
type: string
required: false
default: ''
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
description:
en_US: The timeout in seconds for the webhook call
zh_Hans: webhook 调用的超时时间(秒)
type: integer
required: false
default: 120
- name: output-key
label:
en_US: Output Key
zh_Hans: 输出键名
description:
en_US: The key name of the output in the webhook response
zh_Hans: webhook 响应中输出内容的键名
type: string
required: false
default: 'response'

View File

@@ -0,0 +1,200 @@
import { useEffect, useState } from 'react';
import { useForm } from 'react-hook-form';
import { zodResolver } from '@hookform/resolvers/zod';
import { z } from 'zod';
import {
Form,
FormControl,
FormField,
FormItem,
FormLabel,
FormMessage,
} from '@/components/ui/form';
import { IDynamicFormItemSchema } from '@/app/infra/entities/form/dynamic';
import DynamicFormItemComponent from '@/app/home/components/dynamic-form/DynamicFormItemComponent';
import { i18nObj } from '@/i18n/I18nProvider';
/**
* N8n认证表单组件
* 根据选择的认证类型动态显示相应的表单项
*/
export default function N8nAuthFormComponent({
itemConfigList,
onSubmit,
initialValues,
}: {
itemConfigList: IDynamicFormItemSchema[];
onSubmit?: (val: object) => unknown;
initialValues?: Record<string, any>;
}) {
// 当前选择的认证类型
const [authType, setAuthType] = useState<string>(
initialValues?.['auth-type'] || 'none'
);
// 根据 itemConfigList 动态生成 zod schema
const formSchema = z.object(
itemConfigList.reduce(
(acc, item) => {
let fieldSchema;
switch (item.type) {
case 'integer':
fieldSchema = z.number();
break;
case 'float':
fieldSchema = z.number();
break;
case 'boolean':
fieldSchema = z.boolean();
break;
case 'string':
fieldSchema = z.string();
break;
case 'array[string]':
fieldSchema = z.array(z.string());
break;
case 'select':
fieldSchema = z.string();
break;
case 'llm-model-selector':
fieldSchema = z.string();
break;
case 'prompt-editor':
fieldSchema = z.array(
z.object({
content: z.string(),
role: z.string(),
}),
);
break;
default:
fieldSchema = z.string();
}
if (
item.required &&
(fieldSchema instanceof z.ZodString ||
fieldSchema instanceof z.ZodArray)
) {
fieldSchema = fieldSchema.min(1, { message: '此字段为必填项' });
}
return {
...acc,
[item.name]: fieldSchema,
};
},
{} as Record<string, z.ZodTypeAny>,
),
);
type FormValues = z.infer<typeof formSchema>;
const form = useForm<FormValues>({
resolver: zodResolver(formSchema),
defaultValues: itemConfigList.reduce((acc, item) => {
// 优先使用 initialValues如果没有则使用默认值
const value = initialValues?.[item.name] ?? item.default;
return {
...acc,
[item.name]: value,
};
}, {} as FormValues),
});
// 当 initialValues 变化时更新表单值
useEffect(() => {
if (initialValues) {
// 合并默认值和初始值
const mergedValues = itemConfigList.reduce(
(acc, item) => {
acc[item.name] = initialValues[item.name] ?? item.default;
return acc;
},
{} as Record<string, any>,
);
Object.entries(mergedValues).forEach(([key, value]) => {
form.setValue(key as keyof FormValues, value);
});
// 更新认证类型
setAuthType(mergedValues['auth-type'] as string || 'none');
}
}, [initialValues, form, itemConfigList]);
// 监听表单值变化
useEffect(() => {
const subscription = form.watch((value, { name }) => {
// 如果认证类型变化,更新状态
if (name === 'auth-type') {
setAuthType(value['auth-type'] as string);
}
// 获取完整的表单值,确保包含所有默认值
const formValues = form.getValues();
const finalValues = itemConfigList.reduce(
(acc, item) => {
acc[item.name] = formValues[item.name] ?? item.default;
return acc;
},
{} as Record<string, any>,
);
onSubmit?.(finalValues);
});
return () => subscription.unsubscribe();
}, [form, onSubmit, itemConfigList]);
// 根据认证类型过滤表单项
const filteredConfigList = itemConfigList.filter((config) => {
// 始终显示webhook-url、auth-type、timeout和output-key
if (['webhook-url', 'auth-type', 'timeout', 'output-key'].includes(config.name)) {
return true;
}
// 根据认证类型显示相应的表单项
if (authType === 'basic' && config.name.startsWith('basic-')) {
return true;
}
if (authType === 'jwt' && config.name.startsWith('jwt-')) {
return true;
}
if (authType === 'header' && config.name.startsWith('header-')) {
return true;
}
return false;
});
return (
<Form {...form}>
<div className="space-y-4">
{filteredConfigList.map((config) => (
<FormField
key={config.id}
control={form.control}
name={config.name as keyof FormValues}
render={({ field }) => (
<FormItem>
<FormLabel>
{i18nObj(config.label)}{' '}
{config.required && <span className="text-red-500">*</span>}
</FormLabel>
<FormControl>
<DynamicFormItemComponent config={config} field={field} />
</FormControl>
{config.description && (
<p className="text-sm text-muted-foreground">
{i18nObj(config.description)}
</p>
)}
<FormMessage />
</FormItem>
)}
/>
))}
</div>
</Form>
);
}

View File

@@ -8,6 +8,7 @@ import {
} from '@/app/infra/entities/pipeline';
import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs';
import DynamicFormComponent from '@/app/home/components/dynamic-form/DynamicFormComponent';
import N8nAuthFormComponent from '@/app/home/components/dynamic-form/N8nAuthFormComponent';
import { Button } from '@/components/ui/button';
import { useForm } from 'react-hook-form';
import { zodResolver } from '@hookform/resolvers/zod';
@@ -244,6 +245,37 @@ export default function PipelineFormComponent({
if (stage.name !== currentRunner) {
return null;
}
// 对于n8n-service-api配置使用N8nAuthFormComponent处理表单联动
if (stage.name === 'n8n-service-api') {
return (
<div key={stage.name} className="space-y-4 mb-6">
<div className="text-lg font-medium">{i18nObj(stage.label)}</div>
{stage.description && (
<div className="text-sm text-gray-500">
{i18nObj(stage.description)}
</div>
)}
<N8nAuthFormComponent
itemConfigList={stage.config}
initialValues={
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(form.watch(formName) as Record<string, any>)?.[stage.name] ||
{}
}
onSubmit={(values) => {
const currentValues =
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(form.getValues(formName) as Record<string, any>) || {};
form.setValue(formName, {
...currentValues,
[stage.name]: values,
});
}}
/>
</div>
);
}
}
return (

View File

@@ -203,77 +203,10 @@ export interface MarketPluginResponse {
}
interface GetPipelineConfig {
ai: {
'dashscope-app-api': {
'api-key': string;
'app-id': string;
'app-type': 'agent' | 'workflow';
'references-quote'?: string;
};
'dify-service-api': {
'api-key': string;
'app-type': 'chat' | 'agent' | 'workflow';
'base-url': string;
'thinking-convert': 'plain' | 'original' | 'remove';
timeout?: number;
};
'local-agent': {
'max-round': number;
model: string;
prompt: Array<{
content: string;
role: string;
}>;
};
runner: {
runner: 'local-agent' | 'dify-service-api' | 'dashscope-app-api';
};
};
output: {
'force-delay': {
max: number;
min: number;
};
'long-text-processing': {
'font-path': string;
strategy: 'forward' | 'image';
threshold: number;
};
misc: {
'at-sender': boolean;
'hide-exception': boolean;
'quote-origin': boolean;
'track-function-calls': boolean;
};
};
safety: {
'content-filter': {
'check-sensitive-words': boolean;
scope: 'all' | 'income-msg' | 'output-msg';
};
'rate-limit': {
limitation: number;
strategy: 'drop' | 'wait';
'window-length': number;
};
};
trigger: {
'access-control': {
blacklist: string[];
mode: 'blacklist' | 'whitelist';
whitelist: string[];
};
'group-respond-rules': {
at: boolean;
prefix: string[];
random: number;
regexp: string[];
};
'ignore-rules': {
prefix: string[];
regexp: string[];
};
};
ai: {};
output: {};
safety: {};
trigger: {};
}
interface GetPipeline {