Skip to content

n8n 数据库操作与 SQL 节点 - 数据持久化的艺术

数据库操作是许多自动化工作流的核心需求。无论是存储处理结果、查询历史数据,还是实现复杂的业务逻辑,数据库都扮演着重要角色。今天我们来深入学习 n8n 的数据库操作能力。

支持的数据库类型

n8n 支持多种数据库系统:

关系型数据库

  • MySQL:最流行的开源关系型数据库
  • PostgreSQL:功能强大的开源对象关系型数据库
  • SQLite:轻量级的嵌入式数据库
  • Microsoft SQL Server:企业级数据库系统
  • Oracle:企业级商业数据库

NoSQL 数据库

  • MongoDB:文档型数据库
  • Redis:内存键值数据库
  • Elasticsearch:搜索和分析引擎

MySQL 节点详解

连接配置

json
{
  "host": "localhost",
  "port": 3306,
  "database": "myapp",
  "user": "dbuser",
  "password": "{{ $env.DB_PASSWORD }}",
  "ssl": false,
  "connectTimeout": 60000
}

基本操作

查询数据

sql
SELECT id, name, email, created_at 
FROM users 
WHERE status = 'active' 
ORDER BY created_at DESC 
LIMIT 100;

插入数据

sql
INSERT INTO users (name, email, status, created_at) 
VALUES (
  '{{ $json.name }}', 
  '{{ $json.email }}', 
  'active', 
  NOW()
);

更新数据

sql
UPDATE users 
SET 
  name = '{{ $json.name }}',
  email = '{{ $json.email }}',
  updated_at = NOW()
WHERE id = {{ $json.id }};

删除数据

sql
DELETE FROM users 
WHERE id = {{ $json.id }} 
AND status = 'inactive';

高级查询

联表查询

sql
SELECT 
  u.id,
  u.name,
  u.email,
  p.title as profile_title,
  COUNT(o.id) as order_count,
  SUM(o.amount) as total_spent
FROM users u
LEFT JOIN profiles p ON u.id = p.user_id
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.status = 'active'
GROUP BY u.id, u.name, u.email, p.title
HAVING total_spent > 1000
ORDER BY total_spent DESC;

条件查询

sql
SELECT * FROM orders 
WHERE 
  created_at >= '{{ $json.startDate }}'
  AND created_at <= '{{ $json.endDate }}'
  AND status IN ('paid', 'shipped')
  AND amount BETWEEN {{ $json.minAmount }} AND {{ $json.maxAmount }};

分页查询

sql
SELECT * FROM products 
WHERE category = '{{ $json.category }}'
ORDER BY created_at DESC
LIMIT {{ $json.limit }} 
OFFSET {{ $json.offset }};

PostgreSQL 高级特性

JSON 数据处理

sql
-- 查询 JSON 字段
SELECT 
  id,
  name,
  metadata->>'category' as category,
  metadata->'tags' as tags,
  jsonb_array_length(metadata->'tags') as tag_count
FROM products 
WHERE metadata->>'status' = 'active';

数组操作

sql
-- 数组查询
SELECT * FROM articles 
WHERE '{{ $json.tag }}' = ANY(tags);

-- 数组聚合
SELECT 
  category,
  array_agg(DISTINCT author) as authors,
  count(*) as article_count
FROM articles 
GROUP BY category;

窗口函数

sql
-- 排名和分析
SELECT 
  id,
  name,
  amount,
  ROW_NUMBER() OVER (ORDER BY amount DESC) as rank,
  PERCENT_RANK() OVER (ORDER BY amount) as percentile,
  LAG(amount) OVER (ORDER BY created_at) as prev_amount
FROM orders 
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days';

数据库操作最佳实践

连接池管理

javascript
// Manage database connections inside a Code node.
/**
 * Minimal connection-pool skeleton that caps how many connections are checked
 * out at the same time.
 *
 * `executeQuery` is a placeholder: it only logs the statement and resolves to
 * undefined. Replace it with a real driver call.
 */
class DatabasePool {
  /**
   * @param {object} config - Connection settings. `config.maxConnections`
   *   (falls back to 10 when falsy) caps concurrently checked-out connections.
   */
  constructor(config) {
    this.config = config;
    this.pool = null;
    this.maxConnections = config.maxConnections || 10;
    this.activeConnections = 0;
  }

