Skip to content

Sequelize 事务处理实战 - 保证数据一致性的核心技术

发布时间:2024-03-07
作者:一介布衣
标签:Sequelize, 事务, 数据一致性, ACID

前言

今天咱们来学习 Sequelize 中一个非常重要的概念 - 事务(Transaction)。说实话,事务是数据库操作中保证数据一致性的核心机制,特别是在涉及多个表操作的时候,事务就显得尤为重要。

我记得刚开始做项目的时候,经常遇到这样的问题:用户下单时,需要同时更新订单表、库存表、用户积分表等多张表,如果中间某一步出错了,就会导致数据不一致。比如订单创建成功了,但是库存没有扣减,或者积分没有增加。后来学会了事务,这些问题就迎刃而解了。

今天我就把事务的概念、用法和最佳实践分享给大家。

事务基础概念

什么是事务?

事务是一组数据库操作的集合,这些操作要么全部成功,要么全部失败。事务具有四个重要特性(ACID):

  • 原子性(Atomicity):事务中的所有操作要么全部完成,要么全部不完成
  • 一致性(Consistency):事务执行前后,数据库都处于一致状态
  • 隔离性(Isolation):并发执行的事务之间不会相互影响
  • 持久性(Durability):事务一旦提交,其结果就是永久性的

为什么需要事务?

javascript
// 没有事务的转账操作(危险!)
async function transferMoney(fromUserId, toUserId, amount) {
  // 第一步:从转出账户扣钱
  await User.decrement('balance', {
    by: amount,
    where: { id: fromUserId }
  });
  
  // 如果这里出错了,钱就丢了!
  throw new Error('网络错误');
  
  // 第二步:给转入账户加钱
  await User.increment('balance', {
    by: amount,
    where: { id: toUserId }
  });
}

上面的代码有个严重问题:如果第一步成功了,第二步失败了,钱就凭空消失了!

Sequelize 事务的基本用法

1. 手动管理事务

javascript
async function transferMoneyWithTransaction(fromUserId, toUserId, amount) {
  // 开始事务
  const transaction = await sequelize.transaction();
  
  try {
    // 检查余额
    const fromUser = await User.findByPk(fromUserId, { transaction });
    if (fromUser.balance < amount) {
      throw new Error('余额不足');
    }
    
    // 扣减转出账户
    await User.decrement('balance', {
      by: amount,
      where: { id: fromUserId },
      transaction
    });
    
    // 增加转入账户
    await User.increment('balance', {
      by: amount,
      where: { id: toUserId },
      transaction
    });
    
    // 记录转账日志
    await TransferLog.create({
      fromUserId,
      toUserId,
      amount,
      status: 'completed'
    }, { transaction });
    
    // 提交事务
    await transaction.commit();
    
    return { success: true, message: '转账成功' };
    
  } catch (error) {
    // 回滚事务
    await transaction.rollback();
    
    console.error('转账失败:', error);
    return { success: false, message: error.message };
  }
}

2. 自动管理事务

Sequelize 提供了更简洁的自动管理事务的方式:

javascript
async function transferMoneyAuto(fromUserId, toUserId, amount) {
  try {
    const result = await sequelize.transaction(async (t) => {
      // 检查余额
      const fromUser = await User.findByPk(fromUserId, { transaction: t });
      if (fromUser.balance < amount) {
        throw new Error('余额不足');
      }
      
      // 扣减转出账户
      await User.decrement('balance', {
        by: amount,
        where: { id: fromUserId },
        transaction: t
      });
      
      // 增加转入账户
      await User.increment('balance', {
        by: amount,
        where: { id: toUserId },
        transaction: t
      });
      
      // 记录转账日志
      const log = await TransferLog.create({
        fromUserId,
        toUserId,
        amount,
        status: 'completed'
      }, { transaction: t });
      
      return log;
    });
    
    return { success: true, data: result };
    
  } catch (error) {
    console.error('转账失败:', error);
    return { success: false, message: error.message };
  }
}

推荐使用自动管理事务,因为它会自动处理提交和回滚,代码更简洁,也不容易出错。

事务隔离级别

1. 隔离级别详解

javascript
const { Transaction } = require('sequelize');

