n8n 循环与批处理 - 高效处理大量数据
在实际应用中,我们经常需要处理大量数据,比如批量导入用户信息、处理大量订单、或者调用有频率限制的 API。今天我们来学习如何在 n8n 中实现高效的循环和批处理。
批处理基础概念
为什么需要批处理
API 限制
- 大多数 API 都有频率限制
- 单次请求的数据量限制
- 避免超时和连接问题
性能考虑
- 减少内存占用
- 提高处理效率
- 避免系统过载
错误恢复
- 部分失败不影响整体
- 便于重试和错误定位
- 提高系统稳定性
Split In Batches 节点详解
Split In Batches 是 n8n 中最重要的批处理节点。
基本配置
json
{
"batchSize": 10,
"options": {
"reset": false
}
}
工作原理
输入数据 → Split In Batches → 处理逻辑 → 循环回 Split In Batches
↓ ↑
100条记录 处理完成后循环
↓
第1批(10条) → 处理 → 第2批(10条) → 处理 → ... → 第10批(10条) → 处理 → 结束
实际应用案例
案例1:批量发送邮件
javascript
// 准备邮件数据
const emailList = [
{ email: 'user1@example.com', name: '张三' },
{ email: 'user2@example.com', name: '李四' },
// ... 更多用户
];
// Split In Batches 配置
{
"batchSize": 5, // 每批处理5个邮件
"options": {
"reset": false
}
}
案例2:API 数据同步
javascript
// 处理大量订单数据
const orders = items; // 假设有1000个订单
// 在 Code 节点中处理每批数据
const batch = items;
const processedOrders = [];
for (const item of batch) {
const order = item.json;
try {
// 调用外部 API 更新订单状态
const response = await fetch(`https://api.example.com/orders/${order.id}`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.API_TOKEN}`
},
body: JSON.stringify({
status: 'processed',
updatedAt: new Date().toISOString()
})
});
if (response.ok) {
processedOrders.push({
json: {
...order,
status: 'updated',
apiResponse: await response.json()
}
});
} else {
processedOrders.push({
json: {
...order,
status: 'failed',
error: `API error: ${response.status}`
}
});
}
} catch (error) {
processedOrders.push({
json: {
...order,
status: 'error',
error: error.message
}
});
}
}
return processedOrders;
高级批处理模式
动态批次大小
javascript
// 根据数据类型动态调整批次大小
const items = $input.all();
let batchSize = 10; // 默认批次大小
// 根据数据复杂度调整
if (items[0]?.json?.type === 'complex') {
batchSize = 5; // 复杂数据减少批次大小
} else if (items[0]?.json?.type === 'simple') {
batchSize = 20; // 简单数据增加批次大小
}
// 根据系统负载调整
const currentHour = new Date().getHours();
if (currentHour >= 9 && currentHour <= 17) {
batchSize = Math.floor(batchSize * 0.7); // 工作时间减少负载
}
return [{
json: {
recommendedBatchSize: batchSize,
totalItems: items.length,
estimatedBatches: Math.ceil(items.length / batchSize)
}
}];
带延迟的批处理
javascript
// 在批处理间添加延迟
const DELAY_MS = 1000; // 1秒延迟
// 处理当前批次
const currentBatch = items;
const results = [];
for (const item of currentBatch) {
// 处理单个项目
const result = await processItem(item.json);
results.push({ json: result });
// 在项目间添加小延迟
await new Promise(resolve => setTimeout(resolve, 100));
}
// 批次间延迟
if ($node.context.batchIndex > 0) {
await new Promise(resolve => setTimeout(resolve, DELAY_MS));
}
return results;
错误处理与重试
javascript
// 带重试机制的批处理
async function processWithRetry(item, maxRetries = 3) {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const result = await processItem(item);
return {
json: {
...item,
result,
success: true,
attempts: attempt
}
};
} catch (error) {
if (attempt === maxRetries) {
return {
json: {
...item,
error: error.message,
success: false,
attempts: attempt
}
};
}
// 指数退避延迟
const delay = Math.pow(2, attempt) * 1000;
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
const results = [];
for (const item of items) {
const result = await processWithRetry(item.json);
results.push(result);
}
return results;
循环控制技巧
条件循环
javascript
// 基于条件的循环控制
const data = items[0].json;
const shouldContinue = data.hasMore && data.page < data.maxPages;
return [{
json: {
...data,
shouldContinue,
nextPage: shouldContinue ? data.page + 1 : null
}
}];
循环计数器
javascript
// 使用工作流变量作为计数器
const currentCount = $workflow.context.loopCount || 0;
const maxIterations = 100;
if (currentCount >= maxIterations) {
throw new Error('Maximum iterations reached');
}
// 更新计数器
$workflow.context.loopCount = currentCount + 1;
return [{
json: {
iteration: currentCount + 1,
remaining: maxIterations - currentCount - 1,
data: items[0].json
}
}];
分页数据处理
javascript
// 处理分页 API 数据
async function fetchAllPages(baseUrl, params = {}) {
const allData = [];
let page = 1;
let hasMore = true;
while (hasMore) {
try {
const response = await fetch(`${baseUrl}?page=${page}&limit=50`, {
headers: {
'Authorization': `Bearer ${params.token}`
}
});
const data = await response.json();
allData.push(...data.items);
hasMore = data.hasNext;
page++;
// 避免无限循环
if (page > 1000) {
throw new Error('Too many pages, possible infinite loop');
}
// API 限制延迟
await new Promise(resolve => setTimeout(resolve, 200));
} catch (error) {
console.error(`Error fetching page ${page}:`, error);
break;
}
}
return allData;
}
const allItems = await fetchAllPages('https://api.example.com/items', {
token: process.env.API_TOKEN
});
return allItems.map(item => ({ json: item }));
性能优化策略
内存管理
javascript
// 流式处理大数据
function* processLargeDataset(items) {
const CHUNK_SIZE = 100;
for (let i = 0; i < items.length; i += CHUNK_SIZE) {
const chunk = items.slice(i, i + CHUNK_SIZE);
// 处理当前块
const processed = chunk.map(item => ({
json: {
...item.json,
processed: true,
chunkIndex: Math.floor(i / CHUNK_SIZE)
}
}));
yield processed;
// 强制垃圾回收(如果可用)
if (global.gc) {
global.gc();
}
}
}
// 使用生成器处理数据
const generator = processLargeDataset(items);
const currentChunk = generator.next().value;
return currentChunk || [];
并行处理
javascript
// 并行处理多个批次
async function processBatchesInParallel(batches, concurrency = 3) {
const results = [];
for (let i = 0; i < batches.length; i += concurrency) {
const currentBatches = batches.slice(i, i + concurrency);
const promises = currentBatches.map(async (batch, index) => {
try {
return await processBatch(batch);
} catch (error) {
return {
error: error.message,
batchIndex: i + index
};
}
});
const batchResults = await Promise.all(promises);
results.push(...batchResults);
}
return results;
}
// 将数据分成批次
const BATCH_SIZE = 10;
const batches = [];
for (let i = 0; i < items.length; i += BATCH_SIZE) {
batches.push(items.slice(i, i + BATCH_SIZE));
}
const results = await processBatchesInParallel(batches);
return results.map(result => ({ json: result }));
进度跟踪
javascript
// 添加进度跟踪
const totalItems = $workflow.context.totalItems || items.length;
const processedItems = $workflow.context.processedItems || 0;
const currentBatchSize = items.length;
const newProcessedCount = processedItems + currentBatchSize;
const progress = Math.round((newProcessedCount / totalItems) * 100);
// 更新进度
$workflow.context.processedItems = newProcessedCount;
// 发送进度通知(每10%发送一次)
if (progress % 10 === 0 && progress > ($workflow.context.lastNotifiedProgress || 0)) {
$workflow.context.lastNotifiedProgress = progress;
// 这里可以发送通知
console.log(`Processing progress: ${progress}% (${newProcessedCount}/${totalItems})`);
}
return items.map(item => ({
json: {
...item.json,
progress: {
current: newProcessedCount,
total: totalItems,
percentage: progress
}
}
}));
错误恢复机制
断点续传
javascript
// 实现断点续传功能
const CHECKPOINT_KEY = 'batch_processing_checkpoint';
const checkpoint = $workflow.context[CHECKPOINT_KEY] || { lastProcessedIndex: 0 };
const startIndex = checkpoint.lastProcessedIndex;
const remainingItems = items.slice(startIndex);
if (remainingItems.length === 0) {
return [{ json: { message: 'All items processed', totalProcessed: startIndex } }];
}
// 处理当前批次
const BATCH_SIZE = 10;
const currentBatch = remainingItems.slice(0, BATCH_SIZE);
const results = [];
for (let i = 0; i < currentBatch.length; i++) {
try {
const result = await processItem(currentBatch[i].json);
results.push({ json: result });
// 更新检查点
checkpoint.lastProcessedIndex = startIndex + i + 1;
$workflow.context[CHECKPOINT_KEY] = checkpoint;
} catch (error) {
// 记录错误但继续处理
results.push({
json: {
...currentBatch[i].json,
error: error.message,
failed: true
}
});
}
}
return results;
失败重试队列
javascript
// 管理失败项目的重试队列
const RETRY_QUEUE_KEY = 'retry_queue';
const MAX_RETRIES = 3;
let retryQueue = $workflow.context[RETRY_QUEUE_KEY] || [];
const results = [];
for (const item of items) {
const data = item.json;
const retryCount = data.retryCount || 0;
try {
const result = await processItem(data);
results.push({ json: result });
} catch (error) {
if (retryCount < MAX_RETRIES) {
// 添加到重试队列
retryQueue.push({
...data,
retryCount: retryCount + 1,
lastError: error.message,
retryAt: Date.now() + (Math.pow(2, retryCount) * 60000) // 指数退避
});
} else {
// 超过最大重试次数,标记为永久失败
results.push({
json: {
...data,
permanentFailure: true,
finalError: error.message
}
});
}
}
}
// 更新重试队列
$workflow.context[RETRY_QUEUE_KEY] = retryQueue;
return results;
监控和日志
批处理监控
javascript
// 添加详细的监控信息
const startTime = Date.now();
const batchInfo = {
batchId: `batch_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
startTime: new Date().toISOString(),
itemCount: items.length
};
console.log(`Starting batch processing: ${batchInfo.batchId}`);
const results = [];
let successCount = 0;
let errorCount = 0;
for (const item of items) {
try {
const result = await processItem(item.json);
results.push({ json: result });
successCount++;
} catch (error) {
results.push({
json: {
...item.json,
error: error.message,
failed: true
}
});
errorCount++;
}
}
const endTime = Date.now();
const duration = endTime - startTime;
console.log(`Batch completed: ${batchInfo.batchId}`, {
duration: `${duration}ms`,
successCount,
errorCount,
totalItems: items.length
});
return results;
小结
循环和批处理是处理大量数据的核心技术:
- 合理设置批次大小:平衡性能和稳定性
- 添加错误处理:确保部分失败不影响整体
- 实现重试机制:提高处理成功率
- 监控处理进度:便于跟踪和调试
- 优化内存使用:避免内存溢出
下一篇文章,我们将学习错误处理和重试机制的更多细节,这对构建稳定的生产系统非常重要。
记住,好的批处理设计要考虑到各种异常情况,包括网络中断、API 限制、数据格式错误等。充分的错误处理和监控是成功的关键。