Sequelize 原生 SQL 查询详解 - 复杂查询的终极解决方案
发布时间:2024-04-01
作者:一介布衣
标签:Sequelize, 原生SQL, 复杂查询, 性能优化
前言
今天咱们来学习 Sequelize 的原生 SQL 查询功能。说实话,虽然 Sequelize 的 ORM 功能很强大,但有时候遇到特别复杂的查询需求,或者需要使用数据库特有的功能时,原生 SQL 就是最好的选择。
我记得有一次做数据分析项目,需要写一个包含多层子查询、窗口函数、复杂聚合的 SQL。用 Sequelize 的 ORM 方式写出来的代码又长又难懂,最后还是直接写原生 SQL 解决的。后来发现,Sequelize 的原生 SQL 功能其实很强大,既能享受 SQL 的灵活性,又能保持与 ORM 的集成。
今天我就把原生 SQL 查询的各种用法和技巧分享给大家。
基础原生查询
1. 简单查询
javascript
const { QueryTypes } = require('sequelize');
// Basic query: the single `?` placeholder is filled from the `replacements` array.
const activeUsersSql = 'SELECT * FROM users WHERE status = ?';

// With an explicit SELECT type the call yields the rows directly.
const typedOptions = { type: QueryTypes.SELECT, replacements: ['active'] };
const users = await sequelize.query(activeUsersSql, typedOptions);
console.log(users);
// e.g. [{ id: 1, username: 'john', email: 'john@example.com', ... }]

// Without a `type` the call yields a [results, metadata] pair.
const untypedOptions = { replacements: ['active'] };
const [results, metadata] = await sequelize.query(activeUsersSql, untypedOptions);
2. 查询类型
javascript
// SELECT: with QueryTypes.SELECT the call resolves to the array of rows.
const users = await sequelize.query(
  'SELECT * FROM users',
  { type: QueryTypes.SELECT }
);

// INSERT. Every statement below uses its own binding names: declaring
// `const [results, metadata]` more than once in the same scope is a SyntaxError.
// NOTE(review): what the two tuple slots hold for INSERT/UPDATE depends on the
// dialect (e.g. insert id / affected-row count on MySQL) — confirm for yours.
const [insertResult, insertMetadata] = await sequelize.query(
  'INSERT INTO users (username, email) VALUES (?, ?)',
  {
    replacements: ['newuser', 'newuser@example.com'],
    type: QueryTypes.INSERT,
  }
);

// UPDATE
const [updateResult, updateMetadata] = await sequelize.query(
  'UPDATE users SET last_login_at = NOW() WHERE id = ?',
  {
    replacements: [1],
    type: QueryTypes.UPDATE,
  }
);

// DELETE.
// NOTE(review): with QueryTypes.DELETE Sequelize v6 does not resolve to a
// [results, metadata] pair on every dialect (on MySQL it reportedly resolves to
// undefined), so destructuring the result can throw. Keep the value whole, or
// omit `type` if you need the [results, metadata] pair.
const deleteResult = await sequelize.query(
  'DELETE FROM users WHERE status = ?',
  {
    replacements: ['inactive'],
    type: QueryTypes.DELETE,
  }
);

// RAW: hands back what the database driver returned, without result shaping.
const result = await sequelize.query(
  'SHOW TABLES',
  { type: QueryTypes.RAW }
);
参数绑定和安全性
1. 位置参数绑定
javascript
// Positional placeholders: each `?` is filled, in order, from `replacements`.
const users = await sequelize.query(
  'SELECT * FROM users WHERE age > ? AND city = ?',
  {
    replacements: [18, 'Beijing'],
    type: QueryTypes.SELECT,
  }
);

// Array value: the whole id list is passed as ONE replacement (note the nested
// array) and fills the single `?` inside IN (...).
// The result gets its own name because re-declaring `const users` in the same
// scope is a SyntaxError.
const userIds = [1, 2, 3, 4, 5];
const usersByIds = await sequelize.query(
  'SELECT * FROM users WHERE id IN (?)',
  {
    replacements: [userIds],
    type: QueryTypes.SELECT,
  }
);
2. 命名参数绑定
javascript
// Named placeholders (recommended): every :name is filled from the key of the
// same name in the `replacements` object.
const users = await sequelize.query(
  'SELECT * FROM users WHERE age > :minAge AND city = :city',
  {
    replacements: {
      minAge: 18,
      city: 'Beijing',
    },
    type: QueryTypes.SELECT,
  }
);