// 读未提交(最低级别,可能出现脏读)
const transaction = await sequelize.transaction({
  isolationLevel: Transaction.ISOLATION_LEVELS.READ_UNCOMMITTED
});

// 读已提交(防止脏读,但可能出现不可重复读)
const transaction = await sequelize.transaction({
  isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED
});

// 可重复读(防止脏读和不可重复读,但可能出现幻读)
const transaction = await sequelize.transaction({
  isolationLevel: Transaction.ISOLATION_LEVELS.REPEATABLE_READ
});

// 串行化(最高级别,防止所有并发问题)
const transaction = await sequelize.transaction({
  isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE
});

2. 实际应用场景

javascript
// 库存扣减场景:使用可重复读防止超卖
async function decreaseStock(productId, quantity) {
  return await sequelize.transaction({
    isolationLevel: Transaction.ISOLATION_LEVELS.REPEATABLE_READ
  }, async (t) => {
    // 查询当前库存
    const product = await Product.findByPk(productId, {
      transaction: t,
      lock: true  // 行锁
    });
    
    if (product.stock < quantity) {
      throw new Error('库存不足');
    }
    
    // 扣减库存
    await product.decrement('stock', {
      by: quantity,
      transaction: t
    });
    
    return product;
  });
}

锁机制

1. 悲观锁

javascript
// 行锁:锁定特定记录
async function updateUserWithLock(userId, updateData) {
  return await sequelize.transaction(async (t) => {
    // 查询时加锁
    const user = await User.findByPk(userId, {
      transaction: t,
      lock: true  // 等同于 lock: t.LOCK.UPDATE
    });
    
    if (!user) {
      throw new Error('用户不存在');
    }
    
    // 更新数据
    await user.update(updateData, { transaction: t });
    
    return user;
  });
}

// 共享锁:允许其他事务读取,但不允许修改
async function readUserWithSharedLock(userId) {
  return await sequelize.transaction(async (t) => {
    const user = await User.findByPk(userId, {
      transaction: t,
      lock: t.LOCK.SHARE
    });
    
    return user;
  });
}

2. 乐观锁

javascript
// 使用版本号实现乐观锁
const User = sequelize.define('User', {
  name: DataTypes.STRING,
  balance: DataTypes.DECIMAL(10, 2),
  version: DataTypes.INTEGER  // 版本号字段
}, {
  version: true  // 启用乐观锁
});

async function updateUserOptimistic(userId, updateData) {
  try {
    const user = await User.findByPk(userId);
    await user.update(updateData);
    return { success: true };
    
  } catch (error) {
    if (error.name === 'SequelizeOptimisticLockError') {
      return { 
        success: false, 
        message: '数据已被其他用户修改,请刷新后重试' 
      };
    }
    throw error;
  }
}

复杂事务场景

1. 电商下单流程

javascript
async function createOrder(userId, items, addressId, couponId) {
  return await sequelize.transaction(async (t) => {
    // 1. 验证用户
    const user = await User.findByPk(userId, { transaction: t });
    if (!user) {
      throw new Error('用户不存在');
    }
    
    // 2. 验证商品和库存
    let totalAmount = 0;
    const orderItems = [];
    
    for (const item of items) {
      const product = await Product.findByPk(item.productId, {
        transaction: t,
        lock: true  // 锁定商品记录
      });
      
      if (!product) {
        throw new Error(`商品 \${item.productId} 不存在`);
      }
      
      if (product.stock < item.quantity) {
        throw new Error(`商品 \${product.name} 库存不足`);
      }
      
      // 扣减库存
      await product.decrement('stock', {
        by: item.quantity,
        transaction: t
      });
      
      const itemTotal = product.price * item.quantity;
      totalAmount += itemTotal;
      
      orderItems.push({
        productId: item.productId,
        productName: product.name,
        price: product.price,
        quantity: item.quantity,
        subtotal: itemTotal
      });
    }
    
    // 3. 处理优惠券
    let discountAmount = 0;
    if (couponId) {
      const coupon = await Coupon.findByPk(couponId, {
        transaction: t,
        lock: true
      });
      
      if (!coupon || coupon.status !== 'active') {
        throw new Error('优惠券无效');
      }
      
      if (coupon.minAmount && totalAmount < coupon.minAmount) {
        throw new Error(`订单金额不满足优惠券使用条件`);
      }
      
      discountAmount = Math.min(coupon.discountAmount, totalAmount);
      
      // 标记优惠券为已使用
      await coupon.update({
        status: 'used',
        usedAt: new Date(),
        usedBy: userId
      }, { transaction: t });
    }
    
    const finalAmount = totalAmount - discountAmount;
    
    // 4. 创建订单
    const order = await Order.create({
      userId,
      addressId,
      totalAmount,
      discountAmount,
      finalAmount,
      status: 'pending',
      orderNumber: generateOrderNumber()
    }, { transaction: t });
    
    // 5. 创建订单项
    for (const item of orderItems) {
      await OrderItem.create({
        orderId: order.id,
        ...item
      }, { transaction: t });
    }
    
    // 6. 记录库存变动日志
    for (const item of items) {
      await StockLog.create({
        productId: item.productId,
        type: 'decrease',
        quantity: item.quantity,
        reason: 'order_created',
        relatedId: order.id
      }, { transaction: t });
    }
    
    return order;
  });
}

