Skip to content

n8n 定时任务与调度系统 - 自动化的时间管理

定时任务是自动化系统的重要组成部分,它让工作流能够在特定时间自动执行。今天我们来深入学习 n8n 的定时任务和调度系统。

定时触发器类型

Cron Trigger

最强大和灵活的定时触发器,基于 Unix Cron 表达式。

基本配置

json
{
  "cronExpression": "0 9 * * 1-5",
  "timezone": "Asia/Shanghai"
}

Interval Trigger

简单的间隔触发器,适合规律性重复任务。

基本配置

json
{
  "interval": 300,
  "unit": "seconds"
}

Schedule Trigger

可视化的调度配置,适合非技术用户。

基本配置

json
{
  "rule": {
    "interval": [{
      "field": "hour",
      "value": 9
    }]
  }
}

Cron 表达式详解

表达式格式

┌───────────── 分钟 (0 - 59)
│ ┌─────────── 小时 (0 - 23)
│ │ ┌───────── 日期 (1 - 31)
│ │ │ ┌─────── 月份 (1 - 12)
│ │ │ │ ┌───── 星期 (0 - 7, 0和7都表示周日)
│ │ │ │ │
* * * * *

特殊字符

星号 (*):匹配所有值

bash
* * * * *    # 每分钟执行

逗号 (,):列举多个值

bash
0 9,12,18 * * *    # 每天9点、12点、18点执行

连字符 (-):指定范围

bash
0 9-17 * * 1-5    # 工作日9点到17点每小时执行

斜杠 (/):指定间隔

bash
*/15 * * * *      # 每15分钟执行
0 */2 * * *       # 每2小时执行

问号 (?):用于日期和星期字段,表示不指定值

bash
0 9 ? * MON       # 每周一9点执行(日期不指定)

常用 Cron 表达式

每日任务

bash
0 9 * * *         # 每天早上9点
0 0 * * *         # 每天午夜
30 23 * * *       # 每天晚上11:30

工作日任务

bash
0 9 * * 1-5       # 工作日早上9点
0 18 * * MON-FRI  # 工作日下午6点

每周任务

bash
0 9 * * 1         # 每周一早上9点
0 0 * * SUN       # 每周日午夜

每月任务

bash
0 9 1 * *         # 每月1号早上9点
0 9 L * *         # 每月最后一天早上9点
0 9 1-7 * 1       # 每月第一个周一早上9点

复杂调度

bash
0 9-17/2 * * 1-5  # 工作日9-17点每2小时
*/30 9-17 * * 1-5 # 工作日9-17点每30分钟
0 9 1,15 * *      # 每月1号和15号早上9点

高级调度策略

动态调度

javascript
// 根据业务需求动态调整调度时间
function generateCronExpression(config) {
  const { frequency, workingHours, timezone, businessDays } = config;
  
  let cronExpr = '';
  
  switch (frequency) {
    case 'hourly':
      cronExpr = workingHours 
        ? `0 ${workingHours.start}-${workingHours.end} * * ${businessDays}`
        : '0 * * * *';
      break;
      
    case 'daily':
      cronExpr = businessDays 
        ? `0 9 * * ${businessDays}`
        : '0 9 * * *';
      break;
      
    case 'weekly':
      cronExpr = '0 9 * * 1'; // 每周一
      break;
      
    case 'monthly':
      cronExpr = '0 9 1 * *'; // 每月1号
      break;
  }
  
  return cronExpr;
}

// 使用示例
const scheduleConfig = {
  frequency: 'hourly',
  workingHours: { start: 9, end: 17 },
  businessDays: '1-5',
  timezone: 'Asia/Shanghai'
};

const cronExpression = generateCronExpression(scheduleConfig);
console.log(`Generated cron: ${cronExpression}`);

条件调度

javascript
// 基于条件的智能调度
function shouldExecuteNow(conditions) {
  const now = new Date();
  const hour = now.getHours();
  const day = now.getDay();
  
  // 检查时间窗口
  if (conditions.timeWindow) {
    const { start, end } = conditions.timeWindow;
    if (hour < start || hour > end) {
      return false;
    }
  }
  
  // 检查工作日
  if (conditions.businessDaysOnly && (day === 0 || day === 6)) {
    return false;
  }
  
  // 检查节假日
  if (conditions.excludeHolidays && isHoliday(now)) {
    return false;
  }
  
  // 检查系统负载
  if (conditions.checkSystemLoad && getSystemLoad() > 0.8) {
    return false;
  }
  
  return true;
}

