Skip to content

n8n 自定义节点开发 - 扩展平台功能

虽然 n8n 提供了丰富的内置节点,但在实际业务中,我们可能需要开发自定义节点来满足特定需求。今天我们来学习如何开发自定义 n8n 节点。

自定义节点基础

节点结构

一个 n8n 节点主要包含以下文件:

MyCustomNode/
├── MyCustomNode.node.ts          # 节点主逻辑
├── MyCustomNode.node.json        # 节点元数据(codex)文件
├── MyCustomNodeIcon.svg          # 节点图标
└── GenericFunctions.ts           # 通用函数(可选)

节点元数据(codex)文件

json
{
  "node": "n8n-nodes-mycustom.MyCustomNode",
  "nodeVersion": "1.0",
  "codex": {
    "categories": ["Custom"],
    "subcategories": {
      "Custom": ["Data Processing"]
    },
    "resources": {
      "primaryDocumentation": [
        {
          "url": "https://docs.example.com/api"
        }
      ]
    }
  }
}

开发环境搭建

项目初始化

bash
# Scaffold a new n8n node project
# NOTE(review): confirm this initializer name against the current n8n docs before relying on it
npm init n8n-node my-custom-node

# Enter the project directory
cd my-custom-node

# Install dependencies
npm install

# Start development mode
npm run dev

项目结构

my-custom-node/
├── package.json
├── tsconfig.json
├── nodes/
│   └── MyCustomNode/
│       ├── MyCustomNode.node.ts
│       └── MyCustomNode.node.json
├── credentials/
│   └── CustomApi.credentials.ts
└── dist/

节点开发实例

简单数据处理节点

typescript
// MyCustomNode.node.ts
import { IExecuteFunctions } from 'n8n-core';
import {
  INodeExecutionData,
  INodeType,
  INodeTypeDescription,
  NodePropertyTypes,
} from 'n8n-workflow';

/**
 * Data-processing node offering three operations on the incoming items:
 * `transform` (string case/trim rules), `validate` (JSON Schema check) and
 * `aggregate` (group items and compute count/sum/average/min/max).
 *
 * n8n invokes `execute` with `this` bound to the execution context
 * (`IExecuteFunctions`), NOT to the node instance. Instance methods are
 * therefore unreachable from `execute`, so the per-operation logic lives in
 * the module-level functions below and is invoked with `.call(this, ...)`.
 */
export class MyCustomNode implements INodeType {
  description: INodeTypeDescription = {
    displayName: 'My Custom Node',
    name: 'myCustomNode',
    icon: 'file:MyCustomNodeIcon.svg',
    group: ['transform'],
    version: 1,
    subtitle: '={{$parameter["operation"]}}',
    description: 'Custom data processing node',
    defaults: {
      name: 'My Custom Node',
    },
    inputs: ['main'],
    outputs: ['main'],
    properties: [
      {
        displayName: 'Operation',
        name: 'operation',
        type: 'options',
        noDataExpression: true,
        options: [
          {
            name: 'Transform Data',
            value: 'transform',
            description: 'Transform input data',
            action: 'Transform data',
          },
          {
            name: 'Validate Data',
            value: 'validate',
            description: 'Validate input data',
            action: 'Validate data',
          },
          {
            name: 'Aggregate Data',
            value: 'aggregate',
            description: 'Aggregate input data',
            action: 'Aggregate data',
          },
        ],
        default: 'transform',
      },
      {
        displayName: 'Transform Rules',
        name: 'transformRules',
        type: 'fixedCollection',
        typeOptions: {
          multipleValues: true,
        },
        displayOptions: {
          show: {
            operation: ['transform'],
          },
        },
        default: {},
        options: [
          {
            name: 'rules',
            displayName: 'Rule',
            values: [
              {
                displayName: 'Field Name',
                name: 'fieldName',
                type: 'string',
                default: '',
                description: 'Name of the field to transform',
              },
              {
                displayName: 'Transform Type',
                name: 'transformType',
                type: 'options',
                options: [
                  {
                    name: 'Uppercase',
                    value: 'uppercase',
                  },
                  {
                    name: 'Lowercase',
                    value: 'lowercase',
                  },
                  {
                    name: 'Capitalize',
                    value: 'capitalize',
                  },
                  {
                    name: 'Trim',
                    value: 'trim',
                  },
                ],
                default: 'uppercase',
              },
            ],
          },
        ],
      },
      {
        displayName: 'Validation Schema',
        name: 'validationSchema',
        type: 'json',
        displayOptions: {
          show: {
            operation: ['validate'],
          },
        },
        default: '{\n  "required": ["name", "email"],\n  "properties": {\n    "name": {"type": "string"},\n    "email": {"type": "string", "format": "email"}\n  }\n}',
        description: 'JSON Schema for validation',
      },
      {
        displayName: 'Aggregation Field',
        name: 'aggregationField',
        type: 'string',
        displayOptions: {
          show: {
            operation: ['aggregate'],
          },
        },
        default: '',
        description: 'Field to aggregate by',
      },
      {
        displayName: 'Aggregation Function',
        name: 'aggregationFunction',
        type: 'options',
        displayOptions: {
          show: {
            operation: ['aggregate'],
          },
        },
        options: [
          {
            name: 'Count',
            value: 'count',
          },
          {
            name: 'Sum',
            value: 'sum',
          },
          {
            name: 'Average',
            value: 'average',
          },
          {
            name: 'Min',
            value: 'min',
          },
          {
            name: 'Max',
            value: 'max',
          },
        ],
        default: 'count',
      },
      {
        displayName: 'Value Field',
        name: 'valueField',
        type: 'string',
        displayOptions: {
          show: {
            operation: ['aggregate'],
          },
        },
        default: 'value',
        description: 'Numeric field that Sum, Average, Min and Max are computed over',
      },
    ],
  };

