跳到主要内容

Sequelize 事务处理实战 - 保证数据一致性的核心技术

· 阅读需 10 分钟
一介布衣
全栈开发者

发布时间:2024-03-07
作者:一介布衣
标签:Sequelize, 事务, 数据一致性, ACID

前言

今天咱们来学习 Sequelize 中一个非常重要的概念 - 事务(Transaction)。说实话,事务是数据库操作中保证数据一致性的核心机制,特别是在涉及多个表操作的时候,事务就显得尤为重要。

我记得刚开始做项目的时候,经常遇到这样的问题:用户下单时,需要同时更新订单表、库存表、用户积分表等多张表,如果中间某一步出错了,就会导致数据不一致。比如订单创建成功了,但是库存没有扣减,或者积分没有增加。后来学会了事务,这些问题就迎刃而解了。

今天我就把事务的概念、用法和最佳实践分享给大家。

事务基础概念

什么是事务?

事务是一组数据库操作的集合,这些操作要么全部成功,要么全部失败。事务具有四个重要特性(ACID):

  • 原子性(Atomicity):事务中的所有操作要么全部完成,要么全部不完成
  • 一致性(Consistency):事务执行前后,数据库都处于一致状态
  • 隔离性(Isolation):并发执行的事务之间不会相互影响
  • 持久性(Durability):事务一旦提交,其结果就是永久性的

为什么需要事务?

// 没有事务的转账操作(危险!)
async function transferMoney(fromUserId, toUserId, amount) {
// 第一步:从转出账户扣钱
await User.decrement('balance', {
by: amount,
where: { id: fromUserId }
});

// 如果这里出错了,钱就丢了!
throw new Error('网络错误');

// 第二步:给转入账户加钱
await User.increment('balance', {
by: amount,
where: { id: toUserId }
});
}

上面的代码有个严重问题:如果第一步成功了,第二步失败了,钱就凭空消失了!

Sequelize 事务的基本用法

1. 手动管理事务

async function transferMoneyWithTransaction(fromUserId, toUserId, amount) {
// 开始事务
const transaction = await sequelize.transaction();

try {
// 检查余额
const fromUser = await User.findByPk(fromUserId, { transaction });
if (fromUser.balance < amount) {
throw new Error('余额不足');
}

// 扣减转出账户
await User.decrement('balance', {
by: amount,
where: { id: fromUserId },
transaction
});

// 增加转入账户
await User.increment('balance', {
by: amount,
where: { id: toUserId },
transaction
});

// 记录转账日志
await TransferLog.create({
fromUserId,
toUserId,
amount,
status: 'completed'
}, { transaction });

// 提交事务
await transaction.commit();

return { success: true, message: '转账成功' };

} catch (error) {
// 回滚事务
await transaction.rollback();

console.error('转账失败:', error);
return { success: false, message: error.message };
}
}

2. 自动管理事务

Sequelize 提供了更简洁的自动管理事务的方式:

async function transferMoneyAuto(fromUserId, toUserId, amount) {
try {
const result = await sequelize.transaction(async (t) => {
// 检查余额
const fromUser = await User.findByPk(fromUserId, { transaction: t });
if (fromUser.balance < amount) {
throw new Error('余额不足');
}

// 扣减转出账户
await User.decrement('balance', {
by: amount,
where: { id: fromUserId },
transaction: t
});

// 增加转入账户
await User.increment('balance', {
by: amount,
where: { id: toUserId },
transaction: t
});

// 记录转账日志
const log = await TransferLog.create({
fromUserId,
toUserId,
amount,
status: 'completed'
}, { transaction: t });

return log;
});

return { success: true, data: result };

} catch (error) {
console.error('转账失败:', error);
return { success: false, message: error.message };
}
}

推荐使用自动管理事务,因为它会自动处理提交和回滚,代码更简洁,也不容易出错。

事务隔离级别

1. 隔离级别详解

const { Transaction } = require('sequelize');

// 读未提交(最低级别,可能出现脏读)
const transaction = await sequelize.transaction({
isolationLevel: Transaction.ISOLATION_LEVELS.READ_UNCOMMITTED
});

// 读已提交(防止脏读,但可能出现不可重复读)
const transaction = await sequelize.transaction({
isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED
});

// 可重复读(防止脏读和不可重复读,但可能出现幻读)
const transaction = await sequelize.transaction({
isolationLevel: Transaction.ISOLATION_LEVELS.REPEATABLE_READ
});

// 串行化(最高级别,防止所有并发问题)
const transaction = await sequelize.transaction({
isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE
});

2. 实际应用场景

// 库存扣减场景:使用可重复读防止超卖
async function decreaseStock(productId, quantity) {
return await sequelize.transaction({
isolationLevel: Transaction.ISOLATION_LEVELS.REPEATABLE_READ
}, async (t) => {
// 查询当前库存
const product = await Product.findByPk(productId, {
transaction: t,
lock: true // 行锁
});

if (product.stock < quantity) {
throw new Error('库存不足');
}

// 扣减库存
await product.decrement('stock', {
by: quantity,
transaction: t
});

return product;
});
}

