n8n Webhook 与 API 集成 - 连接外部世界
Webhook 和 API 集成是 n8n 连接外部系统的核心功能。通过这些技术,我们可以实现实时的数据交换和事件驱动的自动化。今天我们来深入学习这些重要技术。
Webhook 基础
什么是 Webhook
Webhook 是一种"反向 API",它允许应用程序在特定事件发生时主动向指定的 URL 发送 HTTP 请求。
传统 API vs Webhook
- API:客户端主动请求数据
- Webhook:服务端主动推送数据
Webhook Trigger 配置
基本配置
json
{
"httpMethod": "POST",
"path": "my-webhook",
"authentication": "none",
"responseMode": "onReceived"
}
完整 URL 格式
https://your-n8n-instance.com/webhook/my-webhook
Webhook 安全配置
认证方式
Basic Authentication
json
{
"authentication": "basicAuth",
"basicAuth": {
"user": "webhook_user",
"password": "secure_password_123"
}
}
Header Authentication
json
{
"authentication": "headerAuth",
"headerAuth": {
"name": "X-API-Key",
"value": "your-secret-api-key"
}
}
签名验证
javascript
// 验证 GitHub Webhook 签名
function verifyGitHubSignature(payload, signature, secret) {
const crypto = require('crypto');
const expectedSignature = 'sha256=' + crypto
.createHmac('sha256', secret)
.update(payload, 'utf8')
.digest('hex');
return crypto.timingSafeEqual(
Buffer.from(signature),
Buffer.from(expectedSignature)
);
}
// 在 Webhook 处理中验证签名
const payload = JSON.stringify(items[0].json.body);
const signature = items[0].json.headers['x-hub-signature-256'];
const secret = process.env.GITHUB_WEBHOOK_SECRET;
if (!verifyGitHubSignature(payload, signature, secret)) {
throw new Error('Invalid signature');
}
return [{
json: {
verified: true,
event: items[0].json.headers['x-github-event'],
payload: items[0].json.body
}
}];
IP 白名单
javascript
// 验证请求来源 IP
function verifySourceIP(clientIP, allowedIPs) {
const ipRanges = allowedIPs.map(ip => {
if (ip.includes('/')) {
// CIDR 格式
return parseCIDR(ip);
} else {
// 单个 IP
return { start: ip, end: ip };
}
});
return ipRanges.some(range => isIPInRange(clientIP, range));
}
// GitHub Webhook IP 范围
const githubIPs = [
'192.30.252.0/22',
'185.199.108.0/22',
'140.82.112.0/20'
];
const clientIP = items[0].json.headers['x-forwarded-for'] ||
items[0].json.headers['x-real-ip'];
if (!verifySourceIP(clientIP, githubIPs)) {
throw new Error('Request from unauthorized IP');
}
HTTP Request 节点详解
基本配置
json
{
"method": "POST",
"url": "https://api.example.com/users",
"authentication": "bearerToken",
"bearerToken": "your-access-token",
"headers": {
"Content-Type": "application/json",
"User-Agent": "n8n-workflow/1.0"
},
"body": {
"name": "{{ $json.name }}",
"email": "{{ $json.email }}"
}
}
认证方式
Bearer Token
json
{
"authentication": "bearerToken",
"bearerToken": "{{ $env.API_TOKEN }}"
}
OAuth2
json
{
"authentication": "oAuth2Api",
"oAuth2Api": {
"clientId": "{{ $env.OAUTH_CLIENT_ID }}",
"clientSecret": "{{ $env.OAUTH_CLIENT_SECRET }}",
"accessTokenUrl": "https://api.example.com/oauth/token",
"scope": "read write"
}
}
API Key
json
{
"authentication": "headerAuth",
"headerAuth": {
"name": "X-API-Key",
"value": "{{ $env.API_KEY }}"
}
}
高级 HTTP 配置
超时和重试
json
{
"timeout": 30000,
"retry": {
"enabled": true,
"maxRetries": 3,
"retryInterval": 1000
}
}
代理设置
json
{
"proxy": {
"protocol": "http",
"host": "proxy.company.com",
"port": 8080,
"auth": {
"username": "proxy_user",
"password": "proxy_pass"
}
}
}
API 集成最佳实践
错误处理
javascript
// 完善的 API 错误处理
async function callAPIWithErrorHandling(url, options) {
const maxRetries = 3;
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const response = await fetch(url, options);
// 处理不同的 HTTP 状态码
if (response.ok) {
return await response.json();
}
const errorBody = await response.text();
const error = new Error(`HTTP ${response.status}: ${errorBody}`);
error.status = response.status;
error.response = errorBody;
// 根据状态码决定是否重试
if (shouldRetry(response.status) && attempt < maxRetries) {
const delay = Math.pow(2, attempt) * 1000; // 指数退避
await new Promise(resolve => setTimeout(resolve, delay));
continue;
}
throw error;
} catch (error) {
lastError = error;
// 网络错误通常可以重试
if (isNetworkError(error) && attempt < maxRetries) {
const delay = Math.pow(2, attempt) * 1000;
await new Promise(resolve => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
throw lastError;
}
function shouldRetry(status) {
return [408, 429, 500, 502, 503, 504].includes(status);
}
function isNetworkError(error) {
return error.code === 'ECONNRESET' ||
error.code === 'ENOTFOUND' ||
error.code === 'ECONNREFUSED';
}
// 使用示例
try {
const result = await callAPIWithErrorHandling('https://api.example.com/data', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.API_TOKEN}`
},
body: JSON.stringify(items[0].json)
});
return [{ json: { success: true, data: result } }];
} catch (error) {
return [{
json: {
success: false,
error: error.message,
status: error.status,
retryable: shouldRetry(error.status)
}
}];
}
频率限制处理
javascript
// 处理 API 频率限制
class RateLimiter {
constructor(maxRequests, timeWindow) {
this.maxRequests = maxRequests;
this.timeWindow = timeWindow;
this.requests = [];
}
async waitForSlot() {
const now = Date.now();
// 清理过期的请求记录
this.requests = this.requests.filter(
time => now - time < this.timeWindow
);
// 如果达到限制,等待
if (this.requests.length >= this.maxRequests) {
const oldestRequest = Math.min(...this.requests);
const waitTime = this.timeWindow - (now - oldestRequest);
if (waitTime > 0) {
await new Promise(resolve => setTimeout(resolve, waitTime));
return this.waitForSlot(); // 递归检查
}
}
// 记录当前请求
this.requests.push(now);
}
}
// 创建限流器(每分钟最多100个请求)
const rateLimiter = new RateLimiter(100, 60000);
// 在发送请求前等待
await rateLimiter.waitForSlot();
const response = await fetch('https://api.example.com/data', {
method: 'GET',
headers: {
'Authorization': `Bearer ${process.env.API_TOKEN}`
}
});
分页数据处理
javascript
// 处理分页 API 数据
async function fetchAllPages(baseUrl, params = {}) {
const allData = [];
let page = 1;
let hasMore = true;
while (hasMore) {
try {
const url = new URL(baseUrl);
url.searchParams.set('page', page);
url.searchParams.set('limit', params.limit || 100);
// 添加其他参数
Object.entries(params).forEach(([key, value]) => {
if (key !== 'limit') {
url.searchParams.set(key, value);
}
});
const response = await fetch(url.toString(), {
headers: {
'Authorization': `Bearer ${process.env.API_TOKEN}`,
'Content-Type': 'application/json'
}
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const data = await response.json();
// 根据 API 响应格式调整
const items = data.items || data.data || data.results || [];
allData.push(...items);
// 检查是否还有更多数据
hasMore = data.hasNext ||
data.has_more ||
(data.pagination && data.pagination.has_next) ||
items.length === (params.limit || 100);
page++;
// 防止无限循环
if (page > 1000) {
console.warn('Reached maximum page limit (1000)');
break;
}
// API 友好的延迟
await new Promise(resolve => setTimeout(resolve, 100));
} catch (error) {
console.error(`Error fetching page ${page}:`, error);
break;
}
}
return allData;
}
// 使用示例
const allUsers = await fetchAllPages('https://api.example.com/users', {
limit: 50,
status: 'active'
});
return allUsers.map(user => ({ json: user }));
实际集成案例
GitHub Webhook 集成
javascript
// 处理 GitHub Webhook 事件
const event = items[0].json.headers['x-github-event'];
const payload = items[0].json.body;
let action = '';
let message = '';
switch (event) {
case 'push':
action = 'code_push';
message = `${payload.pusher.name} pushed ${payload.commits.length} commits to ${payload.repository.name}`;
break;
case 'pull_request':
action = `pr_${payload.action}`;
message = `Pull request #${payload.number} ${payload.action} by ${payload.pull_request.user.login}`;
break;
case 'issues':
action = `issue_${payload.action}`;
message = `Issue #${payload.issue.number} ${payload.action} by ${payload.issue.user.login}`;
break;
case 'release':
action = 'release_published';
message = `Release ${payload.release.tag_name} published for ${payload.repository.name}`;
break;
default:
action = 'unknown';
message = `Unknown GitHub event: ${event}`;
}
return [{
json: {
event,
action,
message,
repository: payload.repository?.name,
sender: payload.sender?.login,
timestamp: new Date().toISOString(),
payload: payload
}
}];
Slack 集成
javascript
// 发送 Slack 消息
async function sendSlackMessage(webhook, message) {
const payload = {
text: message.text,
channel: message.channel,
username: message.username || 'n8n Bot',
icon_emoji: message.icon || ':robot_face:',
attachments: message.attachments || []
};
const response = await fetch(webhook, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(payload)
});
if (!response.ok) {
throw new Error(`Slack API error: ${response.status}`);
}
return await response.text();
}
// 构建富文本消息
const slackMessage = {
text: 'Deployment Notification',
channel: '#deployments',
attachments: [{
color: 'good',
title: 'Deployment Successful',
fields: [
{
title: 'Environment',
value: 'Production',
short: true
},
{
title: 'Version',
value: 'v1.2.3',
short: true
},
{
title: 'Duration',
value: '2m 30s',
short: true
}
],
footer: 'n8n Automation',
ts: Math.floor(Date.now() / 1000)
}]
};
await sendSlackMessage(process.env.SLACK_WEBHOOK_URL, slackMessage);
支付回调处理
javascript
// 处理支付平台回调
function processPaymentCallback(payload, headers) {
// 验证签名
const signature = headers['x-signature'];
const expectedSignature = generateSignature(payload, process.env.PAYMENT_SECRET);
if (signature !== expectedSignature) {
throw new Error('Invalid payment signature');
}
// 解析支付状态
const paymentStatus = {
orderId: payload.order_id,
transactionId: payload.transaction_id,
amount: parseFloat(payload.amount),
currency: payload.currency,
status: payload.status,
paidAt: new Date(payload.paid_at),
method: payload.payment_method
};
// 根据状态执行不同操作
switch (paymentStatus.status) {
case 'paid':
return {
action: 'fulfill_order',
orderId: paymentStatus.orderId,
amount: paymentStatus.amount,
message: 'Payment successful, fulfilling order'
};
case 'failed':
return {
action: 'cancel_order',
orderId: paymentStatus.orderId,
reason: payload.failure_reason,
message: 'Payment failed, canceling order'
};
case 'refunded':
return {
action: 'process_refund',
orderId: paymentStatus.orderId,
amount: paymentStatus.amount,
message: 'Payment refunded, processing return'
};
default:
return {
action: 'log_event',
orderId: paymentStatus.orderId,
status: paymentStatus.status,
message: 'Unknown payment status'
};
}
}
function generateSignature(payload, secret) {
const crypto = require('crypto');
return crypto
.createHmac('sha256', secret)
.update(JSON.stringify(payload))
.digest('hex');
}
const result = processPaymentCallback(items[0].json.body, items[0].json.headers);
return [{ json: result }];
API 性能优化
请求缓存
javascript
// 实现 API 响应缓存
class APICache {
constructor(ttl = 300000) { // 默认5分钟
this.cache = new Map();
this.ttl = ttl;
}
generateKey(url, options) {
return `${options.method || 'GET'}:${url}:${JSON.stringify(options.body || {})}`;
}
get(key) {
const item = this.cache.get(key);
if (!item) return null;
if (Date.now() > item.expiry) {
this.cache.delete(key);
return null;
}
return item.data;
}
set(key, data) {
this.cache.set(key, {
data,
expiry: Date.now() + this.ttl
});
}
clear() {
this.cache.clear();
}
}
// 使用缓存的 API 调用
const apiCache = new APICache(300000); // 5分钟缓存
async function cachedAPICall(url, options = {}) {
const cacheKey = apiCache.generateKey(url, options);
// 尝试从缓存获取
const cached = apiCache.get(cacheKey);
if (cached) {
console.log('Cache hit for:', url);
return cached;
}
// 发送实际请求
console.log('Cache miss, fetching:', url);
const response = await fetch(url, options);
const data = await response.json();
// 缓存结果
apiCache.set(cacheKey, data);
return data;
}
并发请求控制
javascript
// 控制并发请求数量
class ConcurrencyLimiter {
constructor(maxConcurrency = 5) {
this.maxConcurrency = maxConcurrency;
this.running = 0;
this.queue = [];
}
async execute(fn) {
return new Promise((resolve, reject) => {
this.queue.push({ fn, resolve, reject });
this.process();
});
}
async process() {
if (this.running >= this.maxConcurrency || this.queue.length === 0) {
return;
}
this.running++;
const { fn, resolve, reject } = this.queue.shift();
try {
const result = await fn();
resolve(result);
} catch (error) {
reject(error);
} finally {
this.running--;
this.process(); // 处理队列中的下一个任务
}
}
}
// 使用并发限制器
const limiter = new ConcurrencyLimiter(3); // 最多3个并发请求
const promises = items.map(item =>
limiter.execute(async () => {
const response = await fetch(`https://api.example.com/process/${item.json.id}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.API_TOKEN}`
},
body: JSON.stringify(item.json)
});
return await response.json();
})
);
const results = await Promise.all(promises);
return results.map(result => ({ json: result }));
小结
Webhook 和 API 集成是 n8n 连接外部世界的重要技术:
- 安全第一:实现签名验证、IP 白名单等安全措施
- 错误处理:完善的重试机制和错误分类处理
- 性能优化:缓存、并发控制、频率限制
- 监控告警:跟踪 API 调用状态和性能
- 标准化:统一的错误处理和响应格式
下一篇文章,我们将学习数据库操作和 SQL 节点的使用,这是数据持久化的重要技术。
记住,好的 API 集成不仅要能正常工作,还要能处理各种异常情况,包括网络问题、服务不可用、数据格式变化等。充分的测试和监控是成功集成的关键。