  /**
   * Check out a connection handle.
   *
   * @returns {Promise<{query: Function, release: Function}>} A handle whose
   *   `query(sql, params)` delegates to `executeQuery` and whose `release()`
   *   returns the slot to the pool.
   * @throws {Error} 'Connection pool exhausted' when the cap is reached.
   */
  async getConnection() {
    if (this.activeConnections >= this.maxConnections) {
      throw new Error('Connection pool exhausted');
    }

    this.activeConnections++;

    // A handle may give its slot back only once; repeated release() calls
    // must not push the counter below the real number of checked-out handles,
    // which would let callers exceed maxConnections.
    let released = false;

    // This should return a real database connection.
    return {
      query: async (sql, params) => this.executeQuery(sql, params),
      release: () => {
        if (released) {
          return;
        }
        released = true;
        this.activeConnections--;
      },
    };
  }

  /**
   * Placeholder query implementation: logs and resolves to undefined.
   *
   * @param {string} sql
   * @param {Array} [params]
   */
  async executeQuery(sql, params) {
    // Replace with the real database query.
    console.log('Executing query:', sql, params);
  }
}

// 使用连接池
const dbPool = new DatabasePool({
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  maxConnections: 5
});

const connection = await dbPool.getConnection();
try {
  const result = await connection.query(
    'SELECT * FROM users WHERE email = ?',
    [items[0].json.email]
  );
  
  return [{ json: { users: result } }];
} finally {
  connection.release();
}

SQL 注入防护

javascript
/**
 * Package a SQL template and its bound values for a parameterized query.
 * The values stay in `params` and are never spliced into the SQL text.
 *
 * @param {string} template - SQL text, e.g. with `?` placeholders.
 * @param {Array} params - Values for the placeholders, in order (returned as-is).
 * @returns {{sql: string, params: Array}}
 */
function buildSafeQuery(template, params) {
  return { sql: template, params };
}

// WRONG (prone to SQL injection): user input is spliced into the SQL text.
const unsafeQuery = `SELECT * FROM users WHERE name = '${userInput}'`;

// RIGHT (parameterized query): the SQL only contains placeholders and the
// values travel separately.
const safeQuery = buildSafeQuery(
  'SELECT * FROM users WHERE name = ? AND status = ?',
  [userInput, 'active']
);

// Parameterized queries in n8n's database nodes:
// `{{ }}` expressions are resolved into the query text before it is sent, so
// `WHERE user_id = {{ $json.userId }}` is string concatenation, not
// parameterization (and unquoted strings/dates would not even parse).
// Put numbered placeholders in the query and supply the values through the
// node's "Query Parameters" option; they are bound in order to $1, $2, $3.
const query = `
  SELECT * FROM orders 
  WHERE user_id = $1
  AND created_at >= $2
  AND status = $3
`;
// Query Parameters option: {{ $json.userId }}, {{ $json.startDate }}, {{ $json.status }}

事务处理

javascript
/**
 * Run a list of statements as one database transaction.
 *
 * Statements execute in order on a single connection from `getDBConnection()`.
 * On the first failure the transaction is rolled back and that failure is
 * rethrown. The connection is released in every case.
 *
 * @param {Array<{sql: string, params?: Array}>} operations - Statements to run.
 * @returns {Promise<Array>} One driver result per operation, in order.
 * @throws The error raised by beginTransaction, a statement, or commit. A
 *   failure of the rollback itself is logged, not thrown, so it cannot hide
 *   the error that caused the rollback.
 */
async function executeTransaction(operations) {
  const connection = await getDBConnection();

  try {
    await connection.beginTransaction();

    const results = [];
    // Sequential on purpose: all statements share one connection/transaction.
    for (const { sql, params } of operations) {
      results.push(await connection.query(sql, params));
    }

    await connection.commit();
    return results;
  } catch (error) {
    try {
      await connection.rollback();
    } catch (rollbackError) {
      console.error('Transaction rollback failed:', rollbackError);
    }
    throw error;
  } finally {
    connection.release();
  }
}

