Skip to content

n8n 错误处理与重试机制 - 构建稳定的自动化系统

在生产环境中,错误是不可避免的。网络中断、API 限制、数据格式错误等问题都可能导致工作流失败。今天我们来学习如何在 n8n 中构建完善的错误处理和重试机制。

错误类型分析

常见错误分类

网络相关错误

  • 连接超时
  • DNS 解析失败
  • 网络中断
  • 服务器不可达

API 相关错误

  • 认证失败 (401)
  • 权限不足 (403)
  • 资源不存在 (404)
  • 频率限制 (429)
  • 服务器错误 (500)

数据相关错误

  • 数据格式错误
  • 必填字段缺失
  • 数据类型不匹配
  • 数据验证失败

业务逻辑错误

  • 业务规则违反
  • 状态不一致
  • 重复操作
  • 资源冲突

节点级错误处理

Continue On Fail 设置

json
{
  "continueOnFail": true,
  "retryOnFail": true,
  "retryTimes": 3,
  "retryInterval": 1000
}

实际应用案例

HTTP 请求错误处理

javascript
// 在 Code 节点中处理 HTTP 错误
const results = [];

for (const item of items) {
  try {
    const response = await fetch('https://api.example.com/data', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${item.json.token}`
      },
      body: JSON.stringify(item.json.data)
    });
    
    if (!response.ok) {
      // 根据状态码处理不同错误
      let errorType = 'unknown';
      let shouldRetry = false;
      
      switch (response.status) {
        case 400:
          errorType = 'bad_request';
          shouldRetry = false;
          break;
        case 401:
          errorType = 'unauthorized';
          shouldRetry = false;
          break;
        case 429:
          errorType = 'rate_limit';
          shouldRetry = true;
          break;
        case 500:
        case 502:
        case 503:
          errorType = 'server_error';
          shouldRetry = true;
          break;
      }
      
      results.push({
        json: {
          ...item.json,
          success: false,
          error: {
            type: errorType,
            status: response.status,
            message: await response.text(),
            shouldRetry
          }
        }
      });
    } else {
      const data = await response.json();
      results.push({
        json: {
          ...item.json,
          success: true,
          response: data
        }
      });
    }
  } catch (error) {
    results.push({
      json: {
        ...item.json,
        success: false,
        error: {
          type: 'network_error',
          message: error.message,
          shouldRetry: true
        }
      });
    }
  }
}

return results;

Error Trigger 使用

Error Trigger 可以捕获工作流中的错误并进行处理。

基本配置

json
{
  "errorWorkflows": ["error-handler-workflow-id"]
}

错误处理工作流

javascript
// 在错误处理工作流中分析错误
const errorData = items[0].json;
const originalWorkflow = errorData.workflow;
const errorNode = errorData.node;
const errorMessage = errorData.error.message;

// 分析错误类型
let errorCategory = 'unknown';
let severity = 'medium';
let shouldNotify = true;

if (errorMessage.includes('timeout')) {
  errorCategory = 'timeout';
  severity = 'low';
  shouldNotify = false; // 超时错误不需要立即通知
} else if (errorMessage.includes('401') || errorMessage.includes('unauthorized')) {
  errorCategory = 'auth';
  severity = 'high';
} else if (errorMessage.includes('429') || errorMessage.includes('rate limit')) {
  errorCategory = 'rate_limit';
  severity = 'low';
  shouldNotify = false;
} else if (errorMessage.includes('500') || errorMessage.includes('502')) {
  errorCategory = 'server_error';
  severity = 'medium';
}

return [{
  json: {
    ...errorData,
    analysis: {
      category: errorCategory,
      severity,
      shouldNotify,
      timestamp: new Date().toISOString()
    }
  }
}];

智能重试机制

指数退避重试

javascript
/**
 * Run an async operation, retrying failures with exponential backoff plus
 * up to 10% random jitter.
 *
 * @param {() => Promise<*>} operation - Called with no arguments on every attempt.
 * @param {number} [maxRetries=5] - Total number of attempts. Values below 1 or
 *   non-numeric values are treated as 1; fractions are rounded down.
 * @param {number} [baseDelay=1000] - Delay in ms before the second attempt;
 *   doubles after every further failure.
 * @returns {Promise<*>} Whatever `operation` resolves to.
 * @throws The last error once attempts are exhausted, or immediately when
 *   `shouldRetry(error)` returns false.
 */
async function retryWithBackoff(operation, maxRetries = 5, baseDelay = 1000) {
  // Normalise the attempt count: with 0 or a negative number the loop would
  // never call `operation`, and with a fraction the "last attempt" check would
  // never match, so the final error would be swallowed.
  const attempts = Math.max(1, Math.floor(maxRetries) || 1);

  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      // Give up when out of attempts or when the error is not retryable.
      if (attempt === attempts || !shouldRetry(error)) {
        throw error;
      }

      // Exponential backoff; the jitter spreads out clients that failed together.
      const delay = baseDelay * Math.pow(2, attempt - 1);
      const jitter = Math.random() * 0.1 * delay;
      const totalDelay = Math.round(delay + jitter);

      console.log(`Attempt ${attempt} failed, retrying in ${totalDelay}ms`);
      await new Promise(resolve => setTimeout(resolve, totalDelay));
    }
  }
}

/**
 * Decide whether a failure is transient and therefore worth retrying.
 *
 * The check is a plain substring match of the markers below against the
 * error's `code` and `message` (or against the value itself when a string was
 * thrown). Errors without a usable message are treated as not retryable
 * instead of raising a TypeError that would mask the original failure.
 *
 * @param {*} error - The caught value.
 * @returns {boolean} True when any retryable marker is found.
 */
function shouldRetry(error) {
  const retryableErrors = [
    'timeout',
    'ECONNRESET',
    'ENOTFOUND',
    'ECONNREFUSED',
    '429', // Rate limit
    '500', // Internal server error
    '502', // Bad gateway
    '503', // Service unavailable
    '504'  // Gateway timeout
  ];

  let haystack = '';
  if (typeof error === 'string') {
    haystack = error;
  } else if (error) {
    // Include `code` as well: the marker may only be present there.
    haystack = `${error.code || ''} ${error.message || ''}`;
  }

  return retryableErrors.some(marker => haystack.includes(marker));
}

// 使用重试机制
const results = [];
for (const item of items) {
  try {
    const result = await retryWithBackoff(async () => {
      return await processItem(item.json);
    });
    
    results.push({
      json: {
        ...item.json,
        result,
        success: true
      }
    });
  } catch (error) {
    results.push({
      json: {
        ...item.json,
        success: false,
        error: error.message,
        finalFailure: true
      }
    });
  }
}

return results;

条件重试

javascript
/**
 * Retry an operation with a per-item attempt budget and a fixed delay.
 *
 * The attempt count comes from `getMaxRetries(item)`, the pause between
 * attempts from `getRetryDelay(item)`, and `shouldRetryForItem` can veto a
 * retry for a given error.
 *
 * @param {(item: object) => Promise<*>} operation - Called with `item` on every attempt.
 * @param {object} item - The data being processed.
 * @returns {Promise<*>} Whatever `operation` resolves to.
 * @throws The error of the last attempt, or of the first attempt that
 *   `shouldRetryForItem` refuses to retry.
 */
async function conditionalRetry(operation, item) {
  const maxRetries = getMaxRetries(item);
  const retryDelay = getRetryDelay(item);

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await operation(item);
    } catch (error) {
      // Rethrow on the final attempt as well; otherwise the loop simply ends
      // and the caller receives `undefined` as if the operation had succeeded.
      if (attempt >= maxRetries || !shouldRetryForItem(error, item, attempt)) {
        throw error;
      }

      await new Promise(resolve => setTimeout(resolve, retryDelay));
    }
  }
}

/**
 * Pick the attempt budget from the item's priority.
 *
 * @param {{priority?: string}} item - The data being processed.
 * @returns {number} 5 for 'critical', 3 for 'high', otherwise 2.
 */
function getMaxRetries(item) {
  switch (item.priority) {
    case 'critical':
      return 5;
    case 'high':
      return 3;
    default:
      return 2;
  }
}

/**
 * Pick the pause between attempts from the item's type.
 *
 * @param {{type?: string}} item - The data being processed.
 * @returns {number} Delay in ms: 500 for 'real_time', 2000 for 'batch', otherwise 1000.
 */
function getRetryDelay(item) {
  const delaysByType = new Map([
    ['real_time', 500],
    ['batch', 2000]
  ]);
  const kind = item.type;

  return delaysByType.has(kind) ? delaysByType.get(kind) : 1000;
}

/**
 * Decide whether a failed attempt for this item should be retried.
 *
 * Critical items are retried regardless of the error for the first ten
 * attempts. In every other case a message containing '400' or '401' is not
 * retried, and anything else is.
 *
 * @param {Error} error - The failure of the current attempt.
 * @param {{priority?: string}} item - The data being processed.
 * @param {number} attempt - 1-based number of the attempt that just failed.
 * @returns {boolean} True when another attempt should be made.
 */
function shouldRetryForItem(error, item, attempt) {
  const isCritical = item.priority === 'critical';
  if (isCritical && attempt <= 10) return true;

  const nonRetryableMarkers = ['400', '401'];
  const isClientFault = nonRetryableMarkers.some(marker => error.message.includes(marker));

  return !isClientFault;
}

错误恢复策略

数据备份与恢复

javascript
// 实现数据备份和恢复机制
const BACKUP_KEY = 'workflow_backup';

// 备份原始数据
function backupData(data) {
  const backup = {
    timestamp: new Date().toISOString(),
    data: JSON.parse(JSON.stringify(data)),
    workflowId: $workflow.id,
    executionId: $execution.id
  };
  
  $workflow.context[BACKUP_KEY] = backup;
  return backup;
}

// 恢复数据
function restoreData() {
  const backup = $workflow.context[BACKUP_KEY];
  if (!backup) {
    throw new Error('No backup data found');
  }
  
  return backup.data;
}

// 处理数据时创建备份
const originalData = items.map(item => item.json);
const backup = backupData(originalData);

try {
  // 尝试处理数据
  const processedData = await processItems(originalData);
  
  // 清除备份
  delete $workflow.context[BACKUP_KEY];
  
  return processedData.map(data => ({ json: data }));
} catch (error) {
  console.log('Processing failed, attempting recovery');
  
  // 恢复原始数据
  const recoveredData = restoreData();
  
  return [{
    json: {
      error: 'Processing failed, data recovered',
      originalError: error.message,
      recoveredItems: recoveredData.length,
      backup: backup
    }
  }];
}

部分失败处理

javascript
/**
 * Process items one at a time, collecting successes and failures separately
 * instead of aborting on the first error.
 *
 * @param {Array<{json: object}>} items - Items whose `json` is passed to `processItem`.
 * @returns {Promise<{successful: Array, failed: Array, summary: object}>}
 *   Both result lists plus total/success/failure counts.
 * @throws {Error} When more than half of the items failed.
 */
async function processWithPartialFailure(items) {
  const successful = [];
  const failed = [];
  const summary = {
    total: items.length,
    successCount: 0,
    failureCount: 0
  };

  for (const item of items) {
    try {
      const result = await processItem(item.json);
      successful.push({ json: { ...item.json, result, status: 'success' } });
      summary.successCount += 1;
    } catch (error) {
      failed.push({ json: { ...item.json, error: error.message, status: 'failed' } });
      summary.failureCount += 1;
    }
  }

  // An empty input yields NaN here, which never exceeds the threshold.
  const failureRate = summary.failureCount / summary.total;
  const tooManyFailures = failureRate > 0.5;
  if (tooManyFailures) {
    throw new Error(`High failure rate: ${Math.round(failureRate * 100)}%`);
  }

  return { successful, failed, summary };
}

const processingResults = await processWithPartialFailure(items);

// 返回成功的项目,失败的项目可以通过其他分支处理
return processingResults.successful;

监控和告警

错误统计

javascript
// Collect error statistics across executions.
const ERROR_STATS_KEY = 'error_statistics';
// Length of one statistics window; the alert rule is expressed per hour.
const STATS_WINDOW_MS = 60 * 60 * 1000;

// The documented `$workflow` variable only exposes id, name and active, so the
// counters live in the workflow's static data instead.
const staticData = $getWorkflowStaticData('global');

// Build a zeroed statistics record whose window starts now.
function createEmptyStats() {
  return {
    totalErrors: 0,
    errorsByType: {},
    errorsByNode: {},
    lastReset: new Date().toISOString()
  };
}

const stats = staticData[ERROR_STATS_KEY] || createEmptyStats();

/**
 * Record one error in the running statistics and persist them.
 *
 * @param {{message: string}} error - The failure; its message is classified by `categorizeError`.
 * @param {string} nodeName - Name of the node the error is attributed to.
 */
function updateErrorStats(error, nodeName) {
  // Start a fresh window once the current one is older than an hour (or its
  // timestamp is unreadable). Without this the counters grow forever and an
  // "errors within the last hour" rule can only match during the very first hour.
  const windowAge = Date.now() - new Date(stats.lastReset).getTime();
  if (!(windowAge < STATS_WINDOW_MS)) {
    // Reset in place: other code holds a reference to this same object.
    Object.assign(stats, createEmptyStats());
  }

  stats.totalErrors++;

  // Count per error category.
  const errorType = categorizeError(error);
  stats.errorsByType[errorType] = (stats.errorsByType[errorType] || 0) + 1;

  // Count per node.
  stats.errorsByNode[nodeName] = (stats.errorsByNode[nodeName] || 0) + 1;

  staticData[ERROR_STATS_KEY] = stats;
}

/**
 * Map an error to a coarse category by looking for markers in its message.
 * Rules are tried in order and the first match wins.
 *
 * @param {{message: string}} error - The failure to classify.
 * @returns {string} 'timeout', 'auth', 'rate_limit', 'server_error' or 'other'.
 */
function categorizeError(error) {
  const { message } = error;
  const rules = [
    ['timeout', 'timeout'],
    ['401', 'auth'],
    ['429', 'rate_limit'],
    ['500', 'server_error']
  ];

  const matched = rules.find(([marker]) => message.includes(marker));
  return matched ? matched[1] : 'other';
}

/**
 * Decide whether the collected statistics warrant an alert.
 *
 * An alert is raised when more than 10 errors were counted and less than one
 * hour has passed since `stats.lastReset`, or when any single node has more
 * than 5 recorded errors.
 *
 * @param {{totalErrors: number, errorsByNode: Object<string, number>, lastReset: string}} stats
 * @returns {boolean} True when an alert should be sent.
 */
function shouldAlert(stats) {
  const windowStart = new Date(stats.lastReset).getTime();
  const hoursSinceReset = (Date.now() - windowStart) / 3600000;

  // Burst rule: many errors inside the first hour of the window.
  const isBurst = stats.totalErrors > 10 && hoursSinceReset < 1;

  // Hot-spot rule: one node accounts for more than five errors.
  return isBurst || Object.values(stats.errorsByNode).some(count => count > 5);
}

// 处理当前错误
const currentError = items[0].json;
updateErrorStats(currentError, currentError.node.name);

if (shouldAlert(stats)) {
  return [{
    json: {
      alert: true,
      message: 'High error rate detected',
      statistics: stats,
      currentError
    }
  }];
}

return [{ json: { processed: true, stats } }];

健康检查

javascript
// 实现工作流健康检查
function performHealthCheck() {
  const health = {
    status: 'healthy',
    checks: {},
    timestamp: new Date().toISOString()
  };
  
  // 检查 API 连接
  health.checks.apiConnection = checkApiConnection();
  
  // 检查数据库连接
  health.checks.databaseConnection = checkDatabaseConnection();
  
  // 检查错误率
  health.checks.errorRate = checkErrorRate();
  
  // 检查内存使用
  health.checks.memoryUsage = checkMemoryUsage();
  
  // 综合评估
  const failedChecks = Object.values(health.checks).filter(check => !check.healthy);
  if (failedChecks.length > 0) {
    health.status = 'unhealthy';
  }
  
  return health;
}

async function checkApiConnection() {
  try {
    const response = await fetch('https://api.example.com/health', {
      timeout: 5000
    });
    return {
      healthy: response.ok,
      responseTime: response.headers.get('x-response-time'),
      status: response.status
    };
  } catch (error) {
    return {
      healthy: false,
      error: error.message
    };
  }
}

function checkErrorRate() {
  const stats = $workflow.context.error_statistics || {};
  const errorRate = stats.totalErrors || 0;
  
  return {
    healthy: errorRate < 10,
    errorCount: errorRate,
    threshold: 10
  };
}

function checkMemoryUsage() {
  const used = process.memoryUsage();
  const usedMB = Math.round(used.heapUsed / 1024 / 1024);
  
  return {
    healthy: usedMB < 500,
    usedMemory: `${usedMB}MB`,
    threshold: '500MB'
  };
}

const healthStatus = performHealthCheck();
return [{ json: healthStatus }];

错误通知系统

分级通知

javascript
/**
 * Build a notification record for an error and choose its delivery channels
 * from the severity.
 *
 * @param {*} error - The failure to report; stored on the record unchanged.
 * @param {string} severity - 'critical', 'high', 'medium' or 'low'. Any other
 *   value is routed like 'medium' so the notification always has a channel.
 * @returns {object} Record with id, timestamp, severity, error, workflow and
 *   execution identifiers, `channels` and `escalation`.
 */
function createErrorNotification(error, severity) {
  const notification = {
    // Timestamp plus a short random base-36 suffix; slice(2, 11) drops the
    // leading "0." and keeps at most nine characters.
    id: `error_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
    timestamp: new Date().toISOString(),
    severity,
    error,
    workflow: {
      id: $workflow.id,
      name: $workflow.name
    },
    execution: {
      id: $execution.id,
      mode: $execution.mode
    }
  };

  // Route by severity; only 'critical' escalates.
  switch (severity) {
    case 'critical':
      notification.channels = ['email', 'sms', 'slack'];
      notification.escalation = true;
      break;
    case 'high':
      notification.channels = ['email', 'slack'];
      notification.escalation = false;
      break;
    case 'low':
      notification.channels = ['log'];
      notification.escalation = false;
      break;
    case 'medium':
    default:
      // Unknown severities share the 'medium' route instead of leaving
      // `channels` undefined.
      notification.channels = ['slack'];
      notification.escalation = false;
      break;
  }

  return notification;
}

/**
 * Grade an error by looking for markers in its message.
 *
 * Rules are checked from most to least severe so that a message matching
 * several rules gets the highest applicable grade. A missing or non-string
 * message matches no rule and yields the default.
 *
 * @param {{message?: string}} error - The failure to grade.
 * @returns {string} 'critical', 'high', 'low' or 'medium' (the default).
 */
function assessErrorSeverity(error) {
  const message = error && typeof error.message === 'string' ? error.message : '';
  const mentions = (...markers) => markers.some(marker => message.includes(marker));

  // Risk of data loss: critical, and checked first so it is never downgraded.
  if (mentions('data loss', 'corruption')) {
    return 'critical';
  }

  // Authentication / authorisation failures: high.
  if (mentions('401', '403')) {
    return 'high';
  }

  // Service unavailable: high.
  if (mentions('503', 'timeout')) {
    return 'high';
  }

  // Rate limiting: low.
  if (mentions('429')) {
    return 'low';
  }

  return 'medium';
}

const error = items[0].json;
const severity = assessErrorSeverity(error);
const notification = createErrorNotification(error, severity);

return [{ json: notification }];

小结

完善的错误处理机制是生产级工作流的基础:

  1. 分类处理错误:不同类型的错误采用不同的处理策略
  2. 智能重试机制:使用指数退避和条件重试
  3. 数据保护:实现备份和恢复机制
  4. 监控告警:及时发现和响应问题
  5. 分级通知:根据严重程度选择合适的通知方式

下一篇文章,我们将学习变量和表达式系统,这是实现动态工作流的重要技术。

记住,好的错误处理不仅要能从错误中恢复,还要能从错误中学习,不断优化系统的稳定性和可靠性。