Skip to content

n8n Webhook 与 API 集成 - 连接外部世界

Webhook 和 API 集成是 n8n 连接外部系统的核心功能。通过这些技术,我们可以实现实时的数据交换和事件驱动的自动化。今天我们来深入学习这些重要技术。

Webhook 基础

什么是 Webhook

Webhook 是一种"反向 API",它允许应用程序在特定事件发生时主动向指定的 URL 发送 HTTP 请求。

传统 API vs Webhook

  • API:客户端主动请求数据
  • Webhook:服务端主动推送数据

Webhook Trigger 配置

基本配置

json
{
  "httpMethod": "POST",
  "path": "my-webhook",
  "authentication": "none",
  "responseMode": "onReceived"
}

完整 URL 格式

https://your-n8n-instance.com/webhook/my-webhook

Webhook 安全配置

认证方式

Basic Authentication

json
{
  "authentication": "basicAuth",
  "basicAuth": {
    "user": "webhook_user",
    "password": "secure_password_123"
  }
}

Header Authentication

json
{
  "authentication": "headerAuth",
  "headerAuth": {
    "name": "X-API-Key",
    "value": "your-secret-api-key"
  }
}

签名验证

javascript
/**
 * Verify the `X-Hub-Signature-256` header of a GitHub webhook delivery.
 *
 * Builds `sha256=` + hex(HMAC-SHA256(secret, payload)) and compares it with the
 * received header using a constant-time comparison.
 *
 * @param {string|Buffer} payload - Body that was signed. This should be the raw
 *   request body; a re-serialized copy may not match byte-for-byte.
 * @param {string} signature - Value of the `x-hub-signature-256` header.
 * @param {string} secret - Shared webhook secret used as the HMAC key.
 * @returns {boolean} `true` only when the header matches; `false` when it is
 *   missing, has a different length, or differs in content.
 */
function verifyGitHubSignature(payload, signature, secret) {
  const crypto = require('crypto');

  // A missing header can never be valid, and Buffer.from(undefined) would throw.
  if (typeof signature !== 'string' || signature.length === 0) {
    return false;
  }

  const digest = crypto
    .createHmac('sha256', secret)
    .update(payload, 'utf8')
    .digest('hex');
  const expectedSignature = `sha256=${digest}`;

  const received = Buffer.from(signature, 'utf8');
  const expected = Buffer.from(expectedSignature, 'utf8');

  // timingSafeEqual throws a RangeError on different lengths, so reject those
  // up front instead of letting a malformed header crash the workflow.
  if (received.length !== expected.length) {
    return false;
  }

  return crypto.timingSafeEqual(received, expected);
}

// 在 Webhook 处理中验证签名
const payload = JSON.stringify(items[0].json.body);
const signature = items[0].json.headers['x-hub-signature-256'];
const secret = process.env.GITHUB_WEBHOOK_SECRET;

if (!verifyGitHubSignature(payload, signature, secret)) {
  throw new Error('Invalid signature');
}

return [{
  json: {
    verified: true,
    event: items[0].json.headers['x-github-event'],
    payload: items[0].json.body
  }
}];

IP 白名单

javascript
/**
 * Check whether a client IPv4 address is covered by an allow-list.
 *
 * Only dotted-quad IPv4 is understood; anything else (including IPv6) is
 * treated as "not allowed".
 *
 * @param {string} clientIP - Address of the caller, e.g. `'192.30.252.10'`.
 * @param {string[]} allowedIPs - Entries are either a single IPv4 address or a
 *   CIDR block such as `'192.30.252.0/22'`. Malformed entries are ignored.
 * @returns {boolean} `true` when `clientIP` lies inside at least one entry.
 */