// 使用事务处理订单
const orderOperations = [
  {
    sql: 'INSERT INTO orders (user_id, amount, status) VALUES (?, ?, ?)',
    params: [userId, amount, 'pending']
  },
  {
    sql: 'UPDATE inventory SET quantity = quantity - ? WHERE product_id = ?',
    params: [quantity, productId]
  },
  {
    sql: 'INSERT INTO order_items (order_id, product_id, quantity, price) VALUES (?, ?, ?, ?)',
    params: [orderId, productId, quantity, price]
  }
];

try {
  const results = await executeTransaction(orderOperations);
  return [{ json: { success: true, results } }];
} catch (error) {
  return [{ json: { success: false, error: error.message } }];
}

批量操作优化

批量插入

sql
-- 批量插入优化
INSERT INTO users (name, email, status, created_at) VALUES
('John Doe', 'john@example.com', 'active', NOW()),
('Jane Smith', 'jane@example.com', 'active', NOW()),
('Bob Johnson', 'bob@example.com', 'active', NOW());
javascript
/**
 * Build one multi-row parameterized INSERT statement in a Code node.
 *
 * @param {string} tableName - Target table. Interpolated as-is: identifiers
 *   cannot be bound as parameters, so it must come from trusted code.
 * @param {Array<object>} records - Rows to insert; each is read by column name.
 * @param {string[]} columns - Column names, also interpolated as-is (trusted only).
 * @returns {{sql: string, params: Array}} SQL with one `(?, ...)` group per
 *   record, and the values flattened row by row in column order. A field that
 *   is missing (undefined) on a record is bound as null.
 * @throws {Error} If `records` is empty (an empty VALUES list is invalid SQL).
 */
