Sequelize 事务处理实战 - 保证数据一致性的核心技术
发布时间:2024-03-07
作者:一介布衣
标签:Sequelize, 事务, 数据一致性, ACID
前言
今天咱们来学习 Sequelize 中一个非常重要的概念 - 事务(Transaction)。说实话,事务是数据库操作中保证数据一致性的核心机制,特别是在涉及多个表操作的时候,事务就显得尤为重要。
我记得刚开始做项目的时候,经常遇到这样的问题:用户下单时,需要同时更新订单表、库存表、用户积分表等多张表,如果中间某一步出错了,就会导致数据不一致。比如订单创建成功了,但是库存没有扣减,或者积分没有增加。后来学会了事务,这些问题就迎刃而解了。
今天我就把事务的概念、用法和最佳实践分享给大家。
事务基础概念
什么是事务?
事务是一组数据库操作的集合,这些操作要么全部成功,要么全部失败。事务具有四个重要特性(ACID):
- 原子性(Atomicity):事务中的所有操作要么全部完成,要么全部不完成
- 一致性(Consistency):事务执行前后,数据库都处于一致状态
- 隔离性(Isolation):并发执行的事务之间不会相互影响
- 持久性(Durability):事务一旦提交,其结果就是永久性的
为什么需要事务?
javascript
// 没有事务的转账操作(危险!)
async function transferMoney(fromUserId, toUserId, amount) {
// 第一步:从转出账户扣钱
await User.decrement('balance', {
by: amount,
where: { id: fromUserId }
});
// 如果这里出错了,钱就丢了!
throw new Error('网络错误');
// 第二步:给转入账户加钱
await User.increment('balance', {
by: amount,
where: { id: toUserId }
});
}
上面的代码有个严重问题:如果第一步成功了,第二步失败了,钱就凭空消失了!
Sequelize 事务的基本用法
1. 手动管理事务
javascript
async function transferMoneyWithTransaction(fromUserId, toUserId, amount) {
// 开始事务
const transaction = await sequelize.transaction();
try {
// 检查余额
const fromUser = await User.findByPk(fromUserId, { transaction });
if (fromUser.balance < amount) {
throw new Error('余额不足');
}
// 扣减转出账户
await User.decrement('balance', {
by: amount,
where: { id: fromUserId },
transaction
});
// 增加转入账户
await User.increment('balance', {
by: amount,
where: { id: toUserId },
transaction
});
// 记录转账日志
await TransferLog.create({
fromUserId,
toUserId,
amount,
status: 'completed'
}, { transaction });
// 提交事务
await transaction.commit();
return { success: true, message: '转账成功' };
} catch (error) {
// 回滚事务
await transaction.rollback();
console.error('转账失败:', error);
return { success: false, message: error.message };
}
}
2. 自动管理事务
Sequelize 提供了更简洁的自动管理事务的方式:
javascript
async function transferMoneyAuto(fromUserId, toUserId, amount) {
try {
const result = await sequelize.transaction(async (t) => {
// 检查余额
const fromUser = await User.findByPk(fromUserId, { transaction: t });
if (fromUser.balance < amount) {
throw new Error('余额不足');
}
// 扣减转出账户
await User.decrement('balance', {
by: amount,
where: { id: fromUserId },
transaction: t
});
// 增加转入账户
await User.increment('balance', {
by: amount,
where: { id: toUserId },
transaction: t
});
// 记录转账日志
const log = await TransferLog.create({
fromUserId,
toUserId,
amount,
status: 'completed'
}, { transaction: t });
return log;
});
return { success: true, data: result };
} catch (error) {
console.error('转账失败:', error);
return { success: false, message: error.message };
}
}
推荐使用自动管理事务,因为它会自动处理提交和回滚,代码更简洁,也不容易出错。
事务隔离级别
1. 隔离级别详解
javascript
const { Transaction } = require('sequelize');
// 读未提交(最低级别,可能出现脏读)
const transaction = await sequelize.transaction({
isolationLevel: Transaction.ISOLATION_LEVELS.READ_UNCOMMITTED
});
// 读已提交(防止脏读,但可能出现不可重复读)
const transaction = await sequelize.transaction({
isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED
});
// 可重复读(防止脏读和不可重复读,但可能出现幻读)
const transaction = await sequelize.transaction({
isolationLevel: Transaction.ISOLATION_LEVELS.REPEATABLE_READ
});
// 串行化(最高级别,防止所有并发问题)
const transaction = await sequelize.transaction({
isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE
});
2. 实际应用场景
javascript
// 库存扣减场景:使用可重复读防止超卖
async function decreaseStock(productId, quantity) {
return await sequelize.transaction({
isolationLevel: Transaction.ISOLATION_LEVELS.REPEATABLE_READ
}, async (t) => {
// 查询当前库存
const product = await Product.findByPk(productId, {
transaction: t,
lock: true // 行锁
});
if (product.stock < quantity) {
throw new Error('库存不足');
}
// 扣减库存
await product.decrement('stock', {
by: quantity,
transaction: t
});
return product;
});
}
锁机制
1. 悲观锁
javascript
// 行锁:锁定特定记录
async function updateUserWithLock(userId, updateData) {
return await sequelize.transaction(async (t) => {
// 查询时加锁
const user = await User.findByPk(userId, {
transaction: t,
lock: true // 等同于 lock: t.LOCK.UPDATE
});
if (!user) {
throw new Error('用户不存在');
}
// 更新数据
await user.update(updateData, { transaction: t });
return user;
});
}
// 共享锁:允许其他事务读取,但不允许修改
async function readUserWithSharedLock(userId) {
return await sequelize.transaction(async (t) => {
const user = await User.findByPk(userId, {
transaction: t,
lock: t.LOCK.SHARE
});
return user;
});
}
2. 乐观锁
javascript
// 使用版本号实现乐观锁
const User = sequelize.define('User', {
name: DataTypes.STRING,
balance: DataTypes.DECIMAL(10, 2),
version: DataTypes.INTEGER // 版本号字段
}, {
version: true // 启用乐观锁
});
async function updateUserOptimistic(userId, updateData) {
try {
const user = await User.findByPk(userId);
await user.update(updateData);
return { success: true };
} catch (error) {
if (error.name === 'SequelizeOptimisticLockError') {
return {
success: false,
message: '数据已被其他用户修改,请刷新后重试'
};
}
throw error;
}
}
复杂事务场景
1. 电商下单流程
javascript
async function createOrder(userId, items, addressId, couponId) {
return await sequelize.transaction(async (t) => {
// 1. 验证用户
const user = await User.findByPk(userId, { transaction: t });
if (!user) {
throw new Error('用户不存在');
}
// 2. 验证商品和库存
let totalAmount = 0;
const orderItems = [];
for (const item of items) {
const product = await Product.findByPk(item.productId, {
transaction: t,
lock: true // 锁定商品记录
});
if (!product) {
throw new Error(`商品 ${item.productId} 不存在`);
}
if (product.stock < item.quantity) {
throw new Error(`商品 ${product.name} 库存不足`);
}
// 扣减库存
await product.decrement('stock', {
by: item.quantity,
transaction: t
});
const itemTotal = product.price * item.quantity;
totalAmount += itemTotal;
orderItems.push({
productId: item.productId,
productName: product.name,
price: product.price,
quantity: item.quantity,
subtotal: itemTotal
});
}
// 3. 处理优惠券
let discountAmount = 0;
if (couponId) {
const coupon = await Coupon.findByPk(couponId, {
transaction: t,
lock: true
});
if (!coupon || coupon.status !== 'active') {
throw new Error('优惠券无效');
}
if (coupon.minAmount && totalAmount < coupon.minAmount) {
throw new Error(`订单金额不满足优惠券使用条件`);
}
discountAmount = Math.min(coupon.discountAmount, totalAmount);
// 标记优惠券为已使用
await coupon.update({
status: 'used',
usedAt: new Date(),
usedBy: userId
}, { transaction: t });
}
const finalAmount = totalAmount - discountAmount;
// 4. 创建订单
const order = await Order.create({
userId,
addressId,
totalAmount,
discountAmount,
finalAmount,
status: 'pending',
orderNumber: generateOrderNumber()
}, { transaction: t });
// 5. 创建订单项
for (const item of orderItems) {
await OrderItem.create({
orderId: order.id,
...item
}, { transaction: t });
}
// 6. 记录库存变动日志
for (const item of items) {
await StockLog.create({
productId: item.productId,
type: 'decrease',
quantity: item.quantity,
reason: 'order_created',
relatedId: order.id
}, { transaction: t });
}
return order;
});
}
2. 用户积分系统
javascript
async function processPointsTransaction(userId, points, type, reason, relatedId) {
return await sequelize.transaction(async (t) => {
// 查询用户并锁定
const user = await User.findByPk(userId, {
transaction: t,
lock: true
});
if (!user) {
throw new Error('用户不存在');
}
// 检查积分余额(扣减时)
if (type === 'decrease' && user.points < points) {
throw new Error('积分余额不足');
}
// 更新用户积分
const newPoints = type === 'increase'
? user.points + points
: user.points - points;
await user.update({ points: newPoints }, { transaction: t });
// 记录积分变动日志
await PointsLog.create({
userId,
type,
points,
reason,
relatedId,
beforePoints: user.points,
afterPoints: newPoints
}, { transaction: t });
// 检查等级升级
const newLevel = calculateUserLevel(newPoints);
if (newLevel > user.level) {
await user.update({ level: newLevel }, { transaction: t });
// 发送升级奖励
await this.processPointsTransaction(
userId,
newLevel * 100,
'increase',
'level_up_bonus',
null,
t
);
}
return {
user: await user.reload({ transaction: t }),
pointsChange: type === 'increase' ? points : -points
};
});
}
事务性能优化
1. 减少事务时间
javascript
// 不好的做法:事务时间过长
async function badExample(userId, data) {
return await sequelize.transaction(async (t) => {
const user = await User.findByPk(userId, { transaction: t });
// 耗时的外部API调用(不应该在事务中)
const result = await callExternalAPI(data);
await user.update({ externalId: result.id }, { transaction: t });
});
}
// 好的做法:先处理外部操作,再开启事务
async function goodExample(userId, data) {
// 先调用外部API
const result = await callExternalAPI(data);
// 再开启事务进行数据库操作
return await sequelize.transaction(async (t) => {
const user = await User.findByPk(userId, { transaction: t });
await user.update({ externalId: result.id }, { transaction: t });
});
}
2. 批量操作优化
javascript
// 批量更新用户积分
async function batchUpdatePoints(userPointsMap) {
return await sequelize.transaction(async (t) => {
const userIds = Object.keys(userPointsMap);
// 批量查询用户
const users = await User.findAll({
where: { id: userIds },
transaction: t,
lock: true
});
const updates = [];
const logs = [];
for (const user of users) {
const pointsChange = userPointsMap[user.id];
const newPoints = user.points + pointsChange;
updates.push(
user.update({ points: newPoints }, { transaction: t })
);
logs.push({
userId: user.id,
type: pointsChange > 0 ? 'increase' : 'decrease',
points: Math.abs(pointsChange),
reason: 'batch_update',
beforePoints: user.points,
afterPoints: newPoints
});
}
// 并行执行更新
await Promise.all(updates);
// 批量创建日志
await PointsLog.bulkCreate(logs, { transaction: t });
return users.length;
});
}
错误处理和重试机制
1. 死锁处理
javascript
async function handleDeadlock(operation, maxRetries = 3) {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
// 检查是否是死锁错误
if (error.name === 'SequelizeDatabaseError' &&
error.original?.code === 'ER_LOCK_DEADLOCK') {
if (attempt === maxRetries) {
throw new Error('操作失败:检测到死锁,已重试多次');
}
// 随机延迟后重试
const delay = Math.random() * 1000 * attempt;
await new Promise(resolve => setTimeout(resolve, delay));
console.log(`检测到死锁,第 ${attempt} 次重试...`);
continue;
}
throw error;
}
}
}
// 使用示例
async function transferWithDeadlockHandling(fromUserId, toUserId, amount) {
return await handleDeadlock(async () => {
return await transferMoneyAuto(fromUserId, toUserId, amount);
});
}
2. 事务超时处理
javascript
async function transactionWithTimeout(operation, timeoutMs = 30000) {
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error('事务超时')), timeoutMs);
});
try {
return await Promise.race([
operation(),
timeoutPromise
]);
} catch (error) {
if (error.message === '事务超时') {
console.error('事务执行超时,可能存在性能问题');
}
throw error;
}
}
事务监控和调试
1. 事务日志
javascript
class TransactionLogger {
static async logTransaction(name, operation) {
const startTime = Date.now();
const transactionId = generateTransactionId();
console.log(`[${transactionId}] 事务开始: ${name}`);
try {
const result = await operation();
const duration = Date.now() - startTime;
console.log(`[${transactionId}] 事务成功: ${name}, 耗时: ${duration}ms`);
// 记录到监控系统
await this.recordMetrics(name, 'success', duration);
return result;
} catch (error) {
const duration = Date.now() - startTime;
console.error(`[${transactionId}] 事务失败: ${name}, 耗时: ${duration}ms, 错误: ${error.message}`);
// 记录到监控系统
await this.recordMetrics(name, 'error', duration);
throw error;
}
}
static async recordMetrics(name, status, duration) {
// 发送到监控系统(如 Prometheus、DataDog 等)
// metrics.increment(`transaction.${name}.${status}`);
// metrics.histogram(`transaction.${name}.duration`, duration);
}
}
// 使用示例
async function monitoredTransfer(fromUserId, toUserId, amount) {
return await TransactionLogger.logTransaction('user_transfer', async () => {
return await transferMoneyAuto(fromUserId, toUserId, amount);
});
}
2. 事务性能分析
javascript
// 分析长时间运行的事务
sequelize.addHook('beforeTransaction', (transaction) => {
transaction.startTime = Date.now();
});
sequelize.addHook('afterTransaction', (transaction) => {
const duration = Date.now() - transaction.startTime;
if (duration > 5000) { // 超过5秒的事务
console.warn(`长时间事务警告: 耗时 ${duration}ms`);
}
});
总结
今天我们深入学习了 Sequelize 的事务处理:
- ✅ 事务的基本概念和ACID特性
- ✅ 手动和自动管理事务的方法
- ✅ 事务隔离级别和锁机制
- ✅ 复杂业务场景的事务设计
- ✅ 事务性能优化技巧
- ✅ 错误处理和监控方案
掌握了这些知识,你就能够:
- 保证复杂业务操作的数据一致性
- 处理并发访问和竞态条件
- 优化事务性能
- 构建健壮的数据操作逻辑
下一篇文章,我们将学习 Sequelize 的迁移系统,这是管理数据库结构变更的重要工具。
相关文章推荐:
有问题欢迎留言讨论,我会及时回复大家!