  /**
   * Reads the selected operation (from item index 0) and runs it over all
   * input items.
   *
   * @returns A single output branch containing the processed items.
   * @throws Error when the operation parameter holds an unknown value.
   */
  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    const items = this.getInputData();
    const operation = this.getNodeParameter('operation', 0) as string;

    switch (operation) {
      case 'transform':
        return [transformData.call(this, items)];
      case 'validate':
        return [validateData.call(this, items)];
      case 'aggregate':
        return [aggregateData.call(this, items)];
      default:
        throw new Error(`Unknown operation: ${operation}`);
    }
  }
}

/** One entry of the `transformRules.rules` fixed collection. */
interface TransformRule {
  fieldName: string;
  transformType: string;
}

/**
 * Applies every configured transform rule to each item.
 *
 * Only string fields are touched; other values are left as they are. Each
 * item's `json` is copied before modification so the input items are never
 * mutated.
 */
function transformData(
  this: IExecuteFunctions,
  items: INodeExecutionData[],
): INodeExecutionData[] {
  const rules = this.getNodeParameter('transformRules.rules', 0, []) as TransformRule[];

  return items.map((item) => {
    // Copy `json` as well: `{ ...item }` alone would share it with the input.
    const json = { ...item.json };

    for (const rule of rules) {
      const current = json[rule.fieldName];
      if (typeof current !== 'string') {
        continue;
      }

      switch (rule.transformType) {
        case 'uppercase':
          json[rule.fieldName] = current.toUpperCase();
          break;
        case 'lowercase':
          json[rule.fieldName] = current.toLowerCase();
          break;
        case 'capitalize':
          // First character upper-cased, everything after it lower-cased.
          json[rule.fieldName] = current.charAt(0).toUpperCase() + current.slice(1).toLowerCase();
          break;
        case 'trim':
          json[rule.fieldName] = current.trim();
          break;
      }
    }

    return { ...item, json };
  });
}

/**
 * Validates each item's `json` against the configured JSON Schema and
 * returns the item data extended with `_validation: { isValid, errors }`.
 *
 * @throws Error when the schema parameter is a string that is not valid JSON.
 */
function validateData(
  this: IExecuteFunctions,
  items: INodeExecutionData[],
): INodeExecutionData[] {
  const rawSchema = this.getNodeParameter('validationSchema', 0) as string | object;

  // The parameter may already be an object; only parse when it is text.
  let schema: object;
  if (typeof rawSchema === 'string') {
    try {
      schema = JSON.parse(rawSchema);
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      throw new Error(`Validation Schema is not valid JSON: ${reason}`);
    }
  } else {
    schema = rawSchema;
  }

  // Loaded on demand; `ajv` and `ajv-formats` must be installed alongside this node.
  const Ajv = require('ajv');
  const addFormats = require('ajv-formats');

  const ajv = new Ajv();
  addFormats(ajv);
  const validate = ajv.compile(schema);

  return items.map((item) => {
    const isValid = validate(item.json);

    return {
      json: {
        ...item.json,
        _validation: {
          isValid,
          errors: validate.errors || [],
        },
      },
    };
  });
}

