n8n 定时任务与调度系统 - 自动化的时间管理
定时任务是自动化系统的重要组成部分,它让工作流能够在特定时间自动执行。今天我们来深入学习 n8n 的定时任务和调度系统。
定时触发器类型
Cron Trigger
最强大和灵活的定时触发器,基于 Unix Cron 表达式。
基本配置
json
{
"cronExpression": "0 9 * * 1-5",
"timezone": "Asia/Shanghai"
}
Interval Trigger
简单的间隔触发器,适合规律性重复任务。
基本配置
json
{
"interval": 300,
"unit": "seconds"
}
Schedule Trigger
可视化的调度配置,适合非技术用户。
基本配置
json
{
"rule": {
"interval": [{
"field": "hour",
"value": 9
}]
}
}
Cron 表达式详解
表达式格式
┌───────────── 分钟 (0 - 59)
│ ┌─────────── 小时 (0 - 23)
│ │ ┌───────── 日期 (1 - 31)
│ │ │ ┌─────── 月份 (1 - 12)
│ │ │ │ ┌───── 星期 (0 - 7, 0和7都表示周日)
│ │ │ │ │
* * * * *
特殊字符
星号 (*):匹配所有值
bash
* * * * * # 每分钟执行
逗号 (,):列举多个值
bash
0 9,12,18 * * * # 每天9点、12点、18点执行
连字符 (-):指定范围
bash
0 9-17 * * 1-5 # 工作日9点到17点每小时执行
斜杠 (/):指定间隔
bash
*/15 * * * * # 每15分钟执行
0 */2 * * * # 每2小时执行
问号 (?):用于日期和星期字段,表示不指定值
bash
0 9 ? * MON # 每周一9点执行(日期不指定)
常用 Cron 表达式
每日任务
bash
0 9 * * * # 每天早上9点
0 0 * * * # 每天午夜
30 23 * * * # 每天晚上11:30
工作日任务
bash
0 9 * * 1-5 # 工作日早上9点
0 18 * * MON-FRI # 工作日下午6点
每周任务
bash
0 9 * * 1 # 每周一早上9点
0 0 * * SUN # 每周日午夜
每月任务
bash
0 9 1 * * # 每月1号早上9点
0 9 L * * # 每月最后一天早上9点
0 9 1-7 * 1 # 每月第一个周一早上9点
复杂调度
bash
0 9-17/2 * * 1-5 # 工作日9-17点每2小时
*/30 9-17 * * 1-5 # 工作日9-17点每30分钟
0 9 1,15 * * # 每月1号和15号早上9点
高级调度策略
动态调度
javascript
// 根据业务需求动态调整调度时间
function generateCronExpression(config) {
const { frequency, workingHours, timezone, businessDays } = config;
let cronExpr = '';
switch (frequency) {
case 'hourly':
cronExpr = workingHours
? `0 ${workingHours.start}-${workingHours.end} * * ${businessDays}`
: '0 * * * *';
break;
case 'daily':
cronExpr = businessDays
? `0 9 * * ${businessDays}`
: '0 9 * * *';
break;
case 'weekly':
cronExpr = '0 9 * * 1'; // 每周一
break;
case 'monthly':
cronExpr = '0 9 1 * *'; // 每月1号
break;
}
return cronExpr;
}
// 使用示例
const scheduleConfig = {
frequency: 'hourly',
workingHours: { start: 9, end: 17 },
businessDays: '1-5',
timezone: 'Asia/Shanghai'
};
const cronExpression = generateCronExpression(scheduleConfig);
console.log(`Generated cron: ${cronExpression}`);
条件调度
javascript
// 基于条件的智能调度
function shouldExecuteNow(conditions) {
const now = new Date();
const hour = now.getHours();
const day = now.getDay();
// 检查时间窗口
if (conditions.timeWindow) {
const { start, end } = conditions.timeWindow;
if (hour < start || hour > end) {
return false;
}
}
// 检查工作日
if (conditions.businessDaysOnly && (day === 0 || day === 6)) {
return false;
}
// 检查节假日
if (conditions.excludeHolidays && isHoliday(now)) {
return false;
}
// 检查系统负载
if (conditions.checkSystemLoad && getSystemLoad() > 0.8) {
return false;
}
return true;
}
// 在工作流开始时检查条件
const executionConditions = {
timeWindow: { start: 9, end: 17 },
businessDaysOnly: true,
excludeHolidays: true,
checkSystemLoad: true
};
if (!shouldExecuteNow(executionConditions)) {
console.log('Conditions not met, skipping execution');
return [{ json: { skipped: true, reason: 'Conditions not met' } }];
}
分布式调度
javascript
// 避免多个实例同时执行
async function acquireLock(lockKey, ttl = 300000) {
const lockValue = `${Date.now()}_${Math.random()}`;
try {
// 使用 Redis 或数据库实现分布式锁
const acquired = await setLockIfNotExists(lockKey, lockValue, ttl);
if (acquired) {
return {
acquired: true,
lockValue,
release: () => releaseLock(lockKey, lockValue)
};
}
return { acquired: false };
} catch (error) {
console.error('Failed to acquire lock:', error);
return { acquired: false };
}
}
// 在定时任务开始时获取锁
const lockKey = `workflow_${$workflow.id}_${$node.name}`;
const lock = await acquireLock(lockKey);
if (!lock.acquired) {
console.log('Another instance is running, skipping');
return [{ json: { skipped: true, reason: 'Lock not acquired' } }];
}
try {
// 执行实际任务
const result = await executeTask();
return [{ json: { success: true, result } }];
} finally {
// 释放锁
if (lock.release) {
await lock.release();
}
}
时区处理
时区配置
javascript
// 处理不同时区的调度
function convertToTimezone(cronExpr, fromTz, toTz) {
// 解析 cron 表达式
const parts = cronExpr.split(' ');
const hour = parseInt(parts[1]);
// 计算时区偏移
const fromOffset = getTimezoneOffset(fromTz);
const toOffset = getTimezoneOffset(toTz);
const hourDiff = (toOffset - fromOffset) / 60;
// 调整小时
let newHour = hour + hourDiff;
let dayAdjustment = 0;
if (newHour < 0) {
newHour += 24;
dayAdjustment = -1;
} else if (newHour >= 24) {
newHour -= 24;
dayAdjustment = 1;
}
parts[1] = newHour.toString();
// 如果需要调整日期
if (dayAdjustment !== 0) {
// 这里需要更复杂的逻辑来处理星期和日期的调整
adjustDayFields(parts, dayAdjustment);
}
return parts.join(' ');
}
// 多时区调度示例
const schedules = [
{ timezone: 'America/New_York', cron: '0 9 * * 1-5' },
{ timezone: 'Europe/London', cron: '0 14 * * 1-5' },
{ timezone: 'Asia/Shanghai', cron: '0 22 * * 1-5' }
];
// 转换为 UTC 时间
const utcSchedules = schedules.map(schedule => ({
...schedule,
utcCron: convertToTimezone(schedule.cron, schedule.timezone, 'UTC')
}));
夏令时处理
javascript
// 处理夏令时变化
function adjustForDST(cronExpr, timezone) {
const now = new Date();
const isDST = isDaylightSavingTime(now, timezone);
const willBeDST = isDaylightSavingTime(getNextExecution(cronExpr), timezone);
if (isDST !== willBeDST) {
// 夏令时状态将发生变化,需要调整时间
const adjustment = willBeDST ? -1 : 1; // 夏令时开始时减1小时,结束时加1小时
const parts = cronExpr.split(' ');
const hour = parseInt(parts[1]) + adjustment;
if (hour >= 0 && hour < 24) {
parts[1] = hour.toString();
return parts.join(' ');
}
}
return cronExpr;
}
任务监控和管理
执行历史跟踪
javascript
// 记录任务执行历史
function recordExecution(taskId, status, duration, result) {
const execution = {
taskId,
executionId: $execution.id,
timestamp: new Date().toISOString(),
status, // 'success', 'failed', 'skipped'
duration,
result: status === 'failed' ? result.error : result,
nodeId: $node.id,
workflowId: $workflow.id
};
// 存储到数据库或日志系统
saveExecutionRecord(execution);
// 更新统计信息
updateTaskStatistics(taskId, status, duration);
return execution;
}
// 在任务开始时记录
const startTime = Date.now();
const taskId = `${$workflow.name}_${$node.name}`;
try {
const result = await executeScheduledTask();
const duration = Date.now() - startTime;
recordExecution(taskId, 'success', duration, result);
return [{ json: { success: true, result, duration } }];
} catch (error) {
const duration = Date.now() - startTime;
recordExecution(taskId, 'failed', duration, { error: error.message });
throw error;
}
性能监控
javascript
// 监控任务性能
function monitorTaskPerformance(taskId) {
const stats = getTaskStatistics(taskId);
const thresholds = {
maxDuration: 300000, // 5分钟
maxFailureRate: 0.1, // 10%
minSuccessRate: 0.9 // 90%
};
const alerts = [];
// 检查执行时间
if (stats.avgDuration > thresholds.maxDuration) {
alerts.push({
type: 'performance',
message: `Task ${taskId} average duration (${stats.avgDuration}ms) exceeds threshold`,
severity: 'warning'
});
}
// 检查失败率
if (stats.failureRate > thresholds.maxFailureRate) {
alerts.push({
type: 'reliability',
message: `Task ${taskId} failure rate (${stats.failureRate * 100}%) exceeds threshold`,
severity: 'critical'
});
}
// 检查成功率
if (stats.successRate < thresholds.minSuccessRate) {
alerts.push({
type: 'reliability',
message: `Task ${taskId} success rate (${stats.successRate * 100}%) below threshold`,
severity: 'warning'
});
}
return {
taskId,
statistics: stats,
alerts,
healthy: alerts.length === 0
};
}
自动调整
javascript
// 基于性能自动调整调度频率
function autoAdjustSchedule(taskId, currentCron) {
const stats = getTaskStatistics(taskId);
const performance = monitorTaskPerformance(taskId);
if (!performance.healthy) {
// 如果任务不健康,降低执行频率
return reduceFrequency(currentCron);
}
// 如果任务执行很快且成功率高,可以增加频率
if (stats.avgDuration < 10000 && stats.successRate > 0.95) {
return increaseFrequency(currentCron);
}
return currentCron;
}
function reduceFrequency(cronExpr) {
// 简单的频率降低逻辑
const parts = cronExpr.split(' ');
if (parts[0] === '*') {
parts[0] = '*/5'; // 从每分钟改为每5分钟
} else if (parts[0] === '*/5') {
parts[0] = '*/15'; // 从每5分钟改为每15分钟
} else if (parts[0] === '*/15') {
parts[0] = '*/30'; // 从每15分钟改为每30分钟
}
return parts.join(' ');
}
function increaseFrequency(cronExpr) {
// 简单的频率增加逻辑
const parts = cronExpr.split(' ');
if (parts[0] === '*/30') {
parts[0] = '*/15'; // 从每30分钟改为每15分钟
} else if (parts[0] === '*/15') {
parts[0] = '*/5'; // 从每15分钟改为每5分钟
} else if (parts[0] === '*/5') {
parts[0] = '*'; // 从每5分钟改为每分钟
}
return parts.join(' ');
}
实际应用案例
数据同步任务
javascript
// 每小时同步数据
// Cron: 0 * * * *
const syncConfig = {
source: 'external_api',
target: 'local_database',
batchSize: 100,
timeout: 300000
};
async function syncData(config) {
const startTime = Date.now();
let totalSynced = 0;
let errors = [];
try {
// 获取上次同步时间
const lastSync = await getLastSyncTime(config.source);
// 获取增量数据
const data = await fetchIncrementalData(config.source, lastSync);
// 分批同步
for (let i = 0; i < data.length; i += config.batchSize) {
const batch = data.slice(i, i + config.batchSize);
try {
await syncBatch(batch, config.target);
totalSynced += batch.length;
} catch (error) {
errors.push({
batch: i / config.batchSize + 1,
error: error.message,
items: batch.length
});
}
}
// 更新同步时间
await updateLastSyncTime(config.source, new Date());
return {
success: true,
duration: Date.now() - startTime,
totalItems: data.length,
syncedItems: totalSynced,
errors: errors
};
} catch (error) {
return {
success: false,
duration: Date.now() - startTime,
error: error.message
};
}
}
const result = await syncData(syncConfig);
return [{ json: result }];
报告生成任务
javascript
// 每天早上8点生成日报
// Cron: 0 8 * * *
async function generateDailyReport() {
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);
const reportData = {
date: yesterday.toISOString().split('T')[0],
metrics: {},
charts: [],
summary: {}
};
try {
// 收集各种指标
reportData.metrics.orders = await getOrderMetrics(yesterday);
reportData.metrics.users = await getUserMetrics(yesterday);
reportData.metrics.revenue = await getRevenueMetrics(yesterday);
// 生成图表
reportData.charts.push(await generateOrderChart(yesterday));
reportData.charts.push(await generateRevenueChart(yesterday));
// 生成摘要
reportData.summary = generateSummary(reportData.metrics);
// 生成 PDF 报告
const pdfBuffer = await generatePDF(reportData);
// 发送邮件
await sendReportEmail(pdfBuffer, reportData.date);
return {
success: true,
reportDate: reportData.date,
metrics: reportData.metrics,
summary: reportData.summary
};
} catch (error) {
return {
success: false,
error: error.message,
reportDate: reportData.date
};
}
}
const result = await generateDailyReport();
return [{ json: result }];
系统维护任务
javascript
// 每周日凌晨2点执行系统维护
// Cron: 0 2 * * 0
async function performMaintenance() {
const maintenanceTasks = [
{ name: 'cleanup_logs', function: cleanupOldLogs },
{ name: 'optimize_database', function: optimizeDatabase },
{ name: 'backup_data', function: backupCriticalData },
{ name: 'update_cache', function: refreshCache },
{ name: 'check_disk_space', function: checkDiskSpace }
];
const results = [];
for (const task of maintenanceTasks) {
const startTime = Date.now();
try {
console.log(`Starting maintenance task: ${task.name}`);
const result = await task.function();
results.push({
task: task.name,
status: 'success',
duration: Date.now() - startTime,
result
});
} catch (error) {
results.push({
task: task.name,
status: 'failed',
duration: Date.now() - startTime,
error: error.message
});
}
}
// 生成维护报告
const report = {
timestamp: new Date().toISOString(),
totalTasks: maintenanceTasks.length,
successfulTasks: results.filter(r => r.status === 'success').length,
failedTasks: results.filter(r => r.status === 'failed').length,
totalDuration: results.reduce((sum, r) => sum + r.duration, 0),
details: results
};
// 发送维护报告
if (report.failedTasks > 0) {
await sendMaintenanceAlert(report);
}
return report;
}
const maintenanceResult = await performMaintenance();
return [{ json: maintenanceResult }];
小结
定时任务和调度系统是自动化的重要基础:
- 掌握 Cron 表达式:理解各种时间模式的表示方法
- 处理时区问题:确保任务在正确的时间执行
- 实现监控机制:跟踪任务执行状态和性能
- 添加错误处理:确保任务失败时能够恢复
- 优化调度策略:根据实际需求调整执行频率
下一篇文章,我们将学习 Webhook 和 API 集成,这是连接外部系统的重要技术。
记住,好的调度系统不仅要能按时执行任务,还要能监控任务状态、处理异常情况,并根据实际情况自动调整。这样才能构建真正可靠的自动化系统。