// 在工作流开始时检查条件
const executionConditions = {
  timeWindow: { start: 9, end: 17 },
  businessDaysOnly: true,
  excludeHolidays: true,
  checkSystemLoad: true
};

if (!shouldExecuteNow(executionConditions)) {
  console.log('Conditions not met, skipping execution');
  return [{ json: { skipped: true, reason: 'Conditions not met' } }];
}

分布式调度

javascript
// 避免多个实例同时执行
async function acquireLock(lockKey, ttl = 300000) {
  const lockValue = `${Date.now()}_${Math.random()}`;
  
  try {
    // 使用 Redis 或数据库实现分布式锁
    const acquired = await setLockIfNotExists(lockKey, lockValue, ttl);
    
    if (acquired) {
      return {
        acquired: true,
        lockValue,
        release: () => releaseLock(lockKey, lockValue)
      };
    }
    
    return { acquired: false };
  } catch (error) {
    console.error('Failed to acquire lock:', error);
    return { acquired: false };
  }
}

// 在定时任务开始时获取锁
const lockKey = `workflow_${$workflow.id}_${$node.name}`;
const lock = await acquireLock(lockKey);

if (!lock.acquired) {
  console.log('Another instance is running, skipping');
  return [{ json: { skipped: true, reason: 'Lock not acquired' } }];
}

try {
  // 执行实际任务
  const result = await executeTask();
  
  return [{ json: { success: true, result } }];
} finally {
  // 释放锁
  if (lock.release) {
    await lock.release();
  }
}

时区处理

时区配置

javascript
// 处理不同时区的调度
function convertToTimezone(cronExpr, fromTz, toTz) {
  // 解析 cron 表达式
  const parts = cronExpr.split(' ');
  const hour = parseInt(parts[1]);
  
  // 计算时区偏移
  const fromOffset = getTimezoneOffset(fromTz);
  const toOffset = getTimezoneOffset(toTz);
  const hourDiff = (toOffset - fromOffset) / 60;
  
  // 调整小时
  let newHour = hour + hourDiff;
  let dayAdjustment = 0;
  
  if (newHour < 0) {
    newHour += 24;
    dayAdjustment = -1;
  } else if (newHour >= 24) {
    newHour -= 24;
    dayAdjustment = 1;
  }
  
  parts[1] = newHour.toString();
  
  // 如果需要调整日期
  if (dayAdjustment !== 0) {
    // 这里需要更复杂的逻辑来处理星期和日期的调整
    adjustDayFields(parts, dayAdjustment);
  }
  
  return parts.join(' ');
}

// 多时区调度示例
const schedules = [
  { timezone: 'America/New_York', cron: '0 9 * * 1-5' },
  { timezone: 'Europe/London', cron: '0 14 * * 1-5' },
  { timezone: 'Asia/Shanghai', cron: '0 22 * * 1-5' }
];

// 转换为 UTC 时间
const utcSchedules = schedules.map(schedule => ({
  ...schedule,
  utcCron: convertToTimezone(schedule.cron, schedule.timezone, 'UTC')
}));

夏令时处理

javascript
// 处理夏令时变化
function adjustForDST(cronExpr, timezone) {
  const now = new Date();
  const isDST = isDaylightSavingTime(now, timezone);
  const willBeDST = isDaylightSavingTime(getNextExecution(cronExpr), timezone);
  
  if (isDST !== willBeDST) {
    // 夏令时状态将发生变化,需要调整时间
    const adjustment = willBeDST ? -1 : 1; // 夏令时开始时减1小时,结束时加1小时
    
    const parts = cronExpr.split(' ');
    const hour = parseInt(parts[1]) + adjustment;
    
    if (hour >= 0 && hour < 24) {
      parts[1] = hour.toString();
      return parts.join(' ');
    }
  }
  
  return cronExpr;
}

任务监控和管理

执行历史跟踪

