Skip to content

n8n 循环与批处理 - 高效处理大量数据

在实际应用中,我们经常需要处理大量数据,比如批量导入用户信息、处理大量订单、或者调用有频率限制的 API。今天我们来学习如何在 n8n 中实现高效的循环和批处理。

批处理基础概念

为什么需要批处理

API 限制

  • 大多数 API 都有频率限制
  • 单次请求的数据量限制
  • 避免超时和连接问题

性能考虑

  • 减少内存占用
  • 提高处理效率
  • 避免系统过载

错误恢复

  • 部分失败不影响整体
  • 便于重试和错误定位
  • 提高系统稳定性

Split In Batches 节点详解

Split In Batches 是 n8n 中最重要的批处理节点。

基本配置

json
{
  "batchSize": 10,
  "options": {
    "reset": false
  }
}

工作原理

输入数据 → Split In Batches → 处理逻辑 → 循环回 Split In Batches
   ↓                                              ↑
100条记录                                    处理完成后循环

第1批(10条) → 处理 → 第2批(10条) → 处理 → ... → 第10批(10条) → 处理 → 结束

实际应用案例

案例1:批量发送邮件

javascript
// 准备邮件数据
const emailList = [
  { email: 'user1@example.com', name: '张三' },
  { email: 'user2@example.com', name: '李四' },
  // ... 更多用户
];

// Split In Batches 配置
{
  "batchSize": 5,  // 每批处理5个邮件
  "options": {
    "reset": false
  }
}

案例2:API 数据同步

javascript
// 处理大量订单数据
const orders = items; // 假设有1000个订单

// 在 Code 节点中处理每批数据
const batch = items;
const processedOrders = [];