锁机制

1. 悲观锁

// 行锁:锁定特定记录
async function updateUserWithLock(userId, updateData) {
return await sequelize.transaction(async (t) => {
// 查询时加锁
const user = await User.findByPk(userId, {
transaction: t,
lock: true // 等同于 lock: t.LOCK.UPDATE
});

if (!user) {
throw new Error('用户不存在');
}

// 更新数据
await user.update(updateData, { transaction: t });

return user;
});
}

// 共享锁:允许其他事务读取,但不允许修改
async function readUserWithSharedLock(userId) {
return await sequelize.transaction(async (t) => {
const user = await User.findByPk(userId, {
transaction: t,
lock: t.LOCK.SHARE
});

return user;
});
}

2. 乐观锁

// 使用版本号实现乐观锁
const User = sequelize.define('User', {
name: DataTypes.STRING,
balance: DataTypes.DECIMAL(10, 2),
version: DataTypes.INTEGER // 版本号字段
}, {
version: true // 启用乐观锁
});

async function updateUserOptimistic(userId, updateData) {
try {
const user = await User.findByPk(userId);
await user.update(updateData);
return { success: true };

} catch (error) {
if (error.name === 'SequelizeOptimisticLockError') {
return {
success: false,
message: '数据已被其他用户修改,请刷新后重试'
};
}
throw error;
}
}

复杂事务场景

1. 电商下单流程

async function createOrder(userId, items, addressId, couponId) {
return await sequelize.transaction(async (t) => {
// 1. 验证用户
const user = await User.findByPk(userId, { transaction: t });
if (!user) {
throw new Error('用户不存在');
}

// 2. 验证商品和库存
let totalAmount = 0;
const orderItems = [];

for (const item of items) {
const product = await Product.findByPk(item.productId, {
transaction: t,
lock: true // 锁定商品记录
});

if (!product) {
throw new Error(`商品 \${item.productId} 不存在`);
}

if (product.stock < item.quantity) {
throw new Error(`商品 \${product.name} 库存不足`);
}

// 扣减库存
await product.decrement('stock', {
by: item.quantity,
transaction: t
});

const itemTotal = product.price * item.quantity;
totalAmount += itemTotal;

orderItems.push({
productId: item.productId,
productName: product.name,
price: product.price,
quantity: item.quantity,
subtotal: itemTotal
});
}

// 3. 处理优惠券
let discountAmount = 0;
if (couponId) {
const coupon = await Coupon.findByPk(couponId, {
transaction: t,
lock: true
});

if (!coupon || coupon.status !== 'active') {
throw new Error('优惠券无效');
}

if (coupon.minAmount && totalAmount < coupon.minAmount) {
throw new Error(`订单金额不满足优惠券使用条件`);
}

discountAmount = Math.min(coupon.discountAmount, totalAmount);

// 标记优惠券为已使用
await coupon.update({
status: 'used',
usedAt: new Date(),
usedBy: userId
}, { transaction: t });
}

const finalAmount = totalAmount - discountAmount;

// 4. 创建订单
const order = await Order.create({
userId,
addressId,
totalAmount,
discountAmount,
finalAmount,
status: 'pending',
orderNumber: generateOrderNumber()
}, { transaction: t });

// 5. 创建订单项
for (const item of orderItems) {
await OrderItem.create({
orderId: order.id,
...item
}, { transaction: t });
}

// 6. 记录库存变动日志
for (const item of items) {
await StockLog.create({
productId: item.productId,
type: 'decrease',
quantity: item.quantity,
reason: 'order_created',
relatedId: order.id
}, { transaction: t });
}

return order;
});
}

2. 用户积分系统

async function processPointsTransaction(userId, points, type, reason, relatedId) {
return await sequelize.transaction(async (t) => {
// 查询用户并锁定
const user = await User.findByPk(userId, {
transaction: t,
lock: true
});

if (!user) {
throw new Error('用户不存在');
}

// 检查积分余额(扣减时)
if (type === 'decrease' && user.points < points) {
throw new Error('积分余额不足');
}

// 更新用户积分
const newPoints = type === 'increase'
? user.points + points
: user.points - points;

await user.update({ points: newPoints }, { transaction: t });

// 记录积分变动日志
await PointsLog.create({
userId,
type,
points,
reason,
relatedId,
beforePoints: user.points,
afterPoints: newPoints
}, { transaction: t });

// 检查等级升级
const newLevel = calculateUserLevel(newPoints);
if (newLevel > user.level) {
await user.update({ level: newLevel }, { transaction: t });

// 发送升级奖励
await this.processPointsTransaction(
userId,
newLevel * 100,
'increase',
'level_up_bonus',
null,
t
);
}

return {
user: await user.reload({ transaction: t }),
pointsChange: type === 'increase' ? points : -points
};
});
}

事务性能优化

1. 减少事务时间

// 不好的做法:事务时间过长
async function badExample(userId, data) {
return await sequelize.transaction(async (t) => {
const user = await User.findByPk(userId, { transaction: t });

// 耗时的外部API调用(不应该在事务中)
const result = await callExternalAPI(data);

await user.update({ externalId: result.id }, { transaction: t });
});
}