/** Parses a value with `parseFloat`, treating anything non-numeric as 0. */
function toNumber(raw: unknown): number {
  return parseFloat(String(raw)) || 0;
}

/**
 * Groups items by `aggregationField` and emits one item per group holding
 * the group key, the aggregated value, the group size and the function name.
 *
 * Items whose key is null/undefined are collected under the key `'null'`;
 * every other key is used in its string form. Groups are emitted in order of
 * first appearance.
 */
function aggregateData(
  this: IExecuteFunctions,
  items: INodeExecutionData[],
): INodeExecutionData[] {
  const aggregationField = this.getNodeParameter('aggregationField', 0) as string;
  const aggregationFunction = this.getNodeParameter('aggregationFunction', 0) as string;
  const valueField = this.getNodeParameter('valueField', 0, 'value') as string;

  // A Map (not a plain object) so keys such as "constructor" cannot collide
  // with inherited Object.prototype members.
  const groups = new Map<string, number[]>();

  for (const item of items) {
    const rawKey = item.json[aggregationField];
    // Explicit null check: `||` would also push 0, '' and false into 'null'.
    const groupKey = rawKey === null || rawKey === undefined ? 'null' : String(rawKey);

    let values = groups.get(groupKey);
    if (values === undefined) {
      values = [];
      groups.set(groupKey, values);
    }
    values.push(toNumber(item.json[valueField]));
  }

  return Array.from(groups, ([groupKey, values]) => {
    const total = values.reduce((acc, value) => acc + value, 0);
    let aggregatedValue: number;

    switch (aggregationFunction) {
      case 'sum':
        aggregatedValue = total;
        break;
      case 'average':
        aggregatedValue = total / values.length;
        break;
      case 'min':
        // reduce instead of Math.min(...values): no argument-count limit.
        aggregatedValue = values.reduce((a, b) => Math.min(a, b));
        break;
      case 'max':
        aggregatedValue = values.reduce((a, b) => Math.max(a, b));
        break;
      case 'count':
      default:
        aggregatedValue = values.length;
    }

    return {
      json: {
        [aggregationField]: groupKey,
        aggregatedValue,
        count: values.length,
        function: aggregationFunction,
      },
    };
  });
}

API 集成节点

typescript
// ApiIntegrationNode.node.ts
import { IExecuteFunctions } from 'n8n-core';
import {
  ICredentialsDecrypted,
  ICredentialTestFunctions,
  IDataObject,
  INodeCredentialTestResult,
  INodeExecutionData,
  INodeType,
  INodeTypeDescription,
  NodeOperationError,
} from 'n8n-workflow';

/**
 * Node that talks to a custom REST API using the `customApi` credential.
 *
 * n8n calls `execute` with `this` bound to the execution context
 * (`IExecuteFunctions`), not to the node instance, so the per-resource
 * handlers are module-level functions invoked with `.call(this, ...)` rather
 * than instance methods.
 */
export class ApiIntegrationNode implements INodeType {
  description: INodeTypeDescription = {
    displayName: 'API Integration',
    name: 'apiIntegration',
    icon: 'file:ApiIntegrationIcon.svg',
    group: ['input'],
    version: 1,
    subtitle: '={{$parameter["operation"] + ": " + $parameter["resource"]}}',
    description: 'Interact with custom API',
    defaults: {
      name: 'API Integration',
    },
    inputs: ['main'],
    outputs: ['main'],
    credentials: [
      {
        name: 'customApi',
        required: true,
        testedBy: 'customApiTest',
      },
    ],
    properties: [
      {
        displayName: 'Resource',
        name: 'resource',
        type: 'options',
        noDataExpression: true,
        options: [
          {
            name: 'User',
            value: 'user',
          },
          {
            name: 'Order',
            value: 'order',
          },
          {
            name: 'Product',
            value: 'product',
          },
        ],
        default: 'user',
      },
      {
        displayName: 'Operation',
        name: 'operation',
        type: 'options',
        noDataExpression: true,
        displayOptions: {
          show: {
            resource: ['user'],
          },
        },
        options: [
          {
            name: 'Create',
            value: 'create',
            action: 'Create a user',
          },
          {
            name: 'Get',
            value: 'get',
            action: 'Get a user',
          },
          {
            name: 'Update',
            value: 'update',
            action: 'Update a user',
          },
          {
            name: 'Delete',
            value: 'delete',
            action: 'Delete a user',
          },
          {
            name: 'List',
            value: 'list',
            action: 'List users',
          },
        ],
        default: 'get',
      },
      // Further property definitions go here...
    ],
  };