function verifySourceIP(clientIP, allowedIPs) {
  // Convert dotted-quad text to an unsigned 32-bit number, or null when invalid.
  const toUint32 = (ip) => {
    if (typeof ip !== 'string') return null;
    const parts = ip.trim().split('.');
    if (parts.length !== 4) return null;

    let value = 0;
    for (const part of parts) {
      if (!/^\d{1,3}$/.test(part)) return null;
      const octet = Number(part);
      if (octet > 255) return null;
      // Multiply instead of shifting: bitwise operators work on signed 32-bit ints.
      value = value * 256 + octet;
    }
    return value;
  };

  // Expand one allow-list entry into an inclusive numeric range, or null when invalid.
  const toRange = (entry) => {
    if (typeof entry !== 'string') return null;
    const [address, prefixText, ...extra] = entry.split('/');
    const base = toUint32(address);
    if (base === null || extra.length > 0) return null;

    // Single address: a range of exactly one.
    if (prefixText === undefined) return { start: base, end: base };

    if (!/^\d{1,2}$/.test(prefixText)) return null;
    const prefix = Number(prefixText);
    if (prefix > 32) return null;

    const size = 2 ** (32 - prefix);
    // Align to the block boundary so a non-canonical base address still works.
    const start = Math.floor(base / size) * size;
    return { start, end: start + size - 1 };
  };

  const client = toUint32(clientIP);
  if (client === null || !Array.isArray(allowedIPs)) {
    return false;
  }

  return allowedIPs
    .map(toRange)
    .some((range) => range !== null && client >= range.start && client <= range.end);
}

// GitHub webhook source ranges (IPv4).
// NOTE(review): GitHub publishes its current ranges under the "hooks" key of
// https://api.github.com/meta and they change over time — verify this list
// before relying on it.
const githubIPs = [
  '192.30.252.0/22',
  '185.199.108.0/22',
  '140.82.112.0/20'
];

// X-Forwarded-For can carry a comma-separated chain ("client, proxy1, proxy2").
// Only the right-most entry was appended by the proxy directly in front of n8n;
// everything to its left is supplied by the caller and can be forged.
// NOTE(review): assumes exactly one trusted reverse proxy — pick a different
// hop if your deployment has more.
const requestHeaders = items[0].json.headers;
const forwardedFor = requestHeaders['x-forwarded-for'];
const clientIP = forwardedFor
  ? String(forwardedFor).split(',').pop().trim()
  : requestHeaders['x-real-ip'];

// Reject when no address could be determined at all, not only on a mismatch.
if (!clientIP || !verifySourceIP(clientIP, githubIPs)) {
  throw new Error('Request from unauthorized IP');
}

HTTP Request 节点详解

基本配置

json
{
  "method": "POST",
  "url": "https://api.example.com/users",
  "authentication": "bearerToken",
  "bearerToken": "your-access-token",
  "headers": {
    "Content-Type": "application/json",
    "User-Agent": "n8n-workflow/1.0"
  },
  "body": {
    "name": "{{ $json.name }}",
    "email": "{{ $json.email }}"
  }
}

认证方式

Bearer Token

json
{
  "authentication": "bearerToken",
  "bearerToken": "{{ $env.API_TOKEN }}"
}

OAuth2

json
{
  "authentication": "oAuth2Api",
  "oAuth2Api": {
    "clientId": "{{ $env.OAUTH_CLIENT_ID }}",
    "clientSecret": "{{ $env.OAUTH_CLIENT_SECRET }}",
    "accessTokenUrl": "https://api.example.com/oauth/token",
    "scope": "read write"
  }
}

API Key

json
{
  "authentication": "headerAuth",
  "headerAuth": {
    "name": "X-API-Key",
    "value": "{{ $env.API_KEY }}"
  }
}

高级 HTTP 配置

超时和重试

json
{
  "timeout": 30000,
  "retry": {
    "enabled": true,
    "maxRetries": 3,
    "retryInterval": 1000
  }
}

代理设置

json
{
  "proxy": {
    "protocol": "http",
    "host": "proxy.company.com",
    "port": 8080,
    "auth": {
      "username": "proxy_user",
      "password": "proxy_pass"
    }
  }
}

API 集成最佳实践

错误处理

