n8n 自定义节点开发 - 扩展平台功能
虽然 n8n 提供了丰富的内置节点,但在实际业务中,我们可能需要开发自定义节点来满足特定需求。今天我们来学习如何开发自定义 n8n 节点。
自定义节点基础
节点结构
一个 n8n 节点主要包含以下文件:
MyCustomNode/
├── MyCustomNode.node.ts # 节点主逻辑
├── MyCustomNode.node.json # 节点描述文件
├── MyCustomNodeIcon.svg # 节点图标
└── GenericFunctions.ts # 通用函数(可选)
节点描述文件
json
{
"node": "n8n-nodes-mycustom.MyCustomNode",
"nodeVersion": "1.0",
"codex": {
"categories": ["Custom"],
"subcategories": {
"Custom": ["Data Processing"]
},
"resources": {
"primaryDocumentation": [
{
"url": "https://docs.example.com/api"
}
]
}
}
}
开发环境搭建
项目初始化
bash
# 创建新的 n8n 节点项目
npm init n8n-node my-custom-node
# 进入项目目录
cd my-custom-node
# 安装依赖
npm install
# 开发模式
npm run dev
项目结构
my-custom-node/
├── package.json
├── tsconfig.json
├── nodes/
│ └── MyCustomNode/
│ ├── MyCustomNode.node.ts
│ └── MyCustomNode.node.json
├── credentials/
│ └── MyCustomApi.credentials.ts
└── dist/
节点开发实例
简单数据处理节点
typescript
// MyCustomNode.node.ts
import { IExecuteFunctions } from 'n8n-core';
import {
INodeExecutionData,
INodeType,
INodeTypeDescription,
NodePropertyTypes,
} from 'n8n-workflow';
export class MyCustomNode implements INodeType {
description: INodeTypeDescription = {
displayName: 'My Custom Node',
name: 'myCustomNode',
icon: 'file:MyCustomNodeIcon.svg',
group: ['transform'],
version: 1,
subtitle: '={{$parameter["operation"]}}',
description: 'Custom data processing node',
defaults: {
name: 'My Custom Node',
},
inputs: ['main'],
outputs: ['main'],
properties: [
{
displayName: 'Operation',
name: 'operation',
type: 'options',
noDataExpression: true,
options: [
{
name: 'Transform Data',
value: 'transform',
description: 'Transform input data',
action: 'Transform data',
},
{
name: 'Validate Data',
value: 'validate',
description: 'Validate input data',
action: 'Validate data',
},
{
name: 'Aggregate Data',
value: 'aggregate',
description: 'Aggregate input data',
action: 'Aggregate data',
},
],
default: 'transform',
},
{
displayName: 'Transform Rules',
name: 'transformRules',
type: 'fixedCollection',
typeOptions: {
multipleValues: true,
},
displayOptions: {
show: {
operation: ['transform'],
},
},
default: {},
options: [
{
name: 'rules',
displayName: 'Rule',
values: [
{
displayName: 'Field Name',
name: 'fieldName',
type: 'string',
default: '',
description: 'Name of the field to transform',
},
{
displayName: 'Transform Type',
name: 'transformType',
type: 'options',
options: [
{
name: 'Uppercase',
value: 'uppercase',
},
{
name: 'Lowercase',
value: 'lowercase',
},
{
name: 'Capitalize',
value: 'capitalize',
},
{
name: 'Trim',
value: 'trim',
},
],
default: 'uppercase',
},
],
},
],
},
{
displayName: 'Validation Schema',
name: 'validationSchema',
type: 'json',
displayOptions: {
show: {
operation: ['validate'],
},
},
default: '{\n "required": ["name", "email"],\n "properties": {\n "name": {"type": "string"},\n "email": {"type": "string", "format": "email"}\n }\n}',
description: 'JSON Schema for validation',
},
{
displayName: 'Aggregation Field',
name: 'aggregationField',
type: 'string',
displayOptions: {
show: {
operation: ['aggregate'],
},
},
default: '',
description: 'Field to aggregate by',
},
{
displayName: 'Aggregation Function',
name: 'aggregationFunction',
type: 'options',
displayOptions: {
show: {
operation: ['aggregate'],
},
},
options: [
{
name: 'Count',
value: 'count',
},
{
name: 'Sum',
value: 'sum',
},
{
name: 'Average',
value: 'average',
},
{
name: 'Min',
value: 'min',
},
{
name: 'Max',
value: 'max',
},
],
default: 'count',
},
],
};
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();
const operation = this.getNodeParameter('operation', 0) as string;
let returnData: INodeExecutionData[] = [];
switch (operation) {
case 'transform':
returnData = await this.transformData(items);
break;
case 'validate':
returnData = await this.validateData(items);
break;
case 'aggregate':
returnData = await this.aggregateData(items);
break;
default:
throw new Error(`Unknown operation: ${operation}`);
}
return [returnData];
}
private async transformData(items: INodeExecutionData[]): Promise<INodeExecutionData[]> {
const transformRules = this.getNodeParameter('transformRules.rules', 0, []) as Array<{
fieldName: string;
transformType: string;
}>;
return items.map((item, index) => {
const newItem = { ...item };
transformRules.forEach(rule => {
const fieldValue = newItem.json[rule.fieldName];
if (typeof fieldValue === 'string') {
switch (rule.transformType) {
case 'uppercase':
newItem.json[rule.fieldName] = fieldValue.toUpperCase();
break;
case 'lowercase':
newItem.json[rule.fieldName] = fieldValue.toLowerCase();
break;
case 'capitalize':
newItem.json[rule.fieldName] = fieldValue.charAt(0).toUpperCase() + fieldValue.slice(1).toLowerCase();
break;
case 'trim':
newItem.json[rule.fieldName] = fieldValue.trim();
break;
}
}
});
return newItem;
});
}
private async validateData(items: INodeExecutionData[]): Promise<INodeExecutionData[]> {
const validationSchema = JSON.parse(this.getNodeParameter('validationSchema', 0) as string);
const Ajv = require('ajv');
const addFormats = require('ajv-formats');
const ajv = new Ajv();
addFormats(ajv);
const validate = ajv.compile(validationSchema);
return items.map((item, index) => {
const isValid = validate(item.json);
return {
json: {
...item.json,
_validation: {
isValid,
errors: validate.errors || []
}
}
};
});
}
private async aggregateData(items: INodeExecutionData[]): Promise<INodeExecutionData[]> {
const aggregationField = this.getNodeParameter('aggregationField', 0) as string;
const aggregationFunction = this.getNodeParameter('aggregationFunction', 0) as string;
const groups: { [key: string]: any[] } = {};
// 分组数据
items.forEach(item => {
const groupKey = item.json[aggregationField] || 'null';
if (!groups[groupKey]) {
groups[groupKey] = [];
}
groups[groupKey].push(item.json);
});
// 聚合计算
const results = Object.entries(groups).map(([groupKey, groupItems]) => {
let aggregatedValue: any;
switch (aggregationFunction) {
case 'count':
aggregatedValue = groupItems.length;
break;
case 'sum':
aggregatedValue = groupItems.reduce((sum, item) => {
const value = parseFloat(item.value) || 0;
return sum + value;
}, 0);
break;
case 'average':
const sum = groupItems.reduce((sum, item) => {
const value = parseFloat(item.value) || 0;
return sum + value;
}, 0);
aggregatedValue = sum / groupItems.length;
break;
case 'min':
aggregatedValue = Math.min(...groupItems.map(item => parseFloat(item.value) || 0));
break;
case 'max':
aggregatedValue = Math.max(...groupItems.map(item => parseFloat(item.value) || 0));
break;
default:
aggregatedValue = groupItems.length;
}
return {
json: {
[aggregationField]: groupKey,
aggregatedValue,
count: groupItems.length,
function: aggregationFunction
}
};
});
return results;
}
}
API 集成节点
typescript
// ApiIntegrationNode.node.ts
import { IExecuteFunctions } from 'n8n-core';
import {
ICredentialsDecrypted,
ICredentialTestFunctions,
IDataObject,
INodeCredentialTestResult,
INodeExecutionData,
INodeType,
INodeTypeDescription,
NodeOperationError,
} from 'n8n-workflow';
export class ApiIntegrationNode implements INodeType {
description: INodeTypeDescription = {
displayName: 'API Integration',
name: 'apiIntegration',
icon: 'file:ApiIntegrationIcon.svg',
group: ['input'],
version: 1,
subtitle: '={{$parameter["operation"] + ": " + $parameter["resource"]}}',
description: 'Interact with custom API',
defaults: {
name: 'API Integration',
},
inputs: ['main'],
outputs: ['main'],
credentials: [
{
name: 'customApi',
required: true,
testedBy: 'customApiTest',
},
],
properties: [
{
displayName: 'Resource',
name: 'resource',
type: 'options',
noDataExpression: true,
options: [
{
name: 'User',
value: 'user',
},
{
name: 'Order',
value: 'order',
},
{
name: 'Product',
value: 'product',
},
],
default: 'user',
},
{
displayName: 'Operation',
name: 'operation',
type: 'options',
noDataExpression: true,
displayOptions: {
show: {
resource: ['user'],
},
},
options: [
{
name: 'Create',
value: 'create',
action: 'Create a user',
},
{
name: 'Get',
value: 'get',
action: 'Get a user',
},
{
name: 'Update',
value: 'update',
action: 'Update a user',
},
{
name: 'Delete',
value: 'delete',
action: 'Delete a user',
},
{
name: 'List',
value: 'list',
action: 'List users',
},
],
default: 'get',
},
// 更多属性配置...
],
};
methods = {
credentialTest: {
async customApiTest(
this: ICredentialTestFunctions,
credential: ICredentialsDecrypted,
): Promise<INodeCredentialTestResult> {
const credentials = credential.data;
const options = {
method: 'GET',
headers: {
'Authorization': `Bearer ${credentials!.apiKey}`,
},
uri: `${credentials!.baseUrl}/api/test`,
json: true,
};
try {
await this.helpers.request(options);
} catch (error) {
return {
status: 'Error',
message: `Connection failed: ${error.message}`,
};
}
return {
status: 'OK',
message: 'Authentication successful!',
};
},
},
};
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();
const returnData: INodeExecutionData[] = [];
const resource = this.getNodeParameter('resource', 0) as string;
const operation = this.getNodeParameter('operation', 0) as string;
const credentials = await this.getCredentials('customApi');
for (let i = 0; i < items.length; i++) {
try {
let responseData;
if (resource === 'user') {
responseData = await this.handleUserOperations(operation, i, credentials);
} else if (resource === 'order') {
responseData = await this.handleOrderOperations(operation, i, credentials);
} else if (resource === 'product') {
responseData = await this.handleProductOperations(operation, i, credentials);
}
if (Array.isArray(responseData)) {
returnData.push(...responseData.map(item => ({ json: item })));
} else {
returnData.push({ json: responseData });
}
} catch (error) {
if (this.continueOnFail()) {
returnData.push({
json: {
error: error.message,
},
});
continue;
}
throw error;
}
}
return [returnData];
}
private async handleUserOperations(
operation: string,
itemIndex: number,
credentials: IDataObject,
): Promise<any> {
const baseUrl = credentials.baseUrl as string;
const apiKey = credentials.apiKey as string;
const baseOptions = {
headers: {
'Authorization': `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
json: true,
};
switch (operation) {
case 'create':
const createData = this.getNodeParameter('userData', itemIndex) as IDataObject;
return await this.helpers.request({
...baseOptions,
method: 'POST',
uri: `${baseUrl}/api/users`,
body: createData,
});
case 'get':
const userId = this.getNodeParameter('userId', itemIndex) as string;
return await this.helpers.request({
...baseOptions,
method: 'GET',
uri: `${baseUrl}/api/users/${userId}`,
});
case 'update':
const updateUserId = this.getNodeParameter('userId', itemIndex) as string;
const updateData = this.getNodeParameter('userData', itemIndex) as IDataObject;
return await this.helpers.request({
...baseOptions,
method: 'PUT',
uri: `${baseUrl}/api/users/${updateUserId}`,
body: updateData,
});
case 'delete':
const deleteUserId = this.getNodeParameter('userId', itemIndex) as string;
return await this.helpers.request({
...baseOptions,
method: 'DELETE',
uri: `${baseUrl}/api/users/${deleteUserId}`,
});
case 'list':
const limit = this.getNodeParameter('limit', itemIndex, 50) as number;
const offset = this.getNodeParameter('offset', itemIndex, 0) as number;
return await this.helpers.request({
...baseOptions,
method: 'GET',
uri: `${baseUrl}/api/users`,
qs: { limit, offset },
});
default:
throw new NodeOperationError(this.getNode(), `Unknown operation: ${operation}`);
}
}
private async handleOrderOperations(
operation: string,
itemIndex: number,
credentials: IDataObject,
): Promise<any> {
// 订单操作实现
return {};
}
private async handleProductOperations(
operation: string,
itemIndex: number,
credentials: IDataObject,
): Promise<any> {
// 产品操作实现
return {};
}
}
凭证开发
自定义 API 凭证
typescript
// CustomApi.credentials.ts
import {
IAuthenticateGeneric,
ICredentialTestRequest,
ICredentialType,
INodeProperties,
} from 'n8n-workflow';
export class CustomApi implements ICredentialType {
name = 'customApi';
displayName = 'Custom API';
documentationUrl = 'https://docs.example.com/api';
properties: INodeProperties[] = [
{
displayName: 'Base URL',
name: 'baseUrl',
type: 'string',
default: 'https://api.example.com',
placeholder: 'https://api.example.com',
description: 'The base URL of the API',
},
{
displayName: 'API Key',
name: 'apiKey',
type: 'string',
typeOptions: {
password: true,
},
default: '',
description: 'Your API key',
},
{
displayName: 'Environment',
name: 'environment',
type: 'options',
options: [
{
name: 'Production',
value: 'production',
},
{
name: 'Staging',
value: 'staging',
},
{
name: 'Development',
value: 'development',
},
],
default: 'production',
},
];
authenticate: IAuthenticateGeneric = {
type: 'generic',
properties: {
headers: {
Authorization: '=Bearer {{$credentials.apiKey}}',
},
},
};
test: ICredentialTestRequest = {
request: {
baseURL: '={{$credentials.baseUrl}}',
url: '/api/test',
method: 'GET',
},
};
}
节点测试
单元测试
typescript
// MyCustomNode.test.ts
import { IExecuteFunctions } from 'n8n-core';
import { INodeExecutionData } from 'n8n-workflow';
import { MyCustomNode } from '../MyCustomNode.node';
describe('MyCustomNode', () => {
let node: MyCustomNode;
let mockExecuteFunctions: jest.Mocked<IExecuteFunctions>;
beforeEach(() => {
node = new MyCustomNode();
mockExecuteFunctions = {
getInputData: jest.fn(),
getNodeParameter: jest.fn(),
continueOnFail: jest.fn(),
} as any;
});
describe('transform operation', () => {
it('should transform data correctly', async () => {
const inputData: INodeExecutionData[] = [
{
json: {
name: 'john doe',
email: 'JOHN@EXAMPLE.COM',
},
},
];
mockExecuteFunctions.getInputData.mockReturnValue(inputData);
mockExecuteFunctions.getNodeParameter
.mockReturnValueOnce('transform')
.mockReturnValueOnce([
{
fieldName: 'name',
transformType: 'capitalize',
},
{
fieldName: 'email',
transformType: 'lowercase',
},
]);
const result = await node.execute.call(mockExecuteFunctions);
expect(result[0]).toHaveLength(1);
expect(result[0][0].json).toEqual({
name: 'John doe',
email: 'john@example.com',
});
});
});
describe('validate operation', () => {
it('should validate data correctly', async () => {
const inputData: INodeExecutionData[] = [
{
json: {
name: 'John Doe',
email: 'john@example.com',
},
},
];
const validationSchema = {
required: ['name', 'email'],
properties: {
name: { type: 'string' },
email: { type: 'string', format: 'email' },
},
};
mockExecuteFunctions.getInputData.mockReturnValue(inputData);
mockExecuteFunctions.getNodeParameter
.mockReturnValueOnce('validate')
.mockReturnValueOnce(JSON.stringify(validationSchema));
const result = await node.execute.call(mockExecuteFunctions);
expect(result[0]).toHaveLength(1);
expect(result[0][0].json._validation.isValid).toBe(true);
});
});
});
发布和分发
包配置
json
{
"name": "n8n-nodes-mycustom",
"version": "1.0.0",
"description": "Custom nodes for n8n",
"keywords": ["n8n-community-node-package"],
"license": "MIT",
"homepage": "https://github.com/username/n8n-nodes-mycustom",
"author": {
"name": "Your Name",
"email": "your.email@example.com"
},
"repository": {
"type": "git",
"url": "git+https://github.com/username/n8n-nodes-mycustom.git"
},
"main": "index.js",
"scripts": {
"build": "tsc && gulp build:icons",
"dev": "tsc --watch",
"format": "prettier nodes credentials --write",
"lint": "eslint nodes credentials package.json",
"lintfix": "eslint nodes credentials package.json --fix",
"prepublishOnly": "npm run build && npm run lint -s"
},
"files": ["dist"],
"n8n": {
"n8nNodesApiVersion": 1,
"credentials": ["dist/credentials/CustomApi.credentials.js"],
"nodes": ["dist/nodes/MyCustomNode/MyCustomNode.node.js"]
},
"devDependencies": {
"@types/jest": "^29.5.5",
"@types/node": "^18.16.16",
"eslint": "^8.42.0",
"gulp": "^4.0.2",
"jest": "^29.6.2",
"n8n-workflow": "*",
"prettier": "^2.8.8",
"typescript": "^5.1.3"
},
"peerDependencies": {
"n8n-workflow": "*"
}
}
发布到 npm
bash
# 构建项目
npm run build
# 运行测试
npm test
# 发布到 npm
npm publish
小结
自定义节点开发让我们能够:
- 扩展功能:为特定业务需求创建专用节点
- 提高复用性:将常用逻辑封装成可重用的节点
- 改善用户体验:提供更直观的操作界面
- 集成专有系统:连接内部或第三方系统
- 社区贡献:分享节点给 n8n 社区
下一篇文章,我们将学习性能优化与监控,这对于生产环境的工作流至关重要。
记住,开发自定义节点时要注重代码质量、错误处理和用户体验。完善的文档和测试是成功节点的关键要素。