2. 用户积分系统

javascript
async function processPointsTransaction(userId, points, type, reason, relatedId) {
  return await sequelize.transaction(async (t) => {
    // 查询用户并锁定
    const user = await User.findByPk(userId, {
      transaction: t,
      lock: true
    });
    
    if (!user) {
      throw new Error('用户不存在');
    }
    
    // 检查积分余额(扣减时)
    if (type === 'decrease' && user.points < points) {
      throw new Error('积分余额不足');
    }
    
    // 更新用户积分
    const newPoints = type === 'increase' 
      ? user.points + points 
      : user.points - points;
      
    await user.update({ points: newPoints }, { transaction: t });
    
    // 记录积分变动日志
    await PointsLog.create({
      userId,
      type,
      points,
      reason,
      relatedId,
      beforePoints: user.points,
      afterPoints: newPoints
    }, { transaction: t });
    
    // 检查等级升级
    const newLevel = calculateUserLevel(newPoints);
    if (newLevel > user.level) {
      await user.update({ level: newLevel }, { transaction: t });
      
      // 发送升级奖励
      await this.processPointsTransaction(
        userId, 
        newLevel * 100, 
        'increase', 
        'level_up_bonus',
        null,
        t
      );
    }
    
    return {
      user: await user.reload({ transaction: t }),
      pointsChange: type === 'increase' ? points : -points
    };
  });
}

事务性能优化

1. 减少事务时间

javascript
// 不好的做法:事务时间过长
async function badExample(userId, data) {
  return await sequelize.transaction(async (t) => {
    const user = await User.findByPk(userId, { transaction: t });
    
    // 耗时的外部API调用(不应该在事务中)
    const result = await callExternalAPI(data);
    
    await user.update({ externalId: result.id }, { transaction: t });
  });
}

// 好的做法:先处理外部操作,再开启事务
async function goodExample(userId, data) {
  // 先调用外部API
  const result = await callExternalAPI(data);
  
  // 再开启事务进行数据库操作
  return await sequelize.transaction(async (t) => {
    const user = await User.findByPk(userId, { transaction: t });
    await user.update({ externalId: result.id }, { transaction: t });
  });
}

2. 批量操作优化

javascript
// 批量更新用户积分
async function batchUpdatePoints(userPointsMap) {
  return await sequelize.transaction(async (t) => {
    const userIds = Object.keys(userPointsMap);
    
    // 批量查询用户
    const users = await User.findAll({
      where: { id: userIds },
      transaction: t,
      lock: true
    });
    
    const updates = [];
    const logs = [];
    
    for (const user of users) {
      const pointsChange = userPointsMap[user.id];
      const newPoints = user.points + pointsChange;
      
      updates.push(
        user.update({ points: newPoints }, { transaction: t })
      );
      
      logs.push({
        userId: user.id,
        type: pointsChange > 0 ? 'increase' : 'decrease',
        points: Math.abs(pointsChange),
        reason: 'batch_update',
        beforePoints: user.points,
        afterPoints: newPoints
      });
    }
    
    // 并行执行更新
    await Promise.all(updates);
    
    // 批量创建日志
    await PointsLog.bulkCreate(logs, { transaction: t });
    
    return users.length;
  });
}