javascript
// 记录任务执行历史
function recordExecution(taskId, status, duration, result) {
  const execution = {
    taskId,
    executionId: $execution.id,
    timestamp: new Date().toISOString(),
    status, // 'success', 'failed', 'skipped'
    duration,
    result: status === 'failed' ? result.error : result,
    nodeId: $node.id,
    workflowId: $workflow.id
  };
  
  // 存储到数据库或日志系统
  saveExecutionRecord(execution);
  
  // 更新统计信息
  updateTaskStatistics(taskId, status, duration);
  
  return execution;
}

// 在任务开始时记录
const startTime = Date.now();
const taskId = `${$workflow.name}_${$node.name}`;

try {
  const result = await executeScheduledTask();
  
  const duration = Date.now() - startTime;
  recordExecution(taskId, 'success', duration, result);
  
  return [{ json: { success: true, result, duration } }];
} catch (error) {
  const duration = Date.now() - startTime;
  recordExecution(taskId, 'failed', duration, { error: error.message });
  
  throw error;
}

性能监控

javascript
// 监控任务性能
function monitorTaskPerformance(taskId) {
  const stats = getTaskStatistics(taskId);
  const thresholds = {
    maxDuration: 300000, // 5分钟
    maxFailureRate: 0.1, // 10%
    minSuccessRate: 0.9  // 90%
  };
  
  const alerts = [];
  
  // 检查执行时间
  if (stats.avgDuration > thresholds.maxDuration) {
    alerts.push({
      type: 'performance',
      message: `Task ${taskId} average duration (${stats.avgDuration}ms) exceeds threshold`,
      severity: 'warning'
    });
  }
  
  // 检查失败率
  if (stats.failureRate > thresholds.maxFailureRate) {
    alerts.push({
      type: 'reliability',
      message: `Task ${taskId} failure rate (${stats.failureRate * 100}%) exceeds threshold`,
      severity: 'critical'
    });
  }
  
  // 检查成功率
  if (stats.successRate < thresholds.minSuccessRate) {
    alerts.push({
      type: 'reliability',
      message: `Task ${taskId} success rate (${stats.successRate * 100}%) below threshold`,
      severity: 'warning'
    });
  }
  
  return {
    taskId,
    statistics: stats,
    alerts,
    healthy: alerts.length === 0
  };
}

自动调整

javascript
// 基于性能自动调整调度频率
function autoAdjustSchedule(taskId, currentCron) {
  const stats = getTaskStatistics(taskId);
  const performance = monitorTaskPerformance(taskId);
  
  if (!performance.healthy) {
    // 如果任务不健康,降低执行频率
    return reduceFrequency(currentCron);
  }
  
  // 如果任务执行很快且成功率高,可以增加频率
  if (stats.avgDuration < 10000 && stats.successRate > 0.95) {
    return increaseFrequency(currentCron);
  }
  
  return currentCron;
}

function reduceFrequency(cronExpr) {
  // 简单的频率降低逻辑
  const parts = cronExpr.split(' ');
  
  if (parts[0] === '*') {
    parts[0] = '*/5'; // 从每分钟改为每5分钟
  } else if (parts[0] === '*/5') {
    parts[0] = '*/15'; // 从每5分钟改为每15分钟
  } else if (parts[0] === '*/15') {
    parts[0] = '*/30'; // 从每15分钟改为每30分钟
  }
  
  return parts.join(' ');
}

function increaseFrequency(cronExpr) {
  // 简单的频率增加逻辑
  const parts = cronExpr.split(' ');
  
  if (parts[0] === '*/30') {
    parts[0] = '*/15'; // 从每30分钟改为每15分钟
  } else if (parts[0] === '*/15') {
    parts[0] = '*/5'; // 从每15分钟改为每5分钟
  } else if (parts[0] === '*/5') {
    parts[0] = '*'; // 从每5分钟改为每分钟
  }
  
  return parts.join(' ');
}

实际应用案例

数据同步任务

javascript
// 每小时同步数据
// Cron: 0 * * * *

const syncConfig = {
  source: 'external_api',
  target: 'local_database',
  batchSize: 100,
  timeout: 300000
};