function buildBatchInsert(tableName, records, columns) {
  if (records.length === 0) {
    throw new Error('buildBatchInsert: no records to insert');
  }

  const rowPlaceholder = `(${columns.map(() => '?').join(', ')})`;
  const placeholders = records.map(() => rowPlaceholder).join(', ');

  const sql = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES ${placeholders}`;

  // Map missing fields to NULL: some drivers (e.g. mysql2's execute) reject
  // undefined bind values.
  const params = records.flatMap((record) => columns.map((col) => record[col] ?? null));

  return { sql, params };
}

// 使用批量插入
const users = items.map(item => item.json);
const columns = ['name', 'email', 'status', 'created_at'];

const batchQuery = buildBatchInsert('users', users, columns);

// 执行批量插入
const result = await executeQuery(batchQuery.sql, batchQuery.params);
return [{ json: { inserted: result.affectedRows } }];

批量更新

javascript
/**
 * Build one UPDATE statement that writes different values to many rows, using
 * one `CASE` expression per column keyed on `keyColumn`. Nothing is executed.
 *
 * Every value (row keys and new column values) is bound as a `?` parameter, so
 * values are neither injectable nor coerced to strings. `tableName`,
 * `keyColumn` and the update objects' property names are interpolated as
 * identifiers and must come from trusted code.
 *
 * @param {string} tableName - Table to update.
 * @param {Array<object>} updates - Each object holds `keyColumn` plus the
 *   columns to change for that row. Columns are the union of all objects' own
 *   keys; a row whose object omits a column keeps its current value there.
 * @param {string} keyColumn - Column identifying the row (e.g. 'id').
 * @returns {Promise<{sql: string, params: Array}>} The statement and its
 *   parameters in placeholder order.
 * @throws {Error} If `updates` is empty or no object has a column to change.
 */
async function batchUpdate(tableName, updates, keyColumn) {
  if (updates.length === 0) {
    throw new Error('batchUpdate: no updates given');
  }

  // Union of changed columns across all updates, in first-seen order.
  const updateColumns = [
    ...new Set(updates.flatMap((update) => Object.keys(update).filter((col) => col !== keyColumn))),
  ];
  if (updateColumns.length === 0) {
    throw new Error('batchUpdate: no columns to update');
  }

  const params = [];

  const setClauses = updateColumns.map((column) => {
    const whens = [];
    for (const update of updates) {
      if (Object.hasOwn(update, column)) {
        whens.push(`WHEN ${keyColumn} = ? THEN ?`);
        params.push(update[keyColumn], update[column]);
      }
    }
    // ELSE keeps the current value for rows whose update omits this column;
    // without it, CASE would yield NULL for them.
    return `${column} = CASE ${whens.join(' ')} ELSE ${column} END`;
  });

  const ids = updates.map((update) => update[keyColumn]);
  params.push(...ids);

  const sql = `UPDATE ${tableName} SET ${setClauses.join(', ')} WHERE ${keyColumn} IN (${ids.map(() => '?').join(', ')})`;

  return { sql, params };
}

NoSQL 数据库操作

MongoDB 操作

javascript
// MongoDB find example: active users created during 2024 that carry a
// 'premium' or 'vip' tag, newest first.
const mongoQuery = {
  collection: 'users',
  operation: 'find',
  query: {
    status: 'active',
    createdAt: {
      // Half-open range [2024-01-01, 2025-01-01). Date-only strings parse as
      // UTC midnight, so `$lte: new Date('2024-12-31')` would stop at
      // 2024-12-31T00:00Z and silently drop the rest of Dec 31.
      $gte: new Date('2024-01-01'),
      $lt: new Date('2025-01-01'),
    },
    tags: { $in: ['premium', 'vip'] },
  },
  options: {
    sort: { createdAt: -1 },
    limit: 100,
    projection: { password: 0 }, // exclude sensitive fields
  },
};

// Aggregation: active documents created since 2024-01-01, grouped by category
// with count, total and average amount, highest total first.
const aggregationPipeline = [
  {
    $match: {
      status: 'active',
      createdAt: { $gte: new Date('2024-01-01') },
    },
  },
  {
    $group: {
      _id: '$category',
      count: { $sum: 1 },
      totalAmount: { $sum: '$amount' },
      avgAmount: { $avg: '$amount' },
    },
  },
  {
    $sort: { totalAmount: -1 },
  },
];

Redis 缓存操作

javascript
/**
 * Thin JSON cache on top of a Redis client.
 *
 * Values are JSON-encoded on write and decoded on read. The client must expose
 * `get`, `setex`, `del` and `exists` (ioredis-style method names — confirm for
 * the client you use).
 */
class RedisCache {
  /** @param {object} client - Redis client instance. */
  constructor(client) {
    this.client = client;
  }

  /**
   * @param {string} key
   * @returns {Promise<*>} The decoded value, or null when the stored value is
   *   falsy (missing key / empty string).
   */
  async get(key) {
    const raw = await this.client.get(key);
    if (!raw) {
      return null;
    }
    return JSON.parse(raw);
  }

  /**
   * Store `value` (JSON-encoded) under `key` with a TTL in seconds.
   *
   * @param {string} key
   * @param {*} value
   * @param {number} [ttl=3600]
   */
  async set(key, value, ttl = 3600) {
    const payload = JSON.stringify(value);
    await this.client.setex(key, ttl, payload);
  }

  /** @param {string} key - Key to delete. */
  async del(key) {
    await this.client.del(key);
  }

  /**
   * @param {string} key
   * @returns {Promise<*>} Whatever the client's `exists` resolves to, unchanged.
   */
  async exists(key) {
    const found = await this.client.exists(key);
    return found;
  }
}

// 使用 Redis 缓存查询结果
const cache = new RedisCache(redisClient);
const cacheKey = `user_profile_${userId}`;

// 尝试从缓存获取
let userProfile = await cache.get(cacheKey);

if (!userProfile) {
  // 缓存未命中,从数据库查询
  userProfile = await queryDatabase(
    'SELECT * FROM user_profiles WHERE user_id = ?',
    [userId]
  );
  
  // 缓存查询结果
  await cache.set(cacheKey, userProfile, 1800); // 30分钟
}

return [{ json: userProfile }];

数据库性能优化

索引优化

sql
-- 创建索引
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_date ON orders(user_id, created_at);
CREATE INDEX idx_products_category_status ON products(category, status);

-- 复合索引
CREATE INDEX idx_orders_complex ON orders(status, created_at, user_id);

-- 部分索引(PostgreSQL)
CREATE INDEX idx_active_users ON users(email) WHERE status = 'active';

查询优化

javascript
/**
 * Run a query through `executeQuery` and log slow or failing statements
 * (query performance monitoring).
 *
 * Logged SQL is cut to its first 100 characters (with '...' only when it was
 * actually cut), and at most the first 5 params are logged.
 *
 * @param {string} sql
 * @param {Array} [params]
 * @param {number} [slowThresholdMs=1000] - Queries taking longer than this are
 *   reported via console.warn.
 * @returns {Promise<*>} Whatever `executeQuery` resolves to.
 * @throws Rethrows whatever `executeQuery` throws, after logging it.
 */
async function executeQueryWithMonitoring(sql, params, slowThresholdMs = 1000) {
  const previewSql = (text) => (text.length > 100 ? `${text.substring(0, 100)}...` : text);
  const startTime = Date.now();

  try {
    const result = await executeQuery(sql, params);
    const duration = Date.now() - startTime;

    // Record slow queries.
    if (duration > slowThresholdMs) {
      console.warn('Slow query detected:', {
        sql: previewSql(sql),
        duration,
        params: params?.slice(0, 5), // only log the first 5 params
      });
    }

    return result;
  } catch (error) {
    const duration = Date.now() - startTime;
    console.error('Query failed:', {
      sql: previewSql(sql),
      duration,
      error: error instanceof Error ? error.message : String(error),
    });
    throw error;
  }
}

连接优化

javascript
/**
 * Run basic MySQL health checks through `executeQuery`.
 *
 * Checks, in order:
 *   - `connection`: `SELECT 1`, reporting its response time in ms;
 *   - `table_space`: data + index size (MB) of the current schema;
 *   - `active_connections`: the `Threads_connected` status counter.
 * Each check records its own result under its own name, so a failing secondary
 * check is not mislabelled as a connection failure. If the connection check
 * fails, the remaining checks are skipped.
 *
 * @returns {Promise<{timestamp: string, overall_status: string, checks: Array<object>}>}
 */
async function checkDatabaseHealth() {
  const checks = [];

  // Run one named check. `probe` resolves to the extra fields of a healthy
  // result; any throw is recorded as unhealthy. Returns whether it passed.
  const runCheck = async (name, probe) => {
    try {
      checks.push({ name, status: 'healthy', ...(await probe()) });
      return true;
    } catch (error) {
      checks.push({ name, status: 'unhealthy', error: error.message });
      return false;
    }
  };

  const connected = await runCheck('connection', async () => {
    const startTime = Date.now();
    await executeQuery('SELECT 1');
    return { responseTime: Date.now() - startTime };
  });

  if (connected) {
    await runCheck('table_space', async () => {
      const tableSpaceQuery = `
      SELECT 
        table_schema,
        ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS size_mb
      FROM information_schema.tables 
      WHERE table_schema = DATABASE()
      GROUP BY table_schema
    `;
      const spaceResult = await executeQuery(tableSpaceQuery);
      return { size_mb: spaceResult[0]?.size_mb || 0 };
    });

    await runCheck('active_connections', async () => {
      // Single-quoted literal: with sql_mode ANSI_QUOTES a double-quoted
      // string is parsed as an identifier.
      const connectionResult = await executeQuery("SHOW STATUS LIKE 'Threads_connected'");
      return { count: Number.parseInt(connectionResult[0]?.Value || 0, 10) };
    });
  }

  return {
    timestamp: new Date().toISOString(),
    overall_status: checks.every((c) => c.status === 'healthy') ? 'healthy' : 'unhealthy',
    checks,
  };
}

实际应用案例

用户行为分析

sql
-- 用户行为分析查询
WITH user_activity AS (
  SELECT 
    user_id,
    DATE(created_at) as activity_date,
    COUNT(*) as action_count,
    COUNT(DISTINCT session_id) as session_count
  FROM user_actions 
  WHERE created_at >= CURRENT_DATE - INTERVAL 30 DAY
  GROUP BY user_id, DATE(created_at)
),
user_metrics AS (
  SELECT 
    user_id,
    COUNT(DISTINCT activity_date) as active_days,
    AVG(action_count) as avg_daily_actions,
    MAX(action_count) as max_daily_actions,
    SUM(session_count) as total_sessions
  FROM user_activity
  GROUP BY user_id
)
SELECT 
  u.id,
  u.name,
  u.email,
  um.active_days,
  um.avg_daily_actions,
  um.max_daily_actions,
  um.total_sessions,
  CASE 
    WHEN um.active_days >= 20 THEN 'highly_active'
    WHEN um.active_days >= 10 THEN 'moderately_active'
    WHEN um.active_days >= 5 THEN 'low_active'
    ELSE 'inactive'
  END as activity_level
FROM users u
LEFT JOIN user_metrics um ON u.id = um.user_id
WHERE u.status = 'active'
ORDER BY um.active_days DESC, um.avg_daily_actions DESC;

数据同步任务

javascript
/**
 * Data-sync workflow: copy rows from a source database to a target database
 * according to `syncRules`.
 *
 * For each rule:
 *   1. run `rule.sourceQuery` on the source, passing `rule.sourceParams`
 *      (if given) as the query's bound values;
 *   2. map every row through `rule.transform`;
 *   3. insert the result into `rule.targetTable` in batches of
 *      `rule.batchSize` rows (default 100).
 * A failing batch is recorded and the remaining batches still run. A failure
 * outside the batch loop (invalid batch size, query, transform) skips the rest
 * of that rule.
 *
 * @param {*} sourceConfig - Passed through to `queryDatabase`.
 * @param {*} targetConfig - Passed through to `batchInsert`.
 * @param {Array<object>} syncRules
 * @returns {Promise<{synced: number, errors: number, details: Array<object>}>}
 *   `synced` and batch failures count rows; a rule-level failure adds 1 to `errors`.
 */
async function syncDataBetweenDatabases(sourceConfig, targetConfig, syncRules) {
  const results = {
    synced: 0,
    errors: 0,
    details: [],
  };

  for (const rule of syncRules) {
    try {
      const batchSize = rule.batchSize ?? 100;
      // A non-positive step would make the batch loop below spin forever.
      if (!Number.isInteger(batchSize) || batchSize <= 0) {
        throw new Error(`Invalid batchSize for rule ${rule.name}: ${batchSize}`);
      }

      // Fetch from the source database, binding the rule's placeholder values.
      const sourceData = await queryDatabase(sourceConfig, rule.sourceQuery, rule.sourceParams);

      if (sourceData.length === 0) {
        continue;
      }

      // Prepare target rows.
      const targetData = sourceData.map((row) => rule.transform(row));

      // Batch insert into the target database.
      for (let i = 0; i < targetData.length; i += batchSize) {
        const batch = targetData.slice(i, i + batchSize);

        try {
          await batchInsert(targetConfig, rule.targetTable, batch);
          results.synced += batch.length;
        } catch (error) {
          results.errors += batch.length;
          results.details.push({
            rule: rule.name,
            table: rule.targetTable,
            batch: Math.floor(i / batchSize) + 1,
            error: error.message,
          });
        }
      }
    } catch (error) {
      results.errors++;
      results.details.push({
        rule: rule.name,
        error: error.message,
      });
    }
  }

  return results;
}

// 定义同步规则
const syncRules = [
  {
    name: 'sync_users',
    sourceQuery: 'SELECT * FROM users WHERE updated_at > ?',
    targetTable: 'users_replica',
    transform: (row) => ({
      id: row.id,
      name: row.name,
      email: row.email.toLowerCase(),
      status: row.status,
      synced_at: new Date()
    })
  }
];

const syncResult = await syncDataBetweenDatabases(
  sourceConfig,
  targetConfig,
  syncRules
);

return [{ json: syncResult }];

小结

数据库操作是 n8n 工作流的重要组成部分:

  1. 选择合适的数据库:根据数据特点和性能需求选择
  2. 安全第一:防止 SQL 注入,使用参数化查询
  3. 性能优化:合理使用索引、连接池、批量操作
  4. 事务处理:确保数据一致性
  5. 监控维护:跟踪查询性能和数据库健康状态

下一篇文章,我们将学习文件处理和存储技术,这是处理文档、图片等非结构化数据的重要技能。

记住,数据库是系统的核心,良好的数据库设计和操作习惯对整个系统的性能和稳定性至关重要。始终要考虑数据安全、性能优化和错误处理。