// Larger example: per-user order count and total spend for users created in 2024.
// The date window is half-open (>= start, < first instant after the window).
// NOTE(review): assumes users.created_at carries a time component; with
// `<= '2024-12-31'` such a column would lose everything after midnight on the
// last day. If the column is a plain DATE, both forms select the same rows.
const orderStats = await sequelize.query(`
  SELECT
    u.username,
    COUNT(o.id) as order_count,
    SUM(o.total_amount) as total_spent
  FROM users u
  LEFT JOIN orders o ON u.id = o.user_id
  WHERE u.created_at >= :startDate
    AND u.created_at < :endDateExclusive
    AND u.status = :status
  GROUP BY u.id, u.username
  HAVING COUNT(o.id) > :minOrders
  ORDER BY total_spent DESC
  LIMIT :limit
`, {
  replacements: {
    startDate: '2024-01-01',
    endDateExclusive: '2025-01-01',
    status: 'active',
    minOrders: 5,
    limit: 10,
  },
  type: QueryTypes.SELECT,
});
3. 防止 SQL 注入
javascript
// ❌ Dangerous: the value is spliced straight into the SQL text.
const badQuery = `SELECT * FROM users WHERE username = '${username}'`;

// ✅ Safe: the value travels through `replacements` and is referenced as :username.
const byNameWithReplacement = 'SELECT * FROM users WHERE username = :username';
const safeQuery = await sequelize.query(byNameWithReplacement, {
  type: QueryTypes.SELECT,
  replacements: { username: username },
});

// ✅ Safe: the value travels through `bind` and is referenced as $username.
const byNameWithBind = 'SELECT * FROM users WHERE username = $username';
const bindQuery = await sequelize.query(byNameWithBind, {
  type: QueryTypes.SELECT,
  bind: { username: username },
});
复杂查询场景
1. 窗口函数查询
javascript
// Ranking of active users by score.
// - Each alias now names the function that produces it (the original aliased
//   RANK() as `dense_rank`); DENSE_RANK() is added as its own column.
// - Aliases avoid `rank`, `dense_rank` and `percent_rank`, which are reserved
//   words in MySQL 8 and cannot be used unquoted.
const userRankings = await sequelize.query(`
  SELECT
    username,
    score,
    ROW_NUMBER() OVER (ORDER BY score DESC) as row_num,
    RANK() OVER (ORDER BY score DESC) as score_rank,
    DENSE_RANK() OVER (ORDER BY score DESC) as score_dense_rank,
    PERCENT_RANK() OVER (ORDER BY score DESC) as score_percent_rank
  FROM users
  WHERE status = 'active'
  ORDER BY score DESC
`, {
  type: QueryTypes.SELECT,
});