javascript
/**
 * Call an HTTP API with `fetch`, retrying transient failures with exponential backoff.
 *
 * A failure is retried when `shouldRetry(status)` accepts the HTTP status or
 * `isNetworkError(error)` accepts the thrown error, and attempts remain.
 *
 * @param {string} url - Endpoint to call.
 * @param {object} [options] - Passed to `fetch` unchanged.
 * @param {object} [retryOptions] - Optional retry tuning.
 * @param {number} [retryOptions.maxRetries=3] - Total number of attempts
 *   (the name is kept from the original constant). Values below 1 are treated as 1.
 * @param {number} [retryOptions.baseDelayMs=1000] - The wait before the next
 *   attempt is `2^attempt * baseDelayMs` milliseconds.
 * @returns {Promise<any>} Parsed JSON body of the first successful response.
 * @throws {Error} The last failure. HTTP failures carry `status` and `response`
 *   (the raw body text); other errors are rethrown unchanged.
 */
async function callAPIWithErrorHandling(url, options, { maxRetries = 3, baseDelayMs = 1000 } = {}) {
  const totalAttempts = Math.max(1, Math.floor(maxRetries) || 1);

  for (let attempt = 1; ; attempt++) {
    try {
      const response = await fetch(url, options);

      if (response.ok) {
        return await response.json();
      }

      // Turn a non-2xx response into an error that keeps the status and body.
      const errorBody = await response.text();
      const httpError = new Error(`HTTP ${response.status}: ${errorBody}`);
      httpError.status = response.status;
      httpError.response = errorBody;
      throw httpError;
    } catch (error) {
      // One decision point for both HTTP-level and network-level failures.
      const retryable = shouldRetry(error?.status) || isNetworkError(error);
      if (!retryable || attempt >= totalAttempts) {
        throw error;
      }

      // Exponential backoff before the next attempt.
      const delay = Math.pow(2, attempt) * baseDelayMs;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}

/**
 * Tell whether an HTTP status code is one of the statuses this workflow retries.
 *
 * @param {number} status - HTTP status code of the failed response.
 * @returns {boolean} `true` for 408, 429, 500, 502, 503 and 504; `false` otherwise.
 */
function shouldRetry(status) {
  switch (status) {
    case 408:
    case 429:
    case 500:
    case 502:
    case 503:
    case 504:
      return true;
    default:
      return false;
  }
}

/**
 * Tell whether an error is a connection-level failure that is worth retrying.
 *
 * @param {unknown} error - Value caught from a failed request.
 * @returns {boolean} `true` when the error, or its `cause`, carries the code
 *   `ECONNRESET`, `ENOTFOUND` or `ECONNREFUSED`.
 */
function isNetworkError(error) {
  const networkErrorCodes = ['ECONNRESET', 'ENOTFOUND', 'ECONNREFUSED'];

  // Node's built-in fetch rejects with TypeError('fetch failed') and attaches the
  // socket-level error as `cause`, so the code has to be looked up there as well.
  const code = error?.code ?? error?.cause?.code;

  return networkErrorCodes.includes(code);
}

// 使用示例
try {
  const result = await callAPIWithErrorHandling('https://api.example.com/data', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${process.env.API_TOKEN}`
    },
    body: JSON.stringify(items[0].json)
  });
  
  return [{ json: { success: true, data: result } }];
} catch (error) {
  return [{
    json: {
      success: false,
      error: error.message,
      status: error.status,
      retryable: shouldRetry(error.status)
    }
  }];
}

频率限制处理

javascript
/**
 * Client-side rate limiter: keeps the timestamps of recent requests and makes
 * callers wait while `maxRequests` of them fall inside the last `timeWindow` ms.
 */
class RateLimiter {
  /**
   * @param {number} maxRequests - Requests allowed per window.
   * @param {number} timeWindow - Window length in milliseconds.
   */
  constructor(maxRequests, timeWindow) {
    this.maxRequests = maxRequests;
    this.timeWindow = timeWindow;
    this.requests = [];
  }

  /**
   * Resolve once the caller may send a request, recording it in `requests`.
   *
   * @returns {Promise<void>}
   */
  async waitForSlot() {
    for (;;) {
      const currentTime = Date.now();

      // Forget requests that have already left the window.
      this.requests = this.requests.filter(
        (stamp) => currentTime - stamp < this.timeWindow
      );

      if (this.requests.length < this.maxRequests) {
        this.requests.push(currentTime);
        return;
      }

      // Window is full: sleep until the oldest recorded request expires, then re-check.
      const earliest = Math.min(...this.requests);
      const remaining = this.timeWindow - (currentTime - earliest);

      if (remaining <= 0) {
        this.requests.push(currentTime);
        return;
      }

      await new Promise((wake) => setTimeout(wake, remaining));
    }
  }
}

// Create a limiter that allows at most 100 requests per 60-second window.
const rateLimiter = new RateLimiter(100, 60000);

// Wait for a free slot before sending the request.
await rateLimiter.waitForSlot();

const authHeaders = {
  'Authorization': `Bearer ${process.env.API_TOKEN}`
};
const response = await fetch('https://api.example.com/data', {
  method: 'GET',
  headers: authHeaders
});

分页数据处理

javascript
/**
 * Fetch every page of a page-numbered API and return all records in one array.
 *
 * Requests `?page=N&limit=L` plus any extra entries of `params`, starting at
 * page 1. Records are read from the response's `items`, `data` or `results`
 * array (or the response itself when it is an array). Paging continues while the
 * response says so via `hasNext`, `has_more` or `pagination.has_next`; when none
 * of those flags is present, a full page is taken to mean "there may be more".
 *
 * Best effort: on a failed request the error is logged and the records gathered
 * so far are returned.
 *
 * @param {string} baseUrl - Absolute endpoint URL.
 * @param {object} [params] - Extra query parameters. `limit` sets the page size
 *   (default 100); a `page` key is ignored because the function drives paging.
 * @returns {Promise<any[]>} All records collected, in page order.
 */
async function fetchAllPages(baseUrl, params = {}) {
  const MAX_PAGES = 1000;
  const PAGE_DELAY_MS = 100;
  const pageSize = params.limit || 100;

  const allData = [];
  let page = 1;
  let hasMore = true;

  while (hasMore) {
    try {
      const url = new URL(baseUrl);
      url.searchParams.set('page', page);
      url.searchParams.set('limit', pageSize);

      // Add the remaining parameters; never let them overwrite the paging keys.
      for (const [key, value] of Object.entries(params)) {
        if (key !== 'limit' && key !== 'page') {
          url.searchParams.set(key, value);
        }
      }

      const response = await fetch(url.toString(), {
        headers: {
          'Authorization': `Bearer ${process.env.API_TOKEN}`,
          'Content-Type': 'application/json'
        }
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      const data = await response.json();

      // Adjust to the API's response shape: take the first field that is an array.
      const pageItems = Array.isArray(data)
        ? data
        : [data?.items, data?.data, data?.results].find(Array.isArray) ?? [];
      allData.push(...pageItems);

      // An explicit flag from the API wins, even when it is `false`; only fall
      // back to the "page was full" heuristic when no flag is present.
      const explicitFlag = data?.hasNext ?? data?.has_more ?? data?.pagination?.has_next;
      hasMore = explicitFlag === undefined || explicitFlag === null
        ? pageItems.length === pageSize
        : Boolean(explicitFlag);

      page++;

      // Guard against endless paging.
      if (hasMore && page > MAX_PAGES) {
        console.warn('Reached maximum page limit (1000)');
        break;
      }

      // Be gentle with the API, but do not sleep after the final page.
      if (hasMore) {
        await new Promise((resolve) => setTimeout(resolve, PAGE_DELAY_MS));
      }
    } catch (error) {
      console.error(`Error fetching page ${page}:`, error);
      break;
    }
  }

  return allData;
}

// 使用示例
const allUsers = await fetchAllPages('https://api.example.com/users', {
  limit: 50,
  status: 'active'
});

return allUsers.map(user => ({ json: user }));

实际集成案例

GitHub Webhook 集成

javascript
// 处理 GitHub Webhook 事件
const event = items[0].json.headers['x-github-event'];
const payload = items[0].json.body;

let action = '';
let message = '';

switch (event) {
  case 'push':
    action = 'code_push';
    message = `${payload.pusher.name} pushed ${payload.commits.length} commits to ${payload.repository.name}`;
    break;
    
  case 'pull_request':
    action = `pr_${payload.action}`;
    message = `Pull request #${payload.number} ${payload.action} by ${payload.pull_request.user.login}`;
    break;
    
  case 'issues':
    action = `issue_${payload.action}`;
    message = `Issue #${payload.issue.number} ${payload.action} by ${payload.issue.user.login}`;
    break;
    
  case 'release':
    action = 'release_published';
    message = `Release ${payload.release.tag_name} published for ${payload.repository.name}`;
    break;
    
  default:
    action = 'unknown';
    message = `Unknown GitHub event: ${event}`;
}

return [{
  json: {
    event,
    action,
    message,
    repository: payload.repository?.name,
    sender: payload.sender?.login,
    timestamp: new Date().toISOString(),
    payload: payload
  }
}];

Slack 集成

javascript
/**
 * Post a message to a Slack incoming-webhook URL.
 *
 * @param {string} webhook - Webhook URL to POST to.
 * @param {object} message - Fields `text` and `channel`, plus optional
 *   `username`, `icon` and `attachments`; missing or falsy optional fields fall
 *   back to 'n8n Bot', ':robot_face:' and an empty list.
 * @returns {Promise<string>} Response body text.
 * @throws {Error} When the response status is not OK.
 */
async function sendSlackMessage(webhook, message) {
  const requestBody = JSON.stringify({
    text: message.text,
    channel: message.channel,
    username: message.username ? message.username : 'n8n Bot',
    icon_emoji: message.icon ? message.icon : ':robot_face:',
    attachments: message.attachments ? message.attachments : []
  });

  const slackResponse = await fetch(webhook, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: requestBody
  });

  if (slackResponse.ok) {
    return await slackResponse.text();
  }

  throw new Error(`Slack API error: ${slackResponse.status}`);
}

// Build a rich deployment notification and send it to Slack.
const shortField = (title, value) => ({ title, value, short: true });

const deploymentAttachment = {
  color: 'good',
  title: 'Deployment Successful',
  fields: [
    shortField('Environment', 'Production'),
    shortField('Version', 'v1.2.3'),
    shortField('Duration', '2m 30s')
  ],
  footer: 'n8n Automation',
  // Timestamp shown in the footer, in whole seconds since the Unix epoch.
  ts: Math.floor(Date.now() / 1000)
};

const slackMessage = {
  text: 'Deployment Notification',
  channel: '#deployments',
  attachments: [deploymentAttachment]
};

await sendSlackMessage(process.env.SLACK_WEBHOOK_URL, slackMessage);

支付回调处理

javascript
/**
 * Validate a payment-provider callback and decide what to do with the order.
 *
 * The `x-signature` header must equal `generateSignature(payload, PAYMENT_SECRET)`.
 * The comparison is constant-time so the expected value cannot be probed through
 * response timing.
 *
 * @param {object} payload - Parsed callback body (`order_id`, `transaction_id`,
 *   `amount`, `currency`, `status`, `paid_at`, `payment_method`, and
 *   `failure_reason` for failed payments).
 * @param {object} headers - Request headers; `x-signature` is read.
 * @returns {object} Decision with `action`, `orderId`, `message` and, depending
 *   on the status, `amount`, `reason` or `status`.
 * @throws {Error} 'Invalid payment signature' when the header is missing or wrong.
 */
function processPaymentCallback(payload, headers) {
  const crypto = require('crypto');

  // Verify the signature.
  const signature = headers['x-signature'];
  const expectedSignature = generateSignature(payload, process.env.PAYMENT_SECRET);

  const received = Buffer.from(typeof signature === 'string' ? signature : '', 'utf8');
  const expected = Buffer.from(expectedSignature, 'utf8');
  // timingSafeEqual requires equal lengths, so check that first.
  const signatureMatches =
    received.length === expected.length && crypto.timingSafeEqual(received, expected);

  if (!signatureMatches) {
    throw new Error('Invalid payment signature');
  }

  // Normalize the payment fields.
  const paymentStatus = {
    orderId: payload.order_id,
    transactionId: payload.transaction_id,
    amount: parseFloat(payload.amount),
    currency: payload.currency,
    status: payload.status,
    paidAt: new Date(payload.paid_at),
    method: payload.payment_method
  };

  // Choose the follow-up action based on the payment status.
  switch (paymentStatus.status) {
    case 'paid':
      return {
        action: 'fulfill_order',
        orderId: paymentStatus.orderId,
        amount: paymentStatus.amount,
        message: 'Payment successful, fulfilling order'
      };

    case 'failed':
      return {
        action: 'cancel_order',
        orderId: paymentStatus.orderId,
        reason: payload.failure_reason,
        message: 'Payment failed, canceling order'
      };

    case 'refunded':
      return {
        action: 'process_refund',
        orderId: paymentStatus.orderId,
        amount: paymentStatus.amount,
        message: 'Payment refunded, processing return'
      };

    default:
      return {
        action: 'log_event',
        orderId: paymentStatus.orderId,
        status: paymentStatus.status,
        message: 'Unknown payment status'
      };
  }
}

/**
 * Compute the hex HMAC-SHA256 of a payload's JSON serialization.
 *
 * NOTE(review): the signature is taken over JSON.stringify(payload), which only
 * matches the sender's signature if the sender signed byte-identical JSON —
 * confirm against the payment provider's specification.
 *
 * @param {any} payload - Value to serialize and sign.
 * @param {string} secret - HMAC key.
 * @returns {string} Lower-case hex digest.
 */
function generateSignature(payload, secret) {
  const { createHmac } = require('crypto');

  const hmac = createHmac('sha256', secret);
  hmac.update(JSON.stringify(payload));

  return hmac.digest('hex');
}

const result = processPaymentCallback(items[0].json.body, items[0].json.headers);
return [{ json: result }];

API 性能优化

请求缓存

javascript
/**
 * In-memory cache for API responses with a fixed time-to-live per entry.
 *
 * NOTE(review): cache keys are built from method, URL and body only; request
 * headers such as Authorization are not part of the key — confirm that responses
 * do not vary by caller before sharing one cache between credentials.
 */
class APICache {
  /**
   * @param {number} [ttl=300000] - Lifetime of an entry in milliseconds (default 5 minutes).
   */
  constructor(ttl = 300000) {
    this.cache = new Map();
    this.ttl = ttl;
  }

  /**
   * Build the cache key for a request.
   *
   * @param {string} url - Request URL.
   * @param {object} [options] - fetch-style options; `method` (default 'GET')
   *   and `body` are used. May be omitted.
   * @returns {string} Key of the form `METHOD:url:jsonBody`.
   */
  generateKey(url, options = {}) {
    return `${options.method || 'GET'}:${url}:${JSON.stringify(options.body || {})}`;
  }

  /**
   * Tell whether a live (non-expired) entry exists. Expired entries are evicted.
   * Unlike `get`, this distinguishes a stored `null`/falsy value from a miss.
   *
   * @param {string} key - Cache key.
   * @returns {boolean}
   */
  has(key) {
    const entry = this.cache.get(key);
    if (!entry) return false;

    if (Date.now() > entry.expiry) {
      this.cache.delete(key);
      return false;
    }

    return true;
  }

  /**
   * Read an entry.
   *
   * @param {string} key - Cache key.
   * @returns {any} The stored data, or `null` when missing or expired.
   */
  get(key) {
    return this.has(key) ? this.cache.get(key).data : null;
  }

  /**
   * Store data under a key; it expires `ttl` milliseconds from now.
   *
   * @param {string} key - Cache key.
   * @param {any} data - Value to cache.
   */
  set(key, data) {
    this.cache.set(key, {
      data,
      expiry: Date.now() + this.ttl
    });
  }

  /** Remove every entry. */
  clear() {
    this.cache.clear();
  }
}

// Shared cache instance used by the cached API call; entries live for 5 minutes.
const apiCache = new APICache(5 * 60 * 1000);

/**
 * Fetch JSON from an API, serving repeated requests from `apiCache`.
 *
 * @param {string} url - Request URL.
 * @param {object} [options] - fetch options; also used to build the cache key.
 * @returns {Promise<any>} Parsed JSON body, from the cache when available.
 */
async function cachedAPICall(url, options = {}) {
  const cacheKey = apiCache.generateKey(url, options);

  // Try the cache first. A miss is signalled by null, so compare explicitly:
  // a truthiness check would re-fetch cached falsy payloads such as 0, '' or false.
  const cached = apiCache.get(cacheKey);
  if (cached !== null && cached !== undefined) {
    console.log('Cache hit for:', url);
    return cached;
  }

  // Send the real request.
  console.log('Cache miss, fetching:', url);
  const response = await fetch(url, options);
  const data = await response.json();

  // Cache successful responses only, so an error body from a transient failure
  // is not replayed for the whole TTL.
  if (response.ok) {
    apiCache.set(cacheKey, data);
  }

  return data;
}

并发请求控制

javascript
/**
 * Runs async tasks with at most `maxConcurrency` of them in flight at once;
 * further tasks wait in a first-in, first-out queue.
 */
class ConcurrencyLimiter {
  /**
   * @param {number} [maxConcurrency=5] - Maximum number of tasks running at once.
   */
  constructor(maxConcurrency = 5) {
    this.maxConcurrency = maxConcurrency;
    this.running = 0;
    this.queue = [];
  }

  /**
   * Queue a task and get a promise for its outcome.
   *
   * @param {Function} fn - Zero-argument function returning a value or a promise.
   * @returns {Promise<any>} Settles with the result or error of `fn`.
   */
  async execute(fn) {
    const outcome = new Promise((resolve, reject) => {
      this.queue.push({ fn, resolve, reject });
    });
    this.process();
    return outcome;
  }

  /**
   * Start the next queued task if a slot is free; does nothing otherwise.
   */
  async process() {
    const atCapacity = this.running >= this.maxConcurrency;
    const nothingQueued = this.queue.length === 0;
    if (atCapacity || nothingQueued) {
      return;
    }

    this.running += 1;
    const { fn: task, resolve: fulfil, reject: fail } = this.queue.shift();

    try {
      fulfil(await task());
    } catch (error) {
      fail(error);
    } finally {
      // Free the slot and hand it to the next queued task.
      this.running -= 1;
      this.process();
    }
  }
}

// 使用并发限制器
const limiter = new ConcurrencyLimiter(3); // 最多3个并发请求

const promises = items.map(item => 
  limiter.execute(async () => {
    const response = await fetch(`https://api.example.com/process/${item.json.id}`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${process.env.API_TOKEN}`
      },
      body: JSON.stringify(item.json)
    });
    
    return await response.json();
  })
);

const results = await Promise.all(promises);
return results.map(result => ({ json: result }));

小结

Webhook 和 API 集成是 n8n 连接外部世界的重要技术:

  1. 安全第一:实现签名验证、IP 白名单等安全措施
  2. 错误处理:完善的重试机制和错误分类处理
  3. 性能优化:缓存、并发控制、频率限制
  4. 监控告警:跟踪 API 调用状态和性能
  5. 标准化:统一的错误处理和响应格式

下一篇文章,我们将学习数据库操作和 SQL 节点的使用,这是数据持久化的重要技术。

记住,好的 API 集成不仅要能正常工作,还要能处理各种异常情况,包括网络问题、服务不可用、数据格式变化等。充分的测试和监控是成功集成的关键。