  methods = {
    credentialTest: {
      /**
       * Credential check referenced by `testedBy` above: issues a GET to
       * `<baseUrl>/api/test` with the API key as a Bearer token and reports
       * whether the request succeeded.
       */
      async customApiTest(
        this: ICredentialTestFunctions,
        credential: ICredentialsDecrypted,
      ): Promise<INodeCredentialTestResult> {
        const credentials = credential.data;
        if (!credentials) {
          return {
            status: 'Error',
            message: 'Connection failed: credential data is missing',
          };
        }

        const options = {
          method: 'GET',
          headers: {
            'Authorization': `Bearer ${credentials.apiKey}`,
          },
          uri: `${credentials.baseUrl}/api/test`,
          json: true,
        };

        try {
          await this.helpers.request(options);
        } catch (error) {
          return {
            status: 'Error',
            message: `Connection failed: ${describeError(error)}`,
          };
        }

        return {
          status: 'OK',
          message: 'Authentication successful!',
        };
      },
    },
  };

  /**
   * Runs the selected resource/operation once per input item.
   *
   * Array responses are fanned out into one output item per element. When
   * "continue on fail" is enabled, a failing item yields `{ error }` instead
   * of aborting the run.
   *
   * @throws NodeOperationError for an unknown resource or operation.
   */
  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    const items = this.getInputData();
    const returnData: INodeExecutionData[] = [];
    const resource = this.getNodeParameter('resource', 0) as string;
    const operation = this.getNodeParameter('operation', 0) as string;

    const credentials = await this.getCredentials('customApi');

    for (let i = 0; i < items.length; i++) {
      try {
        let responseData: IDataObject | IDataObject[];

        switch (resource) {
          case 'user':
            responseData = await handleUserOperations.call(this, operation, i, credentials);
            break;
          case 'order':
            responseData = await handleOrderOperations.call(this, operation, i, credentials);
            break;
          case 'product':
            responseData = await handleProductOperations.call(this, operation, i, credentials);
            break;
          default:
            // Fail loudly instead of emitting an item with `json: undefined`.
            throw new NodeOperationError(this.getNode(), `Unknown resource: ${resource}`);
        }

        if (Array.isArray(responseData)) {
          returnData.push(...responseData.map((entry) => ({ json: entry })));
        } else {
          returnData.push({ json: responseData });
        }
      } catch (error) {
        if (this.continueOnFail()) {
          returnData.push({
            json: {
              error: describeError(error),
            },
          });
          continue;
        }
        throw error;
      }
    }

    return [returnData];
  }
}

/** Extracts a readable message from a caught value of unknown type. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Performs a user operation (create/get/update/delete/list) for one item.
 *
 * @param operation Value of the node's `operation` parameter.
 * @param itemIndex Index of the input item whose parameters are read.
 * @param credentials Decrypted `customApi` credential (`baseUrl`, `apiKey`).
 * @returns The response as returned by `this.helpers.request`.
 * @throws NodeOperationError when the operation is not recognised.
 */
async function handleUserOperations(
  this: IExecuteFunctions,
  operation: string,
  itemIndex: number,
  credentials: IDataObject,
): Promise<IDataObject | IDataObject[]> {
  // Drop trailing slashes so "https://host/" does not produce "//api/...".
  const baseUrl = (credentials.baseUrl as string).replace(/\/+$/, '');
  const apiKey = credentials.apiKey as string;

  const baseOptions = {
    headers: {
      'Authorization': `Bearer ${apiKey}`,
      'Content-Type': 'application/json',
    },
    json: true,
  };

  // Ids come from workflow data; encode them so they stay a single path segment.
  const userUrl = (): string => {
    const userId = this.getNodeParameter('userId', itemIndex) as string;
    return `${baseUrl}/api/users/${encodeURIComponent(userId)}`;
  };

  switch (operation) {
    case 'create': {
      const createData = this.getNodeParameter('userData', itemIndex) as IDataObject;
      return await this.helpers.request({
        ...baseOptions,
        method: 'POST',
        uri: `${baseUrl}/api/users`,
        body: createData,
      });
    }

    case 'get':
      return await this.helpers.request({
        ...baseOptions,
        method: 'GET',
        uri: userUrl(),
      });

    case 'update': {
      const uri = userUrl();
      const updateData = this.getNodeParameter('userData', itemIndex) as IDataObject;
      return await this.helpers.request({
        ...baseOptions,
        method: 'PUT',
        uri,
        body: updateData,
      });
    }

    case 'delete':
      return await this.helpers.request({
        ...baseOptions,
        method: 'DELETE',
        uri: userUrl(),
      });

    case 'list': {
      const limit = this.getNodeParameter('limit', itemIndex, 50) as number;
      const offset = this.getNodeParameter('offset', itemIndex, 0) as number;
      return await this.helpers.request({
        ...baseOptions,
        method: 'GET',
        uri: `${baseUrl}/api/users`,
        qs: { limit, offset },
      });
    }

    default:
      throw new NodeOperationError(this.getNode(), `Unknown operation: ${operation}`);
  }
}

