n8n 数据库操作与 SQL 节点 - 数据持久化的艺术
数据库操作是许多自动化工作流的核心需求。无论是存储处理结果、查询历史数据,还是实现复杂的业务逻辑,数据库都扮演着重要角色。今天我们来深入学习 n8n 的数据库操作能力。
支持的数据库类型
n8n 支持多种数据库系统:
关系型数据库
- MySQL:最流行的开源关系型数据库
- PostgreSQL:功能强大的开源对象关系型数据库
- SQLite:轻量级的嵌入式数据库
- Microsoft SQL Server:企业级数据库系统
- Oracle:企业级商业数据库
NoSQL 数据库
- MongoDB:文档型数据库
- Redis:内存键值数据库
- Elasticsearch:搜索和分析引擎
MySQL 节点详解
连接配置
json
{
"host": "localhost",
"port": 3306,
"database": "myapp",
"user": "dbuser",
"password": "{{ $env.DB_PASSWORD }}",
"ssl": false,
"connectTimeout": 60000
}
基本操作
查询数据
sql
-- Fetch the 100 newest active users.
SELECT u.id, u.name, u.email, u.created_at
FROM users AS u
WHERE u.status = 'active'
ORDER BY u.created_at DESC
LIMIT 100;
插入数据
sql
-- Values are bound through the node's "Query Parameters" option
-- ($1 = {{ $json.name }}, $2 = {{ $json.email }}) instead of being spliced
-- into the SQL text, so a quote in user data cannot break out of the string.
INSERT INTO users (name, email, status, created_at)
VALUES (
$1,
$2,
'active',
NOW()
);
更新数据
sql
-- Bind via "Query Parameters": $1 = {{ $json.name }}, $2 = {{ $json.email }},
-- $3 = {{ $json.id }}. Never splice {{ }} expressions into the SQL text.
UPDATE users
SET
name = $1,
email = $2,
updated_at = NOW()
WHERE id = $3;
删除数据
sql
-- Bind $1 via "Query Parameters" ({{ $json.id }}). The status guard limits
-- deletion to users already marked inactive.
DELETE FROM users
WHERE id = $1
AND status = 'inactive';
高级查询
联表查询
sql
-- Per active user: profile title, order count and total spend, keeping only
-- users whose total spend exceeds 1000, highest spenders first.
-- NOTE(review): if profiles can hold several rows per user, the join repeats
-- each order row and inflates COUNT/SUM - confirm profiles is 1:1 with users.
-- NOTE: using the select alias total_spent in HAVING/ORDER BY works in MySQL;
-- some dialects (e.g. PostgreSQL's HAVING) need SUM(o.amount) repeated.
SELECT
u.id,
u.name,
u.email,
p.title as profile_title,
COUNT(o.id) as order_count,
SUM(o.amount) as total_spent
FROM users u
LEFT JOIN profiles p ON u.id = p.user_id
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.status = 'active'
GROUP BY u.id, u.name, u.email, p.title
HAVING total_spent > 1000
ORDER BY total_spent DESC;
条件查询
sql
-- Bind via "Query Parameters": $1 = {{ $json.startDate }}, $2 = {{ $json.endDate }},
-- $3 = {{ $json.minAmount }}, $4 = {{ $json.maxAmount }}.
SELECT * FROM orders
WHERE
created_at >= $1
AND created_at <= $2
AND status IN ('paid', 'shipped')
AND amount BETWEEN $3 AND $4;
分页查询
sql
-- Bind $1 via "Query Parameters" ({{ $json.category }}).
-- LIMIT/OFFSET are coerced to integers inside the expression (limit >= 1,
-- offset >= 0), so only numbers can ever reach the SQL text.
SELECT * FROM products
WHERE category = $1
ORDER BY created_at DESC
LIMIT {{ Math.max(1, Number.parseInt($json.limit, 10) || 20) }}
OFFSET {{ Math.max(0, Number.parseInt($json.offset, 10) || 0) }};
PostgreSQL 高级特性
JSON 数据处理
sql
-- Read fields from the JSON column `metadata` (PostgreSQL):
-- ->> extracts a value as text, -> keeps it as JSON.
-- NOTE(review): jsonb_array_length requires metadata to be jsonb and
-- metadata->'tags' to be a JSON array - confirm the column type.
SELECT
id,
name,
metadata->>'category' as category,
metadata->'tags' as tags,
jsonb_array_length(metadata->'tags') as tag_count
FROM products
WHERE metadata->>'status' = 'active';
数组操作
sql
-- Array membership: articles whose tags array contains the given tag.
-- Bind $1 via "Query Parameters" ({{ $json.tag }}) rather than splicing it in.
SELECT * FROM articles
WHERE $1 = ANY(tags);
-- Array aggregation: distinct authors and article count per category.
SELECT
category,
array_agg(DISTINCT author) as authors,
count(*) as article_count
FROM articles
GROUP BY category;
窗口函数
sql
-- Ranking and comparison over orders from the last 30 days:
--   rank        = position by amount, largest first (ROW_NUMBER gives ties distinct numbers);
--   percentile  = PERCENT_RANK by amount, ascending;
--   prev_amount = amount of the preceding order by created_at (NULL for the first row).
SELECT
id,
name,
amount,
ROW_NUMBER() OVER (ORDER BY amount DESC) as rank,
PERCENT_RANK() OVER (ORDER BY amount) as percentile,
LAG(amount) OVER (ORDER BY created_at) as prev_amount
FROM orders
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days';
数据库操作最佳实践
连接池管理
javascript
// Manage database connections inside a Code node.
// NOTE(review): this is a skeleton - executeQuery below only logs and does not
// talk to a real database; plug in an actual driver before relying on it.
class DatabasePool {
  /**
   * @param {object} [config] - Connection settings; `maxConnections` caps the
   *   number of outstanding leases (default 10).
   */
  constructor(config = {}) {
    this.config = config;
    this.pool = null;
    this.maxConnections = config.maxConnections || 10;
    this.activeConnections = 0;
  }

  /**
   * Lease a connection handle.
   * @returns {Promise<{query: Function, release: Function}>}
   * @throws {Error} 'Connection pool exhausted' when maxConnections leases are outstanding.
   */
  async getConnection() {
    if (this.activeConnections >= this.maxConnections) {
      throw new Error('Connection pool exhausted');
    }
    this.activeConnections++;
    let released = false;
    // Placeholder handle; a real implementation would return a driver connection.
    return {
      query: async (sql, params) => this.executeQuery(sql, params),
      // Idempotent: a second release() (e.g. from nested finally blocks) must
      // not push activeConnections below the real number of leases.
      release: () => {
        if (released) {
          return;
        }
        released = true;
        this.activeConnections--;
      },
    };
  }

  /**
   * Execute a query. Stub: logs the SQL and params and resolves to undefined.
   */
  async executeQuery(sql, params) {
    // Replace with the actual driver call.
    console.log('Executing query:', sql, params);
  }
}
// Use the connection pool.
// Credentials are read from environment variables instead of being hard-coded.
const dbPool = new DatabasePool({
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
maxConnections: 5
});
// Lease one connection; the finally block releases it even if the query throws.
const connection = await dbPool.getConnection();
try {
// The email travels separately from the SQL via the `?` placeholder
// (intended as a bound parameter).
const result = await connection.query(
'SELECT * FROM users WHERE email = ?',
[items[0].json.email]
);
// Code-node output: an array of { json } items.
return [{ json: { users: result } }];
} finally {
connection.release();
}
SQL 注入防护
javascript
/**
 * Pair a SQL template with its bound values so the driver can send them
 * separately (a parameterized query) instead of concatenating strings.
 * @param {string} template - SQL text containing placeholders.
 * @param {Array} params - Placeholder values, in order.
 * @returns {{sql: string, params: Array}} The template and the same params array.
 */
function buildSafeQuery(template, params) {
  return { sql: template, params };
}
// Wrong (open to SQL injection): user input is spliced into the SQL text.
const unsafeQuery = `SELECT * FROM users WHERE name = '${userInput}'`;
// Right (parameterized): the SQL text and the values travel separately.
const safeQuery = buildSafeQuery(
  'SELECT * FROM users WHERE name = ? AND status = ?',
  [userInput, 'active'],
);
// Parameterized queries in n8n's database nodes: write $1, $2, ... in the SQL
// and supply the values in the node's "Query Parameters" option, e.g.
// {{ $json.userId }}, {{ $json.startDate }}, {{ $json.status }}.
// Writing {{ ... }} expressions directly inside the SQL is plain text
// substitution and is NOT a parameterized query.
const query = `
SELECT * FROM orders
WHERE user_id = $1
AND created_at >= $2
AND status = $3
`;
事务处理
javascript
/**
 * Run several statements atomically: commit if all succeed, roll back otherwise.
 * @param {Array<{sql: string, params?: Array}>} operations - Executed in order.
 * @param {Function} [connect=getDBConnection] - Resolves to a connection exposing
 *   beginTransaction/query/commit/rollback/release. Defaults to the existing
 *   getDBConnection helper; injectable for testing or alternate pools.
 * @returns {Promise<Array>} One query result per operation, in order.
 * @throws The error from the failing step, after attempting a rollback.
 */
async function executeTransaction(operations, connect = getDBConnection) {
  const connection = await connect();
  try {
    await connection.beginTransaction();
    const results = [];
    for (const { sql, params } of operations) {
      // Sequential on purpose: every statement must run on this one connection
      // inside the same transaction.
      results.push(await connection.query(sql, params));
    }
    await connection.commit();
    return results;
  } catch (error) {
    try {
      await connection.rollback();
    } catch (rollbackError) {
      // A failed rollback must not mask the error that triggered it.
      console.error('Transaction rollback failed:', rollbackError);
    }
    throw error;
  } finally {
    connection.release();
  }
}
// 使用事务处理订单
const orderOperations = [
{
sql: 'INSERT INTO orders (user_id, amount, status) VALUES (?, ?, ?)',
params: [userId, amount, 'pending']
},
{
sql: 'UPDATE inventory SET quantity = quantity - ? WHERE product_id = ?',
params: [quantity, productId]
},
{
sql: 'INSERT INTO order_items (order_id, product_id, quantity, price) VALUES (?, ?, ?, ?)',
params: [orderId, productId, quantity, price]
}
];
try {
const results = await executeTransaction(orderOperations);
return [{ json: { success: true, results } }];
} catch (error) {
return [{ json: { success: false, error: error.message } }];
}
批量操作优化
批量插入
sql
-- Multi-row INSERT: several rows written by a single statement instead of
-- one INSERT per row.
INSERT INTO users (name, email, status, created_at) VALUES
('John Doe', 'john@example.com', 'active', NOW()),
('Jane Smith', 'jane@example.com', 'active', NOW()),
('Bob Johnson', 'bob@example.com', 'active', NOW());
javascript
/**
 * Build one multi-row parameterized INSERT (for use in a Code node).
 * @param {string} tableName - Trusted identifier; it is interpolated into the SQL text.
 * @param {Array<object>} records - Rows; each value is read by column name.
 * @param {string[]} columns - Trusted column identifiers, also interpolated.
 * @returns {{sql: string, params: Array}} `?` placeholders plus row-major values.
 * @throws {Error} When records or columns is empty (the SQL would be invalid).
 */
function buildBatchInsert(tableName, records, columns) {
  if (!Array.isArray(records) || records.length === 0) {
    throw new Error('buildBatchInsert: records must be a non-empty array');
  }
  if (!Array.isArray(columns) || columns.length === 0) {
    throw new Error('buildBatchInsert: columns must be a non-empty array');
  }
  const rowPlaceholder = `(${columns.map(() => '?').join(', ')})`;
  const placeholders = records.map(() => rowPlaceholder).join(', ');
  const sql = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES ${placeholders}`;
  // Missing fields are bound as NULL; some drivers (e.g. mysql2 execute())
  // reject undefined bind values outright.
  const params = records.flatMap((record) => columns.map((col) => record[col] ?? null));
  return { sql, params };
}
// Batch insert usage.
// One row per incoming n8n item.
const users = items.map(item => item.json);
// NOTE(review): every item's json is expected to carry these fields, including
// created_at - confirm the upstream node sets them.
const columns = ['name', 'email', 'status', 'created_at'];
const batchQuery = buildBatchInsert('users', users, columns);
// Execute the batch insert.
// NOTE(review): assumes executeQuery resolves to a result exposing affectedRows
// (mysql-style) - confirm.
const result = await executeQuery(batchQuery.sql, batchQuery.params);
return [{ json: { inserted: result.affectedRows } }];
批量更新
javascript
/**
 * Build a single UPDATE that writes per-row values via CASE expressions, with
 * every value bound as a parameter (no values are interpolated into the SQL).
 * @param {string} tableName - Trusted identifier (interpolated into the SQL).
 * @param {Array<object>} updates - One object per row; each must include keyColumn.
 * @param {string} keyColumn - Trusted identifier of the row key column.
 * @returns {Promise<{sql: string, params: Array}>} Params follow placeholder order:
 *   (key, value) pairs per column, then the keys of the WHERE ... IN list.
 * @throws {Error} When updates is empty or contains no columns besides keyColumn.
 */
async function batchUpdate(tableName, updates, keyColumn) {
  if (!Array.isArray(updates) || updates.length === 0) {
    throw new Error('batchUpdate: updates must be a non-empty array');
  }
  // Union of columns across all rows, so a column present only in a later row
  // is still updated.
  const updateColumns = [...new Set(updates.flatMap((update) => Object.keys(update)))]
    .filter((column) => column !== keyColumn);
  if (updateColumns.length === 0) {
    throw new Error('batchUpdate: no columns to update');
  }
  const params = [];
  const setClauses = updateColumns.map((column) => {
    const whens = [];
    for (const update of updates) {
      if (!Object.hasOwn(update, column)) {
        continue;
      }
      whens.push(`WHEN ${keyColumn} = ? THEN ?`);
      params.push(update[keyColumn], update[column] ?? null);
    }
    // ELSE keeps the current value for rows that don't set this column.
    return `${column} = CASE ${whens.join(' ')} ELSE ${column} END`;
  });
  const ids = updates.map((update) => update[keyColumn]);
  params.push(...ids);
  const sql = `UPDATE ${tableName} SET ${setClauses.join(', ')} WHERE ${keyColumn} IN (${ids.map(() => '?').join(', ')})`;
  return { sql, params };
}
NoSQL 数据库操作
MongoDB 操作
javascript
// MongoDB query example: active premium/VIP users created during 2024.
// The upper bound is exclusive ($lt the first instant of 2025). `$lte` with
// new Date('2024-12-31') would stop at 2024-12-31T00:00:00Z and silently drop
// everything created later on December 31.
const mongoQuery = {
  collection: 'users',
  operation: 'find',
  query: {
    status: 'active',
    createdAt: {
      $gte: new Date('2024-01-01'),
      $lt: new Date('2025-01-01'),
    },
    tags: { $in: ['premium', 'vip'] },
  },
  options: {
    sort: { createdAt: -1 },
    limit: 100,
    projection: { password: 0 }, // exclude sensitive fields
  },
};
// Aggregation: per-category count, total and average amount for active
// documents created since 2024-01-01, largest total first.
const aggregationPipeline = [
  {
    $match: {
      status: 'active',
      createdAt: { $gte: new Date('2024-01-01') },
    },
  },
  {
    $group: {
      _id: '$category',
      count: { $sum: 1 },
      totalAmount: { $sum: '$amount' },
      avgAmount: { $avg: '$amount' },
    },
  },
  {
    $sort: { totalAmount: -1 },
  },
];
Redis 缓存操作
javascript
/**
 * Thin JSON cache on top of a Redis client.
 * NOTE(review): assumes a client exposing get/setex/del/exists with these
 * exact method names - confirm against the client library in use.
 */
class RedisCache {
  constructor(client) {
    this.client = client;
  }

  /** Parsed cached value, or null when nothing (or an empty string) is stored. */
  async get(key) {
    const raw = await this.client.get(key);
    if (!raw) {
      return null;
    }
    return JSON.parse(raw);
  }

  /** Store `value` as JSON under `key` with a TTL (default 3600). */
  async set(key, value, ttl = 3600) {
    const payload = JSON.stringify(value);
    await this.client.setex(key, ttl, payload);
  }

  async del(key) {
    await this.client.del(key);
  }

  /** Resolves to whatever the client's exists() returns. */
  async exists(key) {
    const found = await this.client.exists(key);
    return found;
  }
}
// Cache-aside: read from Redis first, fall back to the database on a miss.
const cache = new RedisCache(redisClient);
// One cache entry per user.
const cacheKey = `user_profile_${userId}`;
// Try the cache first.
let userProfile = await cache.get(cacheKey);
if (!userProfile) {
// Cache miss: query the database.
userProfile = await queryDatabase(
'SELECT * FROM user_profiles WHERE user_id = ?',
[userId]
);
// Cache the result.
// NOTE(review): if queryDatabase can resolve to null/undefined for an unknown
// user, that value is cached and returned as-is - confirm the desired behavior.
await cache.set(cacheKey, userProfile, 1800); // 30 minutes
}
// NOTE(review): n8n expects `json` to be an object; if queryDatabase returns a
// row array, unwrap the first row - confirm.
return [{ json: userProfile }];
数据库性能优化
索引优化
sql
-- Create single-column and composite indexes.
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_date ON orders(user_id, created_at);
CREATE INDEX idx_products_category_status ON products(category, status);
-- Composite index: most useful when queries filter on its leading columns
-- (status, then created_at).
CREATE INDEX idx_orders_complex ON orders(status, created_at, user_id);
-- Partial index (PostgreSQL): only rows with status = 'active' are indexed.
CREATE INDEX idx_active_users ON users(email) WHERE status = 'active';
查询优化
javascript
/**
 * Run a query through executeQuery and log slow or failed queries.
 * @param {string} sql
 * @param {Array} [params]
 * @param {number} [slowThresholdMs=1000] - Durations above this are logged as slow.
 * @returns {Promise<*>} Whatever executeQuery resolves to.
 * @throws Re-throws executeQuery's error after logging it.
 */
async function executeQueryWithMonitoring(sql, params, slowThresholdMs = 1000) {
  // Truncate long SQL for logs; append an ellipsis only when something was cut.
  const previewSql = (text) => (text.length > 100 ? `${text.slice(0, 100)}...` : text);
  const startTime = Date.now();
  try {
    const result = await executeQuery(sql, params);
    const duration = Date.now() - startTime;
    if (duration > slowThresholdMs) {
      console.warn('Slow query detected:', {
        sql: previewSql(sql),
        duration,
        params: params?.slice(0, 5), // only the first 5 params, to keep logs small
      });
    }
    return result;
  } catch (error) {
    console.error('Query failed:', {
      sql: previewSql(sql),
      duration: Date.now() - startTime,
      error: error.message,
    });
    throw error;
  }
}
连接优化
javascript
/**
 * Probe database health: connectivity and latency, schema size, connected threads.
 * The queries are MySQL-specific (information_schema sizes, SHOW STATUS).
 * @returns {Promise<{timestamp: string, overall_status: string, checks: Array<object>}>}
 *   Each check is { name, status: 'healthy' | 'unhealthy', ...metrics } or,
 *   on failure, { name, status: 'unhealthy', error }.
 */
async function checkDatabaseHealth() {
  const checks = [];

  // Run one probe and record its outcome under its own name, so a failure in
  // a later probe is not misreported as a connection failure.
  const runCheck = async (name, probe) => {
    try {
      const metrics = await probe();
      checks.push({ name, status: 'healthy', ...metrics });
      return true;
    } catch (error) {
      checks.push({ name, status: 'unhealthy', error: error?.message ?? String(error) });
      return false;
    }
  };

  const connected = await runCheck('connection', async () => {
    const startTime = Date.now();
    await executeQuery('SELECT 1');
    return { responseTime: Date.now() - startTime };
  });

  // Without a working connection the remaining probes would only repeat the same failure.
  if (connected) {
    await runCheck('table_space', async () => {
      const spaceResult = await executeQuery(`
        SELECT
          table_schema,
          ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS size_mb
        FROM information_schema.tables
        WHERE table_schema = DATABASE()
        GROUP BY table_schema
      `);
      return { size_mb: spaceResult[0]?.size_mb || 0 };
    });

    await runCheck('active_connections', async () => {
      // Single quotes: under the ANSI_QUOTES SQL mode a double-quoted
      // "literal" is parsed as an identifier.
      const connectionResult = await executeQuery("SHOW STATUS LIKE 'Threads_connected'");
      return { count: Number.parseInt(connectionResult[0]?.Value || 0, 10) };
    });
  }

  return {
    timestamp: new Date().toISOString(),
    overall_status: checks.every((check) => check.status === 'healthy') ? 'healthy' : 'unhealthy',
    checks,
  };
}
实际应用案例
用户行为分析
sql
-- User activity report for active users over the last 30 days
-- (MySQL `INTERVAL 30 DAY` syntax).
-- user_activity: one row per user per day with action and distinct-session counts.
WITH user_activity AS (
SELECT
user_id,
DATE(created_at) as activity_date,
COUNT(*) as action_count,
COUNT(DISTINCT session_id) as session_count
FROM user_actions
WHERE created_at >= CURRENT_DATE - INTERVAL 30 DAY
GROUP BY user_id, DATE(created_at)
),
-- user_metrics: per-user rollup of the daily rows. Sessions are counted per
-- day, so a session spanning midnight is counted once for each day.
user_metrics AS (
SELECT
user_id,
COUNT(DISTINCT activity_date) as active_days,
AVG(action_count) as avg_daily_actions,
MAX(action_count) as max_daily_actions,
SUM(session_count) as total_sessions
FROM user_activity
GROUP BY user_id
)
-- Users with no actions in the window get NULL metrics from the LEFT JOIN
-- and fall through to 'inactive'.
SELECT
u.id,
u.name,
u.email,
um.active_days,
um.avg_daily_actions,
um.max_daily_actions,
um.total_sessions,
CASE
WHEN um.active_days >= 20 THEN 'highly_active'
WHEN um.active_days >= 10 THEN 'moderately_active'
WHEN um.active_days >= 5 THEN 'low_active'
ELSE 'inactive'
END as activity_level
FROM users u
LEFT JOIN user_metrics um ON u.id = um.user_id
WHERE u.status = 'active'
ORDER BY um.active_days DESC, um.avg_daily_actions DESC;
数据同步任务
javascript
/**
 * Data-sync workflow: copy rows from a source to a target database per rule.
 * For each rule, run rule.sourceQuery on the source and map every row through
 * rule.transform. The results are inserted into rule.targetTable in batches.
 * @param {*} sourceConfig - Passed through to queryDatabase.
 * @param {*} targetConfig - Passed through to batchInsert.
 * @param {Array<object>} syncRules - { name, sourceQuery, targetTable, transform,
 *   sourceParams? (values for the source query's placeholders),
 *   batchSize? (positive integer, default 100) }.
 * @returns {Promise<{synced: number, errors: number, details: Array<object>}>}
 *   `errors` adds the row count of each failed batch, and 1 for a rule that
 *   failed before batching (its row count is unknown then).
 */
async function syncDataBetweenDatabases(sourceConfig, targetConfig, syncRules) {
  const DEFAULT_BATCH_SIZE = 100;
  const results = {
    synced: 0,
    errors: 0,
    details: [],
  };

  for (const rule of syncRules) {
    try {
      // Pass params only when the rule defines them, so rules without
      // placeholders call queryDatabase exactly as before.
      const queryArgs = rule.sourceParams === undefined
        ? [sourceConfig, rule.sourceQuery]
        : [sourceConfig, rule.sourceQuery, rule.sourceParams];
      const sourceData = await queryDatabase(...queryArgs);
      if (sourceData.length === 0) {
        continue;
      }
      const targetData = sourceData.map((row) => rule.transform(row));
      // A zero/negative/non-integer batch size would never advance the loop correctly.
      const batchSize = Number.isInteger(rule.batchSize) && rule.batchSize > 0
        ? rule.batchSize
        : DEFAULT_BATCH_SIZE;
      for (let i = 0; i < targetData.length; i += batchSize) {
        const batch = targetData.slice(i, i + batchSize);
        try {
          await batchInsert(targetConfig, rule.targetTable, batch);
          results.synced += batch.length;
        } catch (error) {
          // One failed batch does not abort the remaining batches of the rule.
          results.errors += batch.length;
          results.details.push({
            rule: rule.name,
            table: rule.targetTable,
            batch: Math.floor(i / batchSize) + 1,
            error: error.message,
          });
        }
      }
    } catch (error) {
      results.errors++;
      results.details.push({
        rule: rule.name,
        error: error.message,
      });
    }
  }
  return results;
}
// 定义同步规则
const syncRules = [
{
name: 'sync_users',
sourceQuery: 'SELECT * FROM users WHERE updated_at > ?',
targetTable: 'users_replica',
transform: (row) => ({
id: row.id,
name: row.name,
email: row.email.toLowerCase(),
status: row.status,
synced_at: new Date()
})
}
];
const syncResult = await syncDataBetweenDatabases(
sourceConfig,
targetConfig,
syncRules
);
return [{ json: syncResult }];
小结
数据库操作是 n8n 工作流的重要组成部分:
- 选择合适的数据库:根据数据特点和性能需求选择
- 安全第一:防止 SQL 注入,使用参数化查询(在 n8n 数据库节点中用 $1、$2 等占位符,并通过 Query Parameters 选项传值,而不是把 {{ }} 表达式直接拼进 SQL 文本)
- 性能优化:合理使用索引、连接池、批量操作
- 事务处理:确保数据一致性
- 监控维护:跟踪查询性能和数据库健康状态
下一篇文章,我们将学习文件处理和存储技术,这是处理文档、图片等非结构化数据的重要技能。
记住,数据库是系统的核心,良好的数据库设计和操作习惯对整个系统的性能和稳定性至关重要。始终要考虑数据安全、性能优化和错误处理。