for (const item of batch) {
  const order = item.json;
  
  try {
    // 调用外部 API 更新订单状态
    const response = await fetch(`https://api.example.com/orders/${order.id}`, {
      method: 'PUT',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${process.env.API_TOKEN}`
      },
      body: JSON.stringify({
        status: 'processed',
        updatedAt: new Date().toISOString()
      })
    });
    
    if (response.ok) {
      processedOrders.push({
        json: {
          ...order,
          status: 'updated',
          apiResponse: await response.json()
        }
      });
    } else {
      processedOrders.push({
        json: {
          ...order,
          status: 'failed',
          error: `API error: ${response.status}`
        }
      });
    }
  } catch (error) {
    processedOrders.push({
      json: {
        ...order,
        status: 'error',
        error: error.message
      }
    });
  }
}

return processedOrders;

高级批处理模式

动态批次大小

javascript
// 根据数据类型动态调整批次大小
const items = $input.all();
let batchSize = 10; // 默认批次大小

// 根据数据复杂度调整
if (items[0]?.json?.type === 'complex') {
  batchSize = 5;  // 复杂数据减少批次大小
} else if (items[0]?.json?.type === 'simple') {
  batchSize = 20; // 简单数据增加批次大小
}

// 根据系统负载调整
const currentHour = new Date().getHours();
if (currentHour >= 9 && currentHour <= 17) {
  batchSize = Math.floor(batchSize * 0.7); // 工作时间减少负载
}

return [{
  json: {
    recommendedBatchSize: batchSize,
    totalItems: items.length,
    estimatedBatches: Math.ceil(items.length / batchSize)
  }
}];

带延迟的批处理

javascript
// 在批处理间添加延迟
const DELAY_MS = 1000; // 1秒延迟

// 处理当前批次
const currentBatch = items;
const results = [];

for (const item of currentBatch) {
  // 处理单个项目
  const result = await processItem(item.json);
  results.push({ json: result });
  
  // 在项目间添加小延迟
  await new Promise(resolve => setTimeout(resolve, 100));
}

// 批次间延迟
if ($node.context.batchIndex > 0) {
  await new Promise(resolve => setTimeout(resolve, DELAY_MS));
}

return results;

错误处理与重试

javascript
// 带重试机制的批处理
async function processWithRetry(item, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const result = await processItem(item);
      return {
        json: {
          ...item,
          result,
          success: true,
          attempts: attempt
        }
      };
    } catch (error) {
      if (attempt === maxRetries) {
        return {
          json: {
            ...item,
            error: error.message,
            success: false,
            attempts: attempt
          }
        };
      }
      
      // 指数退避延迟
      const delay = Math.pow(2, attempt) * 1000;
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}

const results = [];
for (const item of items) {
  const result = await processWithRetry(item.json);
  results.push(result);
}

return results;

循环控制技巧

条件循环

javascript
// 基于条件的循环控制
const data = items[0].json;
const shouldContinue = data.hasMore && data.page < data.maxPages;

return [{
  json: {
    ...data,
    shouldContinue,
    nextPage: shouldContinue ? data.page + 1 : null
  }
}];

循环计数器

javascript
// 使用工作流变量作为计数器
const currentCount = $workflow.context.loopCount || 0;
const maxIterations = 100;

if (currentCount >= maxIterations) {
  throw new Error('Maximum iterations reached');
}

// 更新计数器
$workflow.context.loopCount = currentCount + 1;

return [{
  json: {
    iteration: currentCount + 1,
    remaining: maxIterations - currentCount - 1,
    data: items[0].json
  }
}];

分页数据处理

javascript
// 处理分页 API 数据
async function fetchAllPages(baseUrl, params = {}) {
  const allData = [];
  let page = 1;
  let hasMore = true;
  
  while (hasMore) {
    try {
      const response = await fetch(`${baseUrl}?page=${page}&limit=50`, {
        headers: {
          'Authorization': `Bearer ${params.token}`
        }
      });
      
      const data = await response.json();
      allData.push(...data.items);
      
      hasMore = data.hasNext;
      page++;
      
      // 避免无限循环
      if (page > 1000) {
        throw new Error('Too many pages, possible infinite loop');
      }
      
      // API 限制延迟
      await new Promise(resolve => setTimeout(resolve, 200));
      
    } catch (error) {
      console.error(`Error fetching page ${page}:`, error);
      break;
    }
  }
  
  return allData;
}

const allItems = await fetchAllPages('https://api.example.com/items', {
  token: process.env.API_TOKEN
});

return allItems.map(item => ({ json: item }));

性能优化策略

内存管理

javascript
// 流式处理大数据
function* processLargeDataset(items) {
  const CHUNK_SIZE = 100;
  
  for (let i = 0; i < items.length; i += CHUNK_SIZE) {
    const chunk = items.slice(i, i + CHUNK_SIZE);
    
    // 处理当前块
    const processed = chunk.map(item => ({
      json: {
        ...item.json,
        processed: true,
        chunkIndex: Math.floor(i / CHUNK_SIZE)
      }
    }));
    
    yield processed;
    
    // 强制垃圾回收(如果可用)
    if (global.gc) {
      global.gc();
    }
  }
}

// 使用生成器处理数据
const generator = processLargeDataset(items);
const currentChunk = generator.next().value;

return currentChunk || [];

并行处理

javascript
// 并行处理多个批次
async function processBatchesInParallel(batches, concurrency = 3) {
  const results = [];
  
  for (let i = 0; i < batches.length; i += concurrency) {
    const currentBatches = batches.slice(i, i + concurrency);
    
    const promises = currentBatches.map(async (batch, index) => {
      try {
        return await processBatch(batch);
      } catch (error) {
        return {
          error: error.message,
          batchIndex: i + index
        };
      }
    });
    
    const batchResults = await Promise.all(promises);
    results.push(...batchResults);
  }
  
  return results;
}

// 将数据分成批次
const BATCH_SIZE = 10;
const batches = [];
for (let i = 0; i < items.length; i += BATCH_SIZE) {
  batches.push(items.slice(i, i + BATCH_SIZE));
}

const results = await processBatchesInParallel(batches);
return results.map(result => ({ json: result }));

进度跟踪

javascript
// 添加进度跟踪
const totalItems = $workflow.context.totalItems || items.length;
const processedItems = $workflow.context.processedItems || 0;
const currentBatchSize = items.length;

const newProcessedCount = processedItems + currentBatchSize;
const progress = Math.round((newProcessedCount / totalItems) * 100);

// 更新进度
$workflow.context.processedItems = newProcessedCount;

// 发送进度通知(每10%发送一次)
if (progress % 10 === 0 && progress > ($workflow.context.lastNotifiedProgress || 0)) {
  $workflow.context.lastNotifiedProgress = progress;
  
  // 这里可以发送通知
  console.log(`Processing progress: ${progress}% (${newProcessedCount}/${totalItems})`);
}

return items.map(item => ({
  json: {
    ...item.json,
    progress: {
      current: newProcessedCount,
      total: totalItems,
      percentage: progress
    }
  }
}));

错误恢复机制

断点续传

javascript
// 实现断点续传功能
const CHECKPOINT_KEY = 'batch_processing_checkpoint';
const checkpoint = $workflow.context[CHECKPOINT_KEY] || { lastProcessedIndex: 0 };

const startIndex = checkpoint.lastProcessedIndex;
const remainingItems = items.slice(startIndex);

if (remainingItems.length === 0) {
  return [{ json: { message: 'All items processed', totalProcessed: startIndex } }];
}

// 处理当前批次
const BATCH_SIZE = 10;
const currentBatch = remainingItems.slice(0, BATCH_SIZE);
const results = [];

for (let i = 0; i < currentBatch.length; i++) {
  try {
    const result = await processItem(currentBatch[i].json);
    results.push({ json: result });
    
    // 更新检查点
    checkpoint.lastProcessedIndex = startIndex + i + 1;
    $workflow.context[CHECKPOINT_KEY] = checkpoint;
    
  } catch (error) {
    // 记录错误但继续处理
    results.push({
      json: {
        ...currentBatch[i].json,
        error: error.message,
        failed: true
      }
    });
  }
}

return results;

失败重试队列

javascript
// 管理失败项目的重试队列
const RETRY_QUEUE_KEY = 'retry_queue';
const MAX_RETRIES = 3;

let retryQueue = $workflow.context[RETRY_QUEUE_KEY] || [];
const results = [];

for (const item of items) {
  const data = item.json;
  const retryCount = data.retryCount || 0;
  
  try {
    const result = await processItem(data);
    results.push({ json: result });
    
  } catch (error) {
    if (retryCount < MAX_RETRIES) {
      // 添加到重试队列
      retryQueue.push({
        ...data,
        retryCount: retryCount + 1,
        lastError: error.message,
        retryAt: Date.now() + (Math.pow(2, retryCount) * 60000) // 指数退避
      });
    } else {
      // 超过最大重试次数,标记为永久失败
      results.push({
        json: {
          ...data,
          permanentFailure: true,
          finalError: error.message
        }
      });
    }
  }
}

// 更新重试队列
$workflow.context[RETRY_QUEUE_KEY] = retryQueue;

return results;

监控和日志

批处理监控

javascript
// 添加详细的监控信息
const startTime = Date.now();
const batchInfo = {
  batchId: `batch_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
  startTime: new Date().toISOString(),
  itemCount: items.length
};

console.log(`Starting batch processing: ${batchInfo.batchId}`);

const results = [];
let successCount = 0;
let errorCount = 0;

for (const item of items) {
  try {
    const result = await processItem(item.json);
    results.push({ json: result });
    successCount++;
  } catch (error) {
    results.push({
      json: {
        ...item.json,
        error: error.message,
        failed: true
      }
    });
    errorCount++;
  }
}

const endTime = Date.now();
const duration = endTime - startTime;

console.log(`Batch completed: ${batchInfo.batchId}`, {
  duration: `${duration}ms`,
  successCount,
  errorCount,
  totalItems: items.length
});

return results;

小结

循环和批处理是处理大量数据的核心技术:

  1. 合理设置批次大小:平衡性能和稳定性
  2. 添加错误处理:确保部分失败不影响整体
  3. 实现重试机制:提高处理成功率
  4. 监控处理进度:便于跟踪和调试
  5. 优化内存使用:避免内存溢出

下一篇文章,我们将学习错误处理和重试机制的更多细节,这对构建稳定的生产系统非常重要。

记住,好的批处理设计要考虑到各种异常情况,包括网络中断、API 限制、数据格式错误等。充分的错误处理和监控是成功的关键。