错误处理和重试机制

1. 死锁处理

javascript
async function handleDeadlock(operation, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await operation();
      
    } catch (error) {
      // 检查是否是死锁错误
      if (error.name === 'SequelizeDatabaseError' && 
          error.original?.code === 'ER_LOCK_DEADLOCK') {
        
        if (attempt === maxRetries) {
          throw new Error('操作失败:检测到死锁,已重试多次');
        }
        
        // 随机延迟后重试
        const delay = Math.random() * 1000 * attempt;
        await new Promise(resolve => setTimeout(resolve, delay));
        
        console.log(`检测到死锁,第 \${attempt} 次重试...`);
        continue;
      }
      
      throw error;
    }
  }
}

// 使用示例
async function transferWithDeadlockHandling(fromUserId, toUserId, amount) {
  return await handleDeadlock(async () => {
    return await transferMoneyAuto(fromUserId, toUserId, amount);
  });
}

2. 事务超时处理

javascript
async function transactionWithTimeout(operation, timeoutMs = 30000) {
  const timeoutPromise = new Promise((_, reject) => {
    setTimeout(() => reject(new Error('事务超时')), timeoutMs);
  });
  
  try {
    return await Promise.race([
      operation(),
      timeoutPromise
    ]);
  } catch (error) {
    if (error.message === '事务超时') {
      console.error('事务执行超时,可能存在性能问题');
    }
    throw error;
  }
}

事务监控和调试

1. 事务日志

javascript
class TransactionLogger {
  static async logTransaction(name, operation) {
    const startTime = Date.now();
    const transactionId = generateTransactionId();
    
    console.log(`[\${transactionId}] 事务开始: \${name}`);
    
    try {
      const result = await operation();
      const duration = Date.now() - startTime;
      
      console.log(`[\${transactionId}] 事务成功: \${name}, 耗时: \${duration}ms`);
      
      // 记录到监控系统
      await this.recordMetrics(name, 'success', duration);
      
      return result;
      
    } catch (error) {
      const duration = Date.now() - startTime;
      
      console.error(`[\${transactionId}] 事务失败: \${name}, 耗时: \${duration}ms, 错误: \${error.message}`);
      
      // 记录到监控系统
      await this.recordMetrics(name, 'error', duration);
      
      throw error;
    }
  }
  
  static async recordMetrics(name, status, duration) {
    // 发送到监控系统(如 Prometheus、DataDog 等)
    // metrics.increment(`transaction.\${name}.\${status}`);
    // metrics.histogram(`transaction.\${name}.duration`, duration);
  }
}

// 使用示例
async function monitoredTransfer(fromUserId, toUserId, amount) {
  return await TransactionLogger.logTransaction('user_transfer', async () => {
    return await transferMoneyAuto(fromUserId, toUserId, amount);
  });
}

2. 事务性能分析

javascript
// 分析长时间运行的事务
sequelize.addHook('beforeTransaction', (transaction) => {
  transaction.startTime = Date.now();
});

sequelize.addHook('afterTransaction', (transaction) => {
  const duration = Date.now() - transaction.startTime;
  
  if (duration > 5000) {  // 超过5秒的事务
    console.warn(`长时间事务警告: 耗时 \${duration}ms`);
  }
});

总结

今天我们深入学习了 Sequelize 的事务处理:

  • ✅ 事务的基本概念和ACID特性
  • ✅ 手动和自动管理事务的方法
  • ✅ 事务隔离级别和锁机制
  • ✅ 复杂业务场景的事务设计
  • ✅ 事务性能优化技巧
  • ✅ 错误处理和监控方案

掌握了这些知识,你就能够:

  • 保证复杂业务操作的数据一致性
  • 处理并发访问和竞态条件
  • 优化事务性能
  • 构建健壮的数据操作逻辑

下一篇文章,我们将学习 Sequelize 的迁移系统,这是管理数据库结构变更的重要工具。


相关文章推荐:

有问题欢迎留言讨论,我会及时回复大家!