// 好的做法:先处理外部操作,再开启事务
async function goodExample(userId, data) {
// 先调用外部API
const result = await callExternalAPI(data);

// 再开启事务进行数据库操作
return await sequelize.transaction(async (t) => {
const user = await User.findByPk(userId, { transaction: t });
await user.update({ externalId: result.id }, { transaction: t });
});
}

2. 批量操作优化

// 批量更新用户积分
async function batchUpdatePoints(userPointsMap) {
return await sequelize.transaction(async (t) => {
const userIds = Object.keys(userPointsMap);

// 批量查询用户
const users = await User.findAll({
where: { id: userIds },
transaction: t,
lock: true
});

const updates = [];
const logs = [];

for (const user of users) {
const pointsChange = userPointsMap[user.id];
const newPoints = user.points + pointsChange;

updates.push(
user.update({ points: newPoints }, { transaction: t })
);

logs.push({
userId: user.id,
type: pointsChange > 0 ? 'increase' : 'decrease',
points: Math.abs(pointsChange),
reason: 'batch_update',
beforePoints: user.points,
afterPoints: newPoints
});
}

// 并行执行更新
await Promise.all(updates);

// 批量创建日志
await PointsLog.bulkCreate(logs, { transaction: t });

return users.length;
});
}

错误处理和重试机制

1. 死锁处理

async function handleDeadlock(operation, maxRetries = 3) {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await operation();

} catch (error) {
// 检查是否是死锁错误
if (error.name === 'SequelizeDatabaseError' &&
error.original?.code === 'ER_LOCK_DEADLOCK') {

if (attempt === maxRetries) {
throw new Error('操作失败:检测到死锁,已重试多次');
}

// 随机延迟后重试
const delay = Math.random() * 1000 * attempt;
await new Promise(resolve => setTimeout(resolve, delay));

console.log(`检测到死锁,第 \${attempt} 次重试...`);
continue;
}

throw error;
}
}
}

// 使用示例
async function transferWithDeadlockHandling(fromUserId, toUserId, amount) {
return await handleDeadlock(async () => {
return await transferMoneyAuto(fromUserId, toUserId, amount);
});
}

2. 事务超时处理

async function transactionWithTimeout(operation, timeoutMs = 30000) {
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error('事务超时')), timeoutMs);
});

try {
return await Promise.race([
operation(),
timeoutPromise
]);
} catch (error) {
if (error.message === '事务超时') {
console.error('事务执行超时,可能存在性能问题');
}
throw error;
}
}

事务监控和调试

1. 事务日志

class TransactionLogger {
static async logTransaction(name, operation) {
const startTime = Date.now();
const transactionId = generateTransactionId();

console.log(`[\${transactionId}] 事务开始: \${name}`);

try {
const result = await operation();
const duration = Date.now() - startTime;

console.log(`[\${transactionId}] 事务成功: \${name}, 耗时: \${duration}ms`);

// 记录到监控系统
await this.recordMetrics(name, 'success', duration);

return result;

} catch (error) {
const duration = Date.now() - startTime;

console.error(`[\${transactionId}] 事务失败: \${name}, 耗时: \${duration}ms, 错误: \${error.message}`);

// 记录到监控系统
await this.recordMetrics(name, 'error', duration);

throw error;
}
}

static async recordMetrics(name, status, duration) {
// 发送到监控系统(如 Prometheus、DataDog 等)
// metrics.increment(`transaction.\${name}.\${status}`);
// metrics.histogram(`transaction.\${name}.duration`, duration);
}
}

// 使用示例
async function monitoredTransfer(fromUserId, toUserId, amount) {
return await TransactionLogger.logTransaction('user_transfer', async () => {
return await transferMoneyAuto(fromUserId, toUserId, amount);
});
}

2. 事务性能分析

// 分析长时间运行的事务
sequelize.addHook('beforeTransaction', (transaction) => {
transaction.startTime = Date.now();
});

sequelize.addHook('afterTransaction', (transaction) => {
const duration = Date.now() - transaction.startTime;

if (duration > 5000) { // 超过5秒的事务
console.warn(`长时间事务警告: 耗时 \${duration}ms`);
}
});

总结

今天我们深入学习了 Sequelize 的事务处理:

  • ✅ 事务的基本概念和ACID特性
  • ✅ 手动和自动管理事务的方法
  • ✅ 事务隔离级别和锁机制
  • ✅ 复杂业务场景的事务设计
  • ✅ 事务性能优化技巧
  • ✅ 错误处理和监控方案

掌握了这些知识,你就能够:

  • 保证复杂业务操作的数据一致性
  • 处理并发访问和竞态条件
  • 优化事务性能
  • 构建健壮的数据操作逻辑

下一篇文章,我们将学习 Sequelize 的迁移系统,这是管理数据库结构变更的重要工具。


相关文章推荐:

有问题欢迎留言讨论,我会及时回复大家!