n8n 错误处理与重试机制 - 构建稳定的自动化系统
在生产环境中,错误是不可避免的。网络中断、API 限制、数据格式错误等问题都可能导致工作流失败。今天我们来学习如何在 n8n 中构建完善的错误处理和重试机制。
错误类型分析
常见错误分类
网络相关错误
- 连接超时
- DNS 解析失败
- 网络中断
- 服务器不可达
API 相关错误
- 认证失败 (401)
- 权限不足 (403)
- 资源不存在 (404)
- 频率限制 (429)
- 服务器错误 (5xx,如 500/502/503/504)
数据相关错误
- 数据格式错误
- 必填字段缺失
- 数据类型不匹配
- 数据验证失败
业务逻辑错误
- 业务规则违反
- 状态不一致
- 重复操作
- 资源冲突
节点级错误处理
Continue On Fail 设置
json
{
"continueOnFail": true,
"retryOnFail": true,
"maxTries": 3,
"waitBetweenTries": 1000
}
实际应用案例
HTTP 请求错误处理
javascript
// 在 Code 节点中处理 HTTP 错误
const results = [];
for (const item of items) {
try {
const response = await fetch('https://api.example.com/data', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${item.json.token}`
},
body: JSON.stringify(item.json.data)
});
if (!response.ok) {
// 根据状态码处理不同错误
let errorType = 'unknown';
let shouldRetry = false;
switch (response.status) {
case 400:
errorType = 'bad_request';
shouldRetry = false;
break;
case 401:
errorType = 'unauthorized';
shouldRetry = false;
break;
case 429:
errorType = 'rate_limit';
shouldRetry = true;
break;
case 500:
case 502:
case 503:
errorType = 'server_error';
shouldRetry = true;
break;
}
results.push({
json: {
...item.json,
success: false,
error: {
type: errorType,
status: response.status,
message: await response.text(),
shouldRetry
}
}
});
} else {
const data = await response.json();
results.push({
json: {
...item.json,
success: true,
response: data
}
});
}
} catch (error) {
results.push({
json: {
...item.json,
success: false,
error: {
type: 'network_error',
message: error.message,
shouldRetry: true
}
});
}
}
}
return results;
Error Trigger 使用
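Error Trigger 节点用于单独的错误处理工作流:当某个工作流在设置中指定了错误工作流(Error Workflow)并且执行失败时,n8n 会运行该错误工作流。Error Trigger 会输出失败执行的信息(执行 ID、错误消息、最后执行的节点、工作流 ID 和名称),供后续节点分析和通知。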
基本配置
json
{
"settings": {
"errorWorkflow": "error-handler-workflow-id"
}
}
错误处理工作流
javascript
// 在错误处理工作流中分析错误
const errorData = items[0].json;
const originalWorkflow = errorData.workflow;
const errorNode = errorData.node;
const errorMessage = errorData.error.message;
// 分析错误类型
let errorCategory = 'unknown';
let severity = 'medium';
let shouldNotify = true;
if (errorMessage.includes('timeout')) {
errorCategory = 'timeout';
severity = 'low';
shouldNotify = false; // 超时错误不需要立即通知
} else if (errorMessage.includes('401') || errorMessage.includes('unauthorized')) {
errorCategory = 'auth';
severity = 'high';
} else if (errorMessage.includes('429') || errorMessage.includes('rate limit')) {
errorCategory = 'rate_limit';
severity = 'low';
shouldNotify = false;
} else if (errorMessage.includes('500') || errorMessage.includes('502')) {
errorCategory = 'server_error';
severity = 'medium';
}
return [{
json: {
...errorData,
analysis: {
category: errorCategory,
severity,
shouldNotify,
timestamp: new Date().toISOString()
}
}
}];
智能重试机制
指数退避重试
javascript
/**
 * Run `operation`, retrying transient failures with exponential backoff plus jitter.
 *
 * Despite its name, `maxRetries` is the TOTAL number of attempts (the first call
 * included): the default of 5 means at most 4 retries. The wait before retry k
 * (k = 1, 2, ...) is `baseDelay * 2^(k-1)`, capped at `maxDelay`, plus up to 10%
 * random jitter so that many clients do not retry in lock-step.
 *
 * @param {() => Promise<any>} operation - async work; invoked once per attempt.
 * @param {number} [maxRetries=5] - total attempts allowed; must be an integer >= 1.
 * @param {number} [baseDelay=1000] - wait in ms before the first retry.
 * @param {object} [options]
 * @param {number} [options.maxDelay=Infinity] - upper bound in ms on the backoff (before jitter).
 * @param {(error: unknown) => boolean} [options.isRetryable=shouldRetry] - whether an error is transient.
 * @returns {Promise<any>} the value of the first successful attempt.
 * @throws the last error once attempts are exhausted, or immediately when the error is not retryable.
 * @throws {RangeError} when `maxRetries` is not a positive integer. Previously the
 *   loop simply never ran and the call resolved `undefined` without doing any work.
 */
async function retryWithBackoff(
  operation,
  maxRetries = 5,
  baseDelay = 1000,
  { maxDelay = Infinity, isRetryable = shouldRetry } = {},
) {
  if (!Number.isInteger(maxRetries) || maxRetries < 1) {
    throw new RangeError(`maxRetries must be a positive integer, got ${maxRetries}`);
  }

  for (let attempt = 1; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      // Give up on the last attempt, or as soon as the error is clearly permanent.
      if (attempt >= maxRetries || !isRetryable(error)) {
        throw error;
      }
      const delay = Math.min(baseDelay * 2 ** (attempt - 1), maxDelay);
      const jitter = Math.random() * 0.1 * delay;
      const totalDelay = Math.round(delay + jitter);
      console.log(`Attempt ${attempt} failed, retrying in ${totalDelay}ms`);
      await new Promise((resolve) => setTimeout(resolve, totalDelay));
    }
  }
}
/**
 * Decide whether an error looks transient and is therefore worth retrying.
 *
 * Signals are checked in this order:
 *   1. an HTTP status on the error object (`status`, `statusCode`, `response.status`);
 *   2. a Node.js network error `code`;
 *   3. the same signals, or the word "timeout", inside the message text.
 * Message matching is case-insensitive. Status codes in the message only match as
 * standalone numbers, so e.g. "took 5000ms" is not mistaken for HTTP 500.
 *
 * @param {unknown} error - whatever was thrown; strings and plain objects are accepted.
 * @returns {boolean} true when the failure should be retried.
 */
function shouldRetry(error) {
  if (error == null) {
    return false;
  }

  const retryableStatuses = [
    429, // Rate limit
    500, // Internal server error
    502, // Bad gateway
    503, // Service unavailable
    504, // Gateway timeout
  ];
  const retryableCodes = ['ECONNRESET', 'ENOTFOUND', 'ECONNREFUSED', 'ETIMEDOUT'];

  const status = Number(error.status ?? error.statusCode ?? error.response?.status);
  if (retryableStatuses.includes(status)) {
    return true;
  }
  if (retryableCodes.includes(error.code)) {
    return true;
  }

  // A thrown string or object without `message` used to crash on `.includes`.
  const message = String(error.message ?? error).toLowerCase();
  return (
    message.includes('timeout') ||
    retryableCodes.some((code) => message.includes(code.toLowerCase())) ||
    retryableStatuses.some((code) => new RegExp(`(?<!\\d)${code}(?!\\d)`).test(message))
  );
}
// 使用重试机制
const results = [];
for (const item of items) {
try {
const result = await retryWithBackoff(async () => {
return await processItem(item.json);
});
results.push({
json: {
...item.json,
result,
success: true
}
});
} catch (error) {
results.push({
json: {
...item.json,
success: false,
error: error.message,
finalFailure: true
}
});
}
}
return results;
条件重试
javascript
/**
 * Retry `operation(item)` with a fixed delay. The attempt budget, the delay and
 * the per-error retry decision all depend on the item; they come from
 * getMaxRetries, getRetryDelay and shouldRetryForItem.
 *
 * @param {(item: object) => Promise<any>} operation - async work applied to `item`.
 * @param {object} item - the data being processed; its `priority`/`type` drive the policy.
 * @returns {Promise<any>} the result of the first successful attempt.
 * @throws the last error once the attempt budget is spent, or as soon as
 *   shouldRetryForItem declines a retry.
 */
async function conditionalRetry(operation, item) {
  // Always make at least one attempt, even if the policy returns a budget < 1.
  const maxRetries = Math.max(1, getMaxRetries(item));
  const retryDelay = getRetryDelay(item);

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await operation(item);
    } catch (error) {
      // Rethrow on the final attempt. Previously the loop fell through after the
      // last failure and the function resolved `undefined`, silently swallowing
      // the error. Also rethrow when the policy says the error is not worth retrying.
      if (attempt === maxRetries || !shouldRetryForItem(error, item, attempt)) {
        throw error;
      }
      await new Promise((resolve) => setTimeout(resolve, retryDelay));
    }
  }
}
/**
 * Attempt budget for an item, scaled by its importance:
 * 'critical' → 5, 'high' → 3, anything else → 2.
 */
function getMaxRetries(item) {
  switch (item.priority) {
    case 'critical':
      return 5;
    case 'high':
      return 3;
    default:
      return 2;
  }
}
/**
 * Fixed wait (ms) between retries, chosen by the item's processing type:
 * real-time items retry quickly (500), batch items wait longer (2000),
 * everything else waits 1000.
 */
function getRetryDelay(item) {
  switch (item.type) {
    case 'real_time':
      return 500;
    case 'batch':
      return 2000;
    default:
      return 1000;
  }
}
/**
 * Per-item retry policy.
 *
 * Critical items are retried whatever the error, as long as `attempt` is at most 10.
 * For all other cases, a message containing "400" or "401" (bad request /
 * unauthorized) is treated as permanent; any other error is retried.
 */
function shouldRetryForItem(error, item, attempt) {
  const isCriticalWithinBudget = item.priority === 'critical' && attempt <= 10;
  if (isCriticalWithinBudget) {
    return true;
  }
  const { message } = error;
  const isPermanentClientError = ['400', '401'].some((code) => message.includes(code));
  return !isPermanentClientError;
}
错误恢复策略
数据备份与恢复
javascript
// 实现数据备份和恢复机制
const BACKUP_KEY = 'workflow_backup';
// 备份原始数据
function backupData(data) {
const backup = {
timestamp: new Date().toISOString(),
data: JSON.parse(JSON.stringify(data)),
workflowId: $workflow.id,
executionId: $execution.id
};
$workflow.context[BACKUP_KEY] = backup;
return backup;
}
// 恢复数据
function restoreData() {
const backup = $workflow.context[BACKUP_KEY];
if (!backup) {
throw new Error('No backup data found');
}
return backup.data;
}
// 处理数据时创建备份
const originalData = items.map(item => item.json);
const backup = backupData(originalData);
try {
// 尝试处理数据
const processedData = await processItems(originalData);
// 清除备份
delete $workflow.context[BACKUP_KEY];
return processedData.map(data => ({ json: data }));
} catch (error) {
console.log('Processing failed, attempting recovery');
// 恢复原始数据
const recoveredData = restoreData();
return [{
json: {
error: 'Processing failed, data recovered',
originalError: error.message,
recoveredItems: recoveredData.length,
backup: backup
}
}];
}
部分失败处理
javascript
/**
 * Process every item independently, collecting successes and failures instead
 * of aborting on the first error.
 *
 * NOTE(review): processItem is not defined in this snippet; supply your own.
 *
 * @param {Array<{json: object}>} items - n8n items to process.
 * @param {number} [maxFailureRate=0.5] - failure ratio (0..1) above which the whole
 *   batch is considered failed.
 * @returns {Promise<{successful: object[], failed: object[],
 *   summary: {total: number, successCount: number, failureCount: number, failureRate: number}}>}
 * @throws {Error} when the failure rate exceeds `maxFailureRate`. The error's
 *   `results` property carries the full outcome so callers can still inspect
 *   (or forward) the individual failures.
 */
async function processWithPartialFailure(items, maxFailureRate = 0.5) {
  const results = {
    successful: [],
    failed: [],
    summary: {
      total: items.length,
      successCount: 0,
      failureCount: 0,
      failureRate: 0,
    },
  };

  for (const item of items) {
    try {
      const result = await processItem(item.json);
      results.successful.push({
        json: {
          ...item.json,
          result,
          status: 'success',
        },
      });
      results.summary.successCount++;
    } catch (error) {
      results.failed.push({
        json: {
          ...item.json,
          // Non-Error throws (strings, plain objects) would otherwise yield `undefined`.
          error: error?.message ?? String(error),
          status: 'failed',
        },
      });
      results.summary.failureCount++;
    }
  }

  // An empty batch has no failures; avoid 0 / 0 = NaN.
  const { total, failureCount } = results.summary;
  const failureRate = total === 0 ? 0 : failureCount / total;
  results.summary.failureRate = failureRate;

  if (failureRate > maxFailureRate) {
    throw Object.assign(
      new Error(`High failure rate: ${Math.round(failureRate * 100)}%`),
      { results },
    );
  }
  return results;
}
const processingResults = await processWithPartialFailure(items);
// 返回成功的项目,失败的项目可以通过其他分支处理
return processingResults.successful;
监控和告警
错误统计
javascript
// 收集错误统计信息
const ERROR_STATS_KEY = 'error_statistics';
const stats = $workflow.context[ERROR_STATS_KEY] || {
totalErrors: 0,
errorsByType: {},
errorsByNode: {},
lastReset: new Date().toISOString()
};
// 更新错误统计
function updateErrorStats(error, nodeName) {
stats.totalErrors++;
// 按错误类型统计
const errorType = categorizeError(error);
stats.errorsByType[errorType] = (stats.errorsByType[errorType] || 0) + 1;
// 按节点统计
stats.errorsByNode[nodeName] = (stats.errorsByNode[nodeName] || 0) + 1;
$workflow.context[ERROR_STATS_KEY] = stats;
}
function categorizeError(error) {
if (error.message.includes('timeout')) return 'timeout';
if (error.message.includes('401')) return 'auth';
if (error.message.includes('429')) return 'rate_limit';
if (error.message.includes('500')) return 'server_error';
return 'other';
}
// 检查是否需要告警
function shouldAlert(stats) {
const now = new Date();
const lastReset = new Date(stats.lastReset);
const hoursSinceReset = (now - lastReset) / (1000 * 60 * 60);
// 每小时错误超过10个
if (stats.totalErrors > 10 && hoursSinceReset < 1) {
return true;
}
// 某个节点错误率过高
for (const [node, count] of Object.entries(stats.errorsByNode)) {
if (count > 5) {
return true;
}
}
return false;
}
// 处理当前错误
const currentError = items[0].json;
updateErrorStats(currentError, currentError.node.name);
if (shouldAlert(stats)) {
return [{
json: {
alert: true,
message: 'High error rate detected',
statistics: stats,
currentError
}
}];
}
return [{ json: { processed: true, stats } }];
健康检查
javascript
// 实现工作流健康检查
function performHealthCheck() {
const health = {
status: 'healthy',
checks: {},
timestamp: new Date().toISOString()
};
// 检查 API 连接
health.checks.apiConnection = checkApiConnection();
// 检查数据库连接
health.checks.databaseConnection = checkDatabaseConnection();
// 检查错误率
health.checks.errorRate = checkErrorRate();
// 检查内存使用
health.checks.memoryUsage = checkMemoryUsage();
// 综合评估
const failedChecks = Object.values(health.checks).filter(check => !check.healthy);
if (failedChecks.length > 0) {
health.status = 'unhealthy';
}
return health;
}
async function checkApiConnection() {
try {
const response = await fetch('https://api.example.com/health', {
timeout: 5000
});
return {
healthy: response.ok,
responseTime: response.headers.get('x-response-time'),
status: response.status
};
} catch (error) {
return {
healthy: false,
error: error.message
};
}
}
function checkErrorRate() {
const stats = $workflow.context.error_statistics || {};
const errorRate = stats.totalErrors || 0;
return {
healthy: errorRate < 10,
errorCount: errorRate,
threshold: 10
};
}
function checkMemoryUsage() {
const used = process.memoryUsage();
const usedMB = Math.round(used.heapUsed / 1024 / 1024);
return {
healthy: usedMB < 500,
usedMemory: `${usedMB}MB`,
threshold: '500MB'
};
}
const healthStatus = performHealthCheck();
return [{ json: healthStatus }];
错误通知系统
分级通知
javascript
/**
 * Build a notification payload for an error, routed by severity.
 *
 * Routing:
 *   'critical' → email + sms + slack, with escalation;
 *   'high'     → email + slack;
 *   'medium'   → slack;
 *   'low'      → log only.
 * Unknown severities are routed like 'medium'. Previously they left
 * `channels`/`escalation` undefined, so the notification went nowhere.
 *
 * @param {unknown} error - the error (or Error Trigger item) to report; embedded as-is.
 * @param {string} severity - one of 'critical' | 'high' | 'medium' | 'low'.
 * @returns {object} notification with id, timestamp, workflow/execution info, channels, escalation.
 */
function createErrorNotification(error, severity) {
  const routes = {
    critical: { channels: ['email', 'sms', 'slack'], escalation: true },
    high: { channels: ['email', 'slack'], escalation: false },
    medium: { channels: ['slack'], escalation: false },
    low: { channels: ['log'], escalation: false },
  };
  const route = Object.hasOwn(routes, severity) ? routes[severity] : routes.medium;

  return {
    // Timestamp plus a short random suffix; unique enough for log correlation,
    // not cryptographically random.
    id: `error_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
    timestamp: new Date().toISOString(),
    severity,
    error,
    workflow: {
      id: $workflow.id,
      name: $workflow.name,
    },
    execution: {
      id: $execution.id,
      mode: $execution.mode,
    },
    channels: [...route.channels],
    escalation: route.escalation,
  };
}
/**
 * Map an error to a notification severity: 'critical' | 'high' | 'medium' | 'low'.
 *
 * Rules run from most to least severe, so an error matching several rules gets
 * the highest severity. Previously the 401/403 check ran first, which downgraded
 * a data-loss error that also mentioned 401 to 'high'. Matching is case-insensitive.
 * Status codes match only as standalone numbers, so e.g. "4290ms" is not HTTP 429.
 *
 * @param {unknown} error - an Error-like object ({ message }) or an n8n Error
 *   Trigger item ({ execution: { error: { message } } }).
 * @returns {'critical'|'high'|'medium'|'low'}
 */
function assessErrorSeverity(error) {
  const rawMessage =
    error?.message ??
    error?.execution?.error?.message ??
    error?.trigger?.error?.message ??
    error?.error?.message ??
    '';
  const message = String(rawMessage).toLowerCase();
  const hasStatus = (code) => new RegExp(`(?<!\\d)${code}(?!\\d)`).test(message);

  // Risk of data loss: critical.
  if (message.includes('data loss') || message.includes('corruption')) {
    return 'critical';
  }
  // Authentication / authorisation failure: high.
  if (hasStatus(401) || hasStatus(403)) {
    return 'high';
  }
  // Service unavailable or timing out: high.
  if (hasStatus(503) || message.includes('timeout')) {
    return 'high';
  }
  // Rate limited: low.
  if (hasStatus(429)) {
    return 'low';
  }
  return 'medium';
}
const error = items[0].json;
const severity = assessErrorSeverity(error);
const notification = createErrorNotification(error, severity);
return [{ json: notification }];
小结
完善的错误处理机制是生产级工作流的基础:
- 分类处理错误:不同类型的错误采用不同的处理策略
- 智能重试机制:使用指数退避和条件重试
- 数据保护:实现备份和恢复机制
- 监控告警:及时发现和响应问题
- 分级通知:根据严重程度选择合适的通知方式
下一篇文章,我们将学习变量和表达式系统,这是实现动态工作流的重要技术。
记住,好的错误处理不仅要能恢复错误,还要能从错误中学习,不断优化系统的稳定性和可靠性。