// Top 3 users per category.
// A window function cannot be filtered in WHERE, and QUALIFY is not available
// in MySQL or PostgreSQL, so the ranking is computed in a derived table and
// filtered by the outer query.
const topUsersByCategory = await sequelize.query(`
  SELECT
    category,
    username,
    score,
    category_rank
  FROM (
    SELECT
      category,
      username,
      score,
      ROW_NUMBER() OVER (PARTITION BY category ORDER BY score DESC) as category_rank
    FROM users u
    JOIN user_categories uc ON u.id = uc.user_id
    JOIN categories c ON uc.category_id = c.id
    WHERE u.status = 'active'
  ) ranked_users
  WHERE category_rank <= 3
`, {
  type: QueryTypes.SELECT,
});
2. 递归查询(CTE)
javascript
// Recursive CTE that walks the department tree from the roots downwards and
// returns every department with its depth (`level`) and a readable `path`.
//
// MySQL form: the anchor SELECT fixes the column types of the CTE, so `path` is
// widened with CAST(... AS CHAR(1000)); MySQL's CAST does not accept VARCHAR.
// NOTE(review): on PostgreSQL use CAST(... AS TEXT) in the anchor AND around the
// CONCAT in the recursive member, otherwise the column types of the two members
// do not match.
const orgHierarchy = await sequelize.query(`
  WITH RECURSIVE org_tree AS (
    -- 根节点
    SELECT
      id,
      name,
      parent_id,
      0 as level,
      CAST(name AS CHAR(1000)) as path
    FROM departments
    WHERE parent_id IS NULL
    UNION ALL
    -- 递归部分
    SELECT
      d.id,
      d.name,
      d.parent_id,
      ot.level + 1,
      CONCAT(ot.path, ' > ', d.name)
    FROM departments d
    JOIN org_tree ot ON d.parent_id = ot.id
  )
  SELECT * FROM org_tree
  ORDER BY level, name
`, {
  type: QueryTypes.SELECT,
});
3. 复杂聚合查询
javascript
// Monthly sales analysis of completed orders (MySQL: DATE_FORMAT, DATE_ADD).
//
// Changes versus the original single SELECT:
// - The monthly aggregates are computed once in a CTE, and LAG() is evaluated
//   once instead of being repeated four times inside the CASE expression.
// - prev_month_revenue / growth_rate compare with the PREVIOUS MONTH
//   (month-over-month); the original SQL comment called it year-over-year.
// - :endDate stays an inclusive calendar date, but the comparison is half-open
//   (< endDate + 1 day) so orders placed during the last day are not dropped.
//   NOTE(review): assumes orders.created_at carries a time component.
const salesAnalysis = await sequelize.query(`
  WITH monthly_sales AS (
    SELECT
      DATE_FORMAT(o.created_at, '%Y-%m') as month,
      COUNT(DISTINCT o.id) as order_count,
      COUNT(DISTINCT o.user_id) as unique_customers,
      SUM(o.total_amount) as total_revenue,
      AVG(o.total_amount) as avg_order_value
    FROM orders o
    WHERE o.status = 'completed'
      AND o.created_at >= :startDate
      AND o.created_at < DATE_ADD(:endDate, INTERVAL 1 DAY)
    GROUP BY DATE_FORMAT(o.created_at, '%Y-%m')
  ),
  monthly_with_prev AS (
    SELECT
      ms.*,
      -- 环比(上一个月的营收)
      LAG(ms.total_revenue) OVER (ORDER BY ms.month) as prev_month_revenue
    FROM monthly_sales ms
  )
  SELECT
    month,
    order_count,
    unique_customers,
    total_revenue,
    avg_order_value,
    prev_month_revenue,
    -- 计算增长率
    CASE
      WHEN prev_month_revenue > 0
      THEN ROUND((total_revenue - prev_month_revenue) / prev_month_revenue * 100, 2)
      ELSE NULL
    END as growth_rate
  FROM monthly_with_prev
  ORDER BY month
`, {
  replacements: {
    startDate: '2023-01-01',
    endDate: '2024-12-31',
  },
  type: QueryTypes.SELECT,
});
事务中的原生查询
1. 在事务中执行原生查询
javascript
// Debit a user's balance and record the movement atomically: both statements
// receive the same `transaction`, which is committed only if both succeed and
// rolled back on any error.
const transaction = await sequelize.transaction();
try {
  const debit = { userId: 1, amount: 100 };

  // The extra `balance >= :amount` condition keeps the balance from going
  // negative; when it does not hold (or the user does not exist) no row changes.
  // NOTE(review): relies on QueryTypes.UPDATE resolving to [_, affectedRows], as
  // Sequelize v6 does for MySQL/PostgreSQL — confirm for your dialect.
  const [, affectedRows] = await sequelize.query(
    'UPDATE users SET balance = balance - :amount WHERE id = :userId AND balance >= :amount',
    {
      replacements: debit,
      type: QueryTypes.UPDATE,
      transaction, // run inside the transaction
    }
  );
  if (affectedRows !== 1) {
    // Thrown inside the try so the catch below rolls the transaction back.
    throw new Error('Debit failed: user not found or insufficient balance');
  }

  await sequelize.query(
    'INSERT INTO transactions (user_id, amount, type) VALUES (:userId, :amount, :type)',
    {
      replacements: { ...debit, type: 'debit' },
      type: QueryTypes.INSERT,
      transaction,
    }
  );

  await transaction.commit();
} catch (error) {
  await transaction.rollback();
  throw error;
}
2. 批量操作
javascript
/**
 * Set `users.score` for many users with one `UPDATE ... SET score = CASE ...`
 * statement, executed inside its own transaction.
 *
 * Every id and value is passed as a named replacement (`:id0`, `:value0`, ...);
 * only the placeholder names are interpolated into the SQL text.
 *
 * @param {Array<{id: *, value: *}>} updates - One entry per user: the row id and
 *   the new score. An empty array is a no-op (no transaction, no query), because
 *   it would otherwise generate invalid SQL (`CASE  END` / `IN ()`).
 * @returns {Promise<void>} Resolves once the transaction has been committed.
 * @throws Re-throws any error raised by the query or the commit, after rolling
 *   the transaction back.
 */