async function syncData(config) {
  const startTime = Date.now();
  let totalSynced = 0;
  let errors = [];
  
  try {
    // 获取上次同步时间
    const lastSync = await getLastSyncTime(config.source);
    
    // 获取增量数据
    const data = await fetchIncrementalData(config.source, lastSync);
    
    // 分批同步
    for (let i = 0; i < data.length; i += config.batchSize) {
      const batch = data.slice(i, i + config.batchSize);
      
      try {
        await syncBatch(batch, config.target);
        totalSynced += batch.length;
      } catch (error) {
        errors.push({
          batch: i / config.batchSize + 1,
          error: error.message,
          items: batch.length
        });
      }
    }
    
    // 更新同步时间
    await updateLastSyncTime(config.source, new Date());
    
    return {
      success: true,
      duration: Date.now() - startTime,
      totalItems: data.length,
      syncedItems: totalSynced,
      errors: errors
    };
  } catch (error) {
    return {
      success: false,
      duration: Date.now() - startTime,
      error: error.message
    };
  }
}

const result = await syncData(syncConfig);
return [{ json: result }];

报告生成任务

javascript
// 每天早上8点生成日报
// Cron: 0 8 * * *

async function generateDailyReport() {
  const yesterday = new Date();
  yesterday.setDate(yesterday.getDate() - 1);
  
  const reportData = {
    date: yesterday.toISOString().split('T')[0],
    metrics: {},
    charts: [],
    summary: {}
  };
  
  try {
    // 收集各种指标
    reportData.metrics.orders = await getOrderMetrics(yesterday);
    reportData.metrics.users = await getUserMetrics(yesterday);
    reportData.metrics.revenue = await getRevenueMetrics(yesterday);
    
    // 生成图表
    reportData.charts.push(await generateOrderChart(yesterday));
    reportData.charts.push(await generateRevenueChart(yesterday));
    
    // 生成摘要
    reportData.summary = generateSummary(reportData.metrics);
    
    // 生成 PDF 报告
    const pdfBuffer = await generatePDF(reportData);
    
    // 发送邮件
    await sendReportEmail(pdfBuffer, reportData.date);
    
    return {
      success: true,
      reportDate: reportData.date,
      metrics: reportData.metrics,
      summary: reportData.summary
    };
  } catch (error) {
    return {
      success: false,
      error: error.message,
      reportDate: reportData.date
    };
  }
}

const result = await generateDailyReport();
return [{ json: result }];

系统维护任务

javascript
// 每周日凌晨2点执行系统维护
// Cron: 0 2 * * 0

async function performMaintenance() {
  const maintenanceTasks = [
    { name: 'cleanup_logs', function: cleanupOldLogs },
    { name: 'optimize_database', function: optimizeDatabase },
    { name: 'backup_data', function: backupCriticalData },
    { name: 'update_cache', function: refreshCache },
    { name: 'check_disk_space', function: checkDiskSpace }
  ];
  
  const results = [];
  
  for (const task of maintenanceTasks) {
    const startTime = Date.now();
    
    try {
      console.log(`Starting maintenance task: ${task.name}`);
      const result = await task.function();
      
      results.push({
        task: task.name,
        status: 'success',
        duration: Date.now() - startTime,
        result
      });
    } catch (error) {
      results.push({
        task: task.name,
        status: 'failed',
        duration: Date.now() - startTime,
        error: error.message
      });
    }
  }
  
  // 生成维护报告
  const report = {
    timestamp: new Date().toISOString(),
    totalTasks: maintenanceTasks.length,
    successfulTasks: results.filter(r => r.status === 'success').length,
    failedTasks: results.filter(r => r.status === 'failed').length,
    totalDuration: results.reduce((sum, r) => sum + r.duration, 0),
    details: results
  };
  
  // 发送维护报告
  if (report.failedTasks > 0) {
    await sendMaintenanceAlert(report);
  }
  
  return report;
}

const maintenanceResult = await performMaintenance();
return [{ json: maintenanceResult }];

小结

定时任务和调度系统是自动化的重要基础:

  1. 掌握 Cron 表达式:理解各种时间模式的表示方法
  2. 处理时区问题:确保任务在正确的时间执行
  3. 实现监控机制:跟踪任务执行状态和性能
  4. 添加错误处理:确保任务失败时能够恢复
  5. 优化调度策略:根据实际需求调整执行频率

下一篇文章,我们将学习 Webhook 和 API 集成,这是连接外部系统的重要技术。

记住,好的调度系统不仅要能按时执行任务,还要能监控任务状态、处理异常情况,并根据实际情况自动调整。这样才能构建真正可靠的自动化系统。