/** Placeholder for order operations; currently returns an empty object. */
async function handleOrderOperations(
  this: IExecuteFunctions,
  operation: string,
  itemIndex: number,
  credentials: IDataObject,
): Promise<IDataObject | IDataObject[]> {
  // Order operations are not implemented yet.
  return {};
}

/** Placeholder for product operations; currently returns an empty object. */
async function handleProductOperations(
  this: IExecuteFunctions,
  operation: string,
  itemIndex: number,
  credentials: IDataObject,
): Promise<IDataObject | IDataObject[]> {
  // Product operations are not implemented yet.
  return {};
}

凭证开发

自定义 API 凭证

typescript
// CustomApi.credentials.ts
import {
  IAuthenticateGeneric,
  ICredentialTestRequest,
  ICredentialType,
  INodeProperties,
} from 'n8n-workflow';

/**
 * Credential definition for the custom API: a base URL, an API key and a
 * target environment. The key is sent as a Bearer token on every request,
 * and the credential is verified with a GET to `/api/test`.
 */
export class CustomApi implements ICredentialType {
  name = 'customApi';

  displayName = 'Custom API';

  documentationUrl = 'https://docs.example.com/api';

  /** Fields the user fills in when creating the credential. */
  properties: INodeProperties[] = [
    {
      name: 'baseUrl',
      displayName: 'Base URL',
      description: 'The base URL of the API',
      type: 'string',
      placeholder: 'https://api.example.com',
      default: 'https://api.example.com',
    },
    {
      name: 'apiKey',
      displayName: 'API Key',
      description: 'Your API key',
      type: 'string',
      // Masks the value in the UI.
      typeOptions: { password: true },
      default: '',
    },
    {
      name: 'environment',
      displayName: 'Environment',
      type: 'options',
      // Each option's value is its display name in lower case.
      options: ['Production', 'Staging', 'Development'].map((label) => ({
        name: label,
        value: label.toLowerCase(),
      })),
      default: 'production',
    },
  ];

  /** Adds `Authorization: Bearer <apiKey>` to requests made with this credential. */
  authenticate: IAuthenticateGeneric = {
    type: 'generic',
    properties: {
      headers: { Authorization: '=Bearer {{$credentials.apiKey}}' },
    },
  };

  /** Request used to check the credential: GET `<baseUrl>/api/test`. */
  test: ICredentialTestRequest = {
    request: {
      method: 'GET',
      baseURL: '={{$credentials.baseUrl}}',
      url: '/api/test',
    },
  };
}

节点测试

单元测试

typescript
// MyCustomNode.test.ts
import { IExecuteFunctions } from 'n8n-core';
import { INodeExecutionData } from 'n8n-workflow';
import { MyCustomNode } from '../MyCustomNode.node';

// Unit tests for MyCustomNode. `execute` is invoked with a mocked execution
// context as `this`, mirroring how n8n calls it at runtime.
describe('MyCustomNode', () => {
  let node: MyCustomNode;
  let mockExecuteFunctions: jest.Mocked<IExecuteFunctions>;

  /**
   * Makes `getNodeParameter` answer by parameter NAME, returning the caller's
   * fallback for anything not listed. Unlike a `mockReturnValueOnce` chain,
   * this does not depend on the order or number of parameter reads.
   */
  const stubParameters = (values: Record<string, unknown>): void => {
    const lookup = (name: string, _itemIndex: number, fallback?: unknown): unknown =>
      name in values ? values[name] : fallback;

    mockExecuteFunctions.getNodeParameter.mockImplementation(
      lookup as unknown as IExecuteFunctions['getNodeParameter'],
    );
  };

  beforeEach(() => {
    node = new MyCustomNode();
    // Only the context methods the node actually calls are mocked.
    mockExecuteFunctions = {
      getInputData: jest.fn(),
      getNodeParameter: jest.fn(),
      continueOnFail: jest.fn(),
    } as unknown as jest.Mocked<IExecuteFunctions>;
  });

  describe('transform operation', () => {
    it('should transform data correctly', async () => {
      const inputData: INodeExecutionData[] = [
        {
          json: {
            name: 'john doe',
            email: 'JOHN@EXAMPLE.COM',
          },
        },
      ];

      mockExecuteFunctions.getInputData.mockReturnValue(inputData);
      stubParameters({
        operation: 'transform',
        'transformRules.rules': [
          {
            fieldName: 'name',
            transformType: 'capitalize',
          },
          {
            fieldName: 'email',
            transformType: 'lowercase',
          },
        ],
      });

      const result = await node.execute.call(mockExecuteFunctions);

      expect(result[0]).toHaveLength(1);
      // "capitalize" upper-cases only the first character of the whole string.
      expect(result[0][0].json).toEqual({
        name: 'John doe',
        email: 'john@example.com',
      });
    });
  });

  describe('validate operation', () => {
    it('should validate data correctly', async () => {
      const inputData: INodeExecutionData[] = [
        {
          json: {
            name: 'John Doe',
            email: 'john@example.com',
          },
        },
      ];

      const validationSchema = {
        required: ['name', 'email'],
        properties: {
          name: { type: 'string' },
          email: { type: 'string', format: 'email' },
        },
      };

      mockExecuteFunctions.getInputData.mockReturnValue(inputData);
      stubParameters({
        operation: 'validate',
        validationSchema: JSON.stringify(validationSchema),
      });

      const result = await node.execute.call(mockExecuteFunctions);

      expect(result[0]).toHaveLength(1);
      // toMatchObject avoids indexing into the loosely typed `json` value.
      expect(result[0][0].json).toMatchObject({ _validation: { isValid: true } });
    });
  });
});

发布和分发

包配置

json
{
  "name": "n8n-nodes-mycustom",
  "version": "1.0.0",
  "description": "Custom nodes for n8n",
  "keywords": ["n8n-community-node-package"],
  "license": "MIT",
  "homepage": "https://github.com/username/n8n-nodes-mycustom",
  "author": {
    "name": "Your Name",
    "email": "your.email@example.com"
  },
  "repository": {
    "type": "git",
    "url": "git+https://github.com/username/n8n-nodes-mycustom.git"
  },
  "main": "index.js",
  "scripts": {
    "build": "tsc && gulp build:icons",
    "dev": "tsc --watch",
    "format": "prettier nodes credentials --write",
    "lint": "eslint nodes credentials package.json",
    "lintfix": "eslint nodes credentials package.json --fix",
    "prepublishOnly": "npm run build && npm run lint -s",
    "test": "jest"
  },
  "files": ["dist"],
  "n8n": {
    "n8nNodesApiVersion": 1,
    "credentials": ["dist/credentials/CustomApi.credentials.js"],
    "nodes": ["dist/nodes/MyCustomNode/MyCustomNode.node.js"]
  },
  "devDependencies": {
    "@types/jest": "^29.5.5",
    "@types/node": "^18.16.16",
    "eslint": "^8.42.0",
    "gulp": "^4.0.2",
    "jest": "^29.6.2",
    "n8n-workflow": "*",
    "prettier": "^2.8.8",
    "typescript": "^5.1.3"
  },
  "peerDependencies": {
    "n8n-workflow": "*"
  }
}

发布到 npm

bash
# Build the project
npm run build

# Run the tests
npm test

# Publish to npm
npm publish

小结

自定义节点开发让我们能够:

  1. 扩展功能:为特定业务需求创建专用节点
  2. 提高复用性:将常用逻辑封装成可重用的节点
  3. 改善用户体验:提供更直观的操作界面
  4. 集成专有系统:连接内部或第三方系统
  5. 社区贡献:分享节点给 n8n 社区

下一篇文章,我们将学习性能优化与监控,这对于生产环境的工作流至关重要。

记住,开发自定义节点时要注重代码质量、错误处理和用户体验。完善的文档和测试是成功节点的关键要素。