async function batchUpdateUsers(updates) {
  if (updates.length === 0) {
    return;
  }

  const whenClauses = [];
  const idPlaceholders = [];
  const replacements = {};
  updates.forEach(({ id, value }, index) => {
    replacements[`id${index}`] = id;
    replacements[`value${index}`] = value;
    whenClauses.push(`WHEN id = :id${index} THEN :value${index}`);
    idPlaceholders.push(`:id${index}`);
  });

  const transaction = await sequelize.transaction();
  try {
    // The WHERE clause limits the UPDATE to the listed ids, so the CASE needs no
    // ELSE branch: rows that are not listed are never touched.
    await sequelize.query(`
      UPDATE users
      SET score = CASE ${whenClauses.join(' ')} END
      WHERE id IN (${idPlaceholders.join(',')})
    `, {
      replacements,
      type: QueryTypes.UPDATE,
      transaction,
    });
    await transaction.commit();
  } catch (error) {
    await transaction.rollback();
    throw error;
  }
}
数据库特定功能
1. MySQL 特定功能
javascript
// Full-text search: the same MATCH ... AGAINST expression is used once as the
// relevance column and once as the filter.
const fullTextMatch = 'MATCH(title, content) AGAINST(:keyword IN NATURAL LANGUAGE MODE)';
const fullTextSearchSql = `
SELECT
*,
${fullTextMatch} as relevance
FROM articles
WHERE ${fullTextMatch}
ORDER BY relevance DESC
`;
const searchResults = await sequelize.query(fullTextSearchSql, {
  type: QueryTypes.SELECT,
  replacements: { keyword: '搜索关键词' },
});

// JSON column: extract two nested values and filter on a third.
const jsonFieldSql = `
SELECT
id,
username,
JSON_EXTRACT(metadata, '$.preferences.theme') as theme,
JSON_EXTRACT(metadata, '$.profile.age') as age
FROM users
WHERE JSON_EXTRACT(metadata, '$.profile.city') = :city
`;
const users = await sequelize.query(jsonFieldSql, {
  type: QueryTypes.SELECT,
  replacements: { city: 'Beijing' },
});

// Geo query: distance from the user's position, used both as an output column
// and as the radius filter.
const sphereDistance = `ST_Distance_Sphere(
POINT(longitude, latitude),
POINT(:userLng, :userLat)
)`;
const nearbySql = `
SELECT
*,
${sphereDistance} as distance
FROM locations
WHERE ${sphereDistance} <= :radius
ORDER BY distance
`;
const userPosition = {
  userLng: 116.404,
  userLat: 39.915,
  radius: 5000, // 5 km per the original note (distance presumably in metres)
};
const nearbyLocations = await sequelize.query(nearbySql, {
  type: QueryTypes.SELECT,
  replacements: userPosition,
});
2. PostgreSQL 特定功能
javascript
// Array columns: users whose `tags` array contains :tag and whose `skills`
// array is longer than :minSkills.
const arrayFilterSql = `
SELECT *
FROM users
WHERE :tag = ANY(tags)
AND array_length(skills, 1) > :minSkills
`;
const arrayFilterValues = { tag: 'javascript', minSkills: 3 };
const users = await sequelize.query(arrayFilterSql, {
  type: QueryTypes.SELECT,
  replacements: arrayFilterValues,
});

// JSONB column: read nested values with -> / ->>, filter by containment (@>)
// and by key existence (the `?` operator).
const jsonbFilterSql = `
SELECT
id,
name,
attributes->>'color' as color,
attributes->'specs'->>'weight' as weight
FROM products
WHERE attributes @> :filter
AND attributes ? :key
`;
// The containment filter is sent as JSON text.
const containmentFilter = JSON.stringify({ category: 'electronics' });
const products = await sequelize.query(jsonbFilterSql, {
  type: QueryTypes.SELECT,
  replacements: { filter: containmentFilter, key: 'warranty' },
});
性能优化
1. 查询计划分析
javascript
// The statement whose plan is inspected; both EXPLAIN variants wrap the same text.
const explainedSelect = `SELECT u.*, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.status = 'active'
GROUP BY u.id`;

// MySQL: plan as JSON.
const mysqlExplainSql = `
EXPLAIN FORMAT=JSON
${explainedSelect}
`;
const explainResult = await sequelize.query(mysqlExplainSql, { type: QueryTypes.SELECT });
const firstPlanRow = explainResult[0];
console.log('查询计划:', JSON.stringify(firstPlanRow, null, 2));

// PostgreSQL: plan as JSON, with the ANALYZE and BUFFERS options.
const postgresExplainSql = `
EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)
${explainedSelect}
`;
const pgExplain = await sequelize.query(postgresExplainSql, { type: QueryTypes.SELECT });
2. 索引提示
javascript
// MySQL index hint: suggest an index with the table-level `USE INDEX (...)`
// clause, written right after the table name.
// The original `/*+ USE_INDEX(users, idx_status_created) */` is not a MySQL
// optimizer hint (that spelling belongs to TiDB); MySQL ignores unknown hints
// with a warning, so it had no effect.
// NOTE(review): MySQL 8.0.20+ also offers the optimizer-hint form
// `/*+ INDEX(users idx_status_created) */`.
const users = await sequelize.query(`
  SELECT *
  FROM users USE INDEX (idx_status_created)
  WHERE status = 'active'
    AND created_at > :date
  ORDER BY created_at DESC
`, {
  replacements: { date: '2024-01-01' },
  type: QueryTypes.SELECT,
});

// Force an index. The pattern is a prefix match on purpose: a pattern with a
// leading wildcard (e.g. '%@gmail.com') cannot use the index for a range scan,
// so forcing idx_email for it would not help.
const forceIndexQuery = await sequelize.query(`
  SELECT *
  FROM users FORCE INDEX (idx_email)
  WHERE email LIKE :pattern
`, {
  replacements: { pattern: 'john%' },
  type: QueryTypes.SELECT,
});
结果处理和映射
1. 结果映射到模型
javascript
// Map raw rows onto the User model.
const activeUsersAsModels = {
  model: User, // model the rows are mapped to
  mapToModel: true, // flag from the original example (annotated there as: create model instances)
  type: QueryTypes.SELECT,
  replacements: { status: 'active' },
};
const users = await sequelize.query(
  'SELECT * FROM users WHERE status = :status',
  activeUsersAsModels
);

// `users` now holds User instances, so instance methods can be called on them.
for (const user of users) {
  console.log(user.getFullName());
}
2. 自定义结果处理
javascript
/**
 * Add registration flags to one result row.
 *
 * @param {Object} row - A row carrying `days_since_registration`.
 * @returns {Object} A copy of `row` plus `isNewUser` (registered within the last
 *   30 days) and `registrationCategory`: 'new' (<= 7 days), 'recent' (<= 30 days),
 *   'established' (older), or 'unknown' when the day count is missing.
 */
function withRegistrationInfo(row) {
  const days = row.days_since_registration;

  // A missing day count (NULL from the database) must be handled first: in
  // JavaScript `null <= 7` is true, so such a row would be classified as 'new'.
  if (days === null || days === undefined) {
    return { ...row, isNewUser: false, registrationCategory: 'unknown' };
  }

  let registrationCategory = 'established';
  if (days <= 7) {
    registrationCategory = 'new';
  } else if (days <= 30) {
    registrationCategory = 'recent';
  }
  return { ...row, isNewUser: days <= 30, registrationCategory };
}

// Fetch active users with the number of days since they registered, then
// post-process the rows in JavaScript (plain `await`, no `.then()` chaining).
const activeUserRows = await sequelize.query(`
SELECT
id,
username,
email,
created_at,
DATEDIFF(NOW(), created_at) as days_since_registration
FROM users
WHERE status = 'active'
`, {
  type: QueryTypes.SELECT,
});
const processedResults = activeUserRows.map((row) => withRegistrationInfo(row));
实战案例:数据报表系统
javascript
/**
 * Read-only analytics reports implemented as raw SQL.
 *
 * Every method runs a single SELECT through `sequelize.query` with
 * `QueryTypes.SELECT` and resolves to the array of result rows. The SQL uses
 * CTEs, window functions and MySQL-style DATEDIFF.
 */
class ReportService {
  /**
   * Per-user activity report for active users.
   *
   * @param {*} startDate - Lower bound, bound to :startDate (BETWEEN, inclusive).
   * @param {*} endDate - Upper bound, bound to :endDate (BETWEEN, inclusive).
   * @returns {Promise<Object[]>} One row per active user with active_days,
   *   total_actions, avg/max daily actions, activity_level and
   *   activity_percentage. activity_percentage is NULL when DATEDIFF of the two
   *   bounds is 0 (explicit NULLIF guard instead of a division by zero).
   */
  static async getUserActivityReport(startDate, endDate) {
    return sequelize.query(`
      WITH daily_activity AS (
        SELECT
          DATE(created_at) as activity_date,
          user_id,
          COUNT(*) as action_count
        FROM user_activities
        WHERE created_at BETWEEN :startDate AND :endDate
        GROUP BY DATE(created_at), user_id
      ),
      user_stats AS (
        SELECT
          u.id,
          u.username,
          u.email,
          COUNT(DISTINCT da.activity_date) as active_days,
          SUM(da.action_count) as total_actions,
          AVG(da.action_count) as avg_daily_actions,
          MAX(da.action_count) as max_daily_actions
        FROM users u
        LEFT JOIN daily_activity da ON u.id = da.user_id
        WHERE u.status = 'active'
        GROUP BY u.id, u.username, u.email
      )
      SELECT
        *,
        CASE
          WHEN active_days >= 25 THEN 'highly_active'
          WHEN active_days >= 15 THEN 'moderately_active'
          WHEN active_days >= 5 THEN 'low_active'
          ELSE 'inactive'
        END as activity_level,
        ROUND(active_days * 100.0 / NULLIF(DATEDIFF(:endDate, :startDate), 0), 2) as activity_percentage
      FROM user_stats
      ORDER BY total_actions DESC
    `, {
      replacements: { startDate, endDate },
      type: QueryTypes.SELECT,
    });
  }

  /**
   * Sales funnel: one row per stage (page_view, add_to_cart, checkout_start,
   * payment_complete) with its count, the previous stage's count, the
   * conversion rate from the previous stage and the drop-off.
   *
   * The previous-stage count is computed once in `funnel_with_prev` and reused,
   * instead of repeating the LAG() window expression in every CASE.
   *
   * @returns {Promise<Object[]>} Rows ordered by stage_order. The first stage has
   *   no previous stage: conversion_rate is 100.0 and drop_off_count is 0.
   */
  static async getSalesFunnelReport() {
    return sequelize.query(`
      WITH funnel_data AS (
        SELECT
          'page_view' as stage,
          1 as stage_order,
          COUNT(DISTINCT session_id) as user_count
        FROM page_views
        WHERE page_type = 'product'
        UNION ALL
        SELECT
          'add_to_cart' as stage,
          2 as stage_order,
          COUNT(DISTINCT user_id) as user_count
        FROM cart_items
        UNION ALL
        SELECT
          'checkout_start' as stage,
          3 as stage_order,
          COUNT(DISTINCT user_id) as user_count
        FROM orders
        WHERE status != 'cart'
        UNION ALL
        SELECT
          'payment_complete' as stage,
          4 as stage_order,
          COUNT(DISTINCT user_id) as user_count
        FROM orders
        WHERE status IN ('paid', 'completed')
      ),
      funnel_with_prev AS (
        SELECT
          stage,
          stage_order,
          user_count,
          LAG(user_count) OVER (ORDER BY stage_order) as prev_stage_count
        FROM funnel_data
      )
      SELECT
        stage,
        stage_order,
        user_count,
        prev_stage_count,
        CASE
          WHEN prev_stage_count > 0
          THEN ROUND(user_count * 100.0 / prev_stage_count, 2)
          ELSE 100.0
        END as conversion_rate,
        CASE
          WHEN prev_stage_count > 0
          THEN prev_stage_count - user_count
          ELSE 0
        END as drop_off_count
      FROM funnel_with_prev
      ORDER BY stage_order
    `, {
      type: QueryTypes.SELECT,
    });
  }

  /**
   * Product recommendations for one user, based on what users with overlapping
   * purchase categories bought.
   *
   * @param {*} userId - The user to recommend for, bound to :userId.
   * @param {number|string} [limit=10] - Maximum number of rows. Must be a
   *   non-negative integer; a numeric string such as '10' is converted to a
   *   number, because a quoted value after LIMIT is not valid SQL.
   * @returns {Promise<Object[]>} Recommended products with recommendation_score,
   *   avg_rating and final_score (score doubled for categories the user already
   *   bought from), best first.
   * @throws {TypeError} When `limit` is not a non-negative integer.
   */
  static async getProductRecommendations(userId, limit = 10) {
    const rowLimit =
      typeof limit === 'string' && limit.trim() !== '' ? Number(limit) : limit;
    if (!Number.isInteger(rowLimit) || rowLimit < 0) {
      throw new TypeError(`limit must be a non-negative integer, got: ${String(limit)}`);
    }

    // Compared with the original SQL:
    // - similar_users joins through products (p2) instead of evaluating a
    //   correlated subquery inside the JOIN condition for every row pair;
    // - HAVING repeats the aggregate instead of referring to the select alias;
    // - already-purchased products are excluded with NOT EXISTS, which stays
    //   correct when the subquery side contains a NULL product_id.
    return sequelize.query(`
      WITH user_preferences AS (
        -- 用户购买历史
        SELECT
          p.category_id,
          COUNT(*) as purchase_count,
          AVG(oi.price) as avg_price
        FROM order_items oi
        JOIN orders o ON oi.order_id = o.id
        JOIN products p ON oi.product_id = p.id
        WHERE o.user_id = :userId
          AND o.status = 'completed'
        GROUP BY p.category_id
      ),
      similar_users AS (
        -- 找到相似用户
        SELECT
          o2.user_id,
          COUNT(DISTINCT p.category_id) as common_categories
        FROM orders o1
        JOIN order_items oi1 ON o1.id = oi1.order_id
        JOIN products p ON oi1.product_id = p.id
        JOIN products p2 ON p2.category_id = p.category_id
        JOIN order_items oi2 ON oi2.product_id = p2.id
        JOIN orders o2 ON oi2.order_id = o2.id
        WHERE o1.user_id = :userId
          AND o2.user_id != :userId
          AND o1.status = 'completed'
          AND o2.status = 'completed'
        GROUP BY o2.user_id
        HAVING COUNT(DISTINCT p.category_id) >= 2
        ORDER BY common_categories DESC
        LIMIT 50
      ),
      recommended_products AS (
        SELECT
          p.id,
          p.name,
          p.price,
          p.category_id,
          COUNT(DISTINCT su.user_id) as recommendation_score,
          AVG(pr.rating) as avg_rating
        FROM similar_users su
        JOIN orders o ON su.user_id = o.user_id
        JOIN order_items oi ON o.id = oi.order_id
        JOIN products p ON oi.product_id = p.id
        LEFT JOIN product_reviews pr ON p.id = pr.product_id
        WHERE o.status = 'completed'
          AND NOT EXISTS (
            -- 排除用户已购买的商品
            SELECT 1
            FROM orders o2
            JOIN order_items oi2 ON o2.id = oi2.order_id
            WHERE o2.user_id = :userId
              AND o2.status = 'completed'
              AND oi2.product_id = p.id
          )
        GROUP BY p.id, p.name, p.price, p.category_id
        HAVING AVG(pr.rating) >= 4.0 OR AVG(pr.rating) IS NULL
      )
      SELECT
        rp.*,
        CASE
          WHEN up.category_id IS NOT NULL THEN rp.recommendation_score * 2
          ELSE rp.recommendation_score
        END as final_score
      FROM recommended_products rp
      LEFT JOIN user_preferences up ON rp.category_id = up.category_id
      ORDER BY final_score DESC, avg_rating DESC
      LIMIT :limit
    `, {
      replacements: { userId, limit: rowLimit },
      type: QueryTypes.SELECT,
    });
  }
}
总结
今天我们深入学习了 Sequelize 的原生 SQL 查询:
- ✅ 基础原生查询语法和参数绑定
- ✅ 复杂查询场景的实现方法
- ✅ 事务中的原生查询处理
- ✅ 数据库特定功能的使用
- ✅ 性能优化和查询分析
- ✅ 结果处理和映射技巧
- ✅ 完整的数据报表系统案例
掌握了这些知识,你就能够:
- 处理 ORM 无法满足的复杂查询需求
- 充分利用数据库的特有功能
- 优化查询性能
- 构建复杂的数据分析系统
原生 SQL 查询是 Sequelize 的重要补充,合理使用能让你的应用更加强大和灵活!
相关文章推荐:
有问题欢迎留言讨论,我会及时回复大家!