Skip to content

Sequelize 原生 SQL 查询详解 - 复杂查询的终极解决方案

发布时间:2024-04-01
作者:一介布衣
标签:Sequelize, 原生SQL, 复杂查询, 性能优化

前言

今天咱们来学习 Sequelize 的原生 SQL 查询功能。说实话,虽然 Sequelize 的 ORM 功能很强大,但有时候遇到特别复杂的查询需求,或者需要使用数据库特有的功能时,原生 SQL 就是最好的选择。

我记得有一次做数据分析项目,需要写一个包含多层子查询、窗口函数、复杂聚合的 SQL。用 Sequelize 的 ORM 方式写出来的代码又长又难懂,最后还是直接写原生 SQL 解决的。后来发现,Sequelize 的原生 SQL 功能其实很强大,既能享受 SQL 的灵活性,又能保持与 ORM 的集成。

今天我就把原生 SQL 查询的各种用法和技巧分享给大家。

基础原生查询

1. 简单查询

javascript
const { QueryTypes } = require('sequelize');

// 基本查询
const users = await sequelize.query(
  'SELECT * FROM users WHERE status = ?',
  {
    replacements: ['active'],
    type: QueryTypes.SELECT
  }
);

console.log(users);
// [{ id: 1, username: 'john', email: 'john@example.com', ... }]

// 不指定类型,返回 [results, metadata]
const [results, metadata] = await sequelize.query(
  'SELECT * FROM users WHERE status = ?',
  { replacements: ['active'] }
);

2. 查询类型

javascript
// SELECT 查询
const users = await sequelize.query(
  'SELECT * FROM users',
  { type: QueryTypes.SELECT }
);

// INSERT 查询
const [results, metadata] = await sequelize.query(
  'INSERT INTO users (username, email) VALUES (?, ?)',
  {
    replacements: ['newuser', 'newuser@example.com'],
    type: QueryTypes.INSERT
  }
);

// UPDATE 查询
const [results, metadata] = await sequelize.query(
  'UPDATE users SET last_login_at = NOW() WHERE id = ?',
  {
    replacements: [1],
    type: QueryTypes.UPDATE
  }
);

// DELETE 查询
const [results, metadata] = await sequelize.query(
  'DELETE FROM users WHERE status = ?',
  {
    replacements: ['inactive'],
    type: QueryTypes.DELETE
  }
);

// 原生查询(返回数据库原始结果)
const result = await sequelize.query(
  'SHOW TABLES',
  { type: QueryTypes.RAW }
);

参数绑定和安全性

1. 位置参数绑定

javascript
// 使用 ? 占位符
const users = await sequelize.query(
  'SELECT * FROM users WHERE age > ? AND city = ?',
  {
    replacements: [18, 'Beijing'],
    type: QueryTypes.SELECT
  }
);

// 数组参数
const userIds = [1, 2, 3, 4, 5];
const users = await sequelize.query(
  'SELECT * FROM users WHERE id IN (?)',
  {
    replacements: [userIds],
    type: QueryTypes.SELECT
  }
);

2. 命名参数绑定

javascript
// 使用命名参数(推荐)
const users = await sequelize.query(
  'SELECT * FROM users WHERE age > :minAge AND city = :city',
  {
    replacements: {
      minAge: 18,
      city: 'Beijing'
    },
    type: QueryTypes.SELECT
  }
);

// 复杂查询示例
const orderStats = await sequelize.query(`
  SELECT 
    u.username,
    COUNT(o.id) as order_count,
    SUM(o.total_amount) as total_spent
  FROM users u
  LEFT JOIN orders o ON u.id = o.user_id
  WHERE u.created_at >= :startDate
    AND u.created_at <= :endDate
    AND u.status = :status
  GROUP BY u.id, u.username
  HAVING COUNT(o.id) > :minOrders
  ORDER BY total_spent DESC
  LIMIT :limit
`, {
  replacements: {
    startDate: '2024-01-01',
    endDate: '2024-12-31',
    status: 'active',
    minOrders: 5,
    limit: 10
  },
  type: QueryTypes.SELECT
});

3. 防止 SQL 注入

javascript
// ❌ 危险:直接拼接字符串
const badQuery = `SELECT * FROM users WHERE username = '${username}'`;

// ✅ 安全:使用参数绑定
const safeQuery = await sequelize.query(
  'SELECT * FROM users WHERE username = :username',
  {
    replacements: { username },
    type: QueryTypes.SELECT
  }
);

// ✅ 安全:使用 bind 参数
const bindQuery = await sequelize.query(
  'SELECT * FROM users WHERE username = $username',
  {
    bind: { username },
    type: QueryTypes.SELECT
  }
);

复杂查询场景

1. 窗口函数查询

javascript
// 用户排名查询
const userRankings = await sequelize.query(`
  SELECT 
    username,
    score,
    ROW_NUMBER() OVER (ORDER BY score DESC) as rank,
    RANK() OVER (ORDER BY score DESC) as dense_rank,
    PERCENT_RANK() OVER (ORDER BY score DESC) as percent_rank
  FROM users
  WHERE status = 'active'
  ORDER BY score DESC
`, {
  type: QueryTypes.SELECT
});

// 分组排名
const topUsersByCategory = await sequelize.query(`
  SELECT 
    category,
    username,
    score,
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY score DESC) as category_rank
  FROM users u
  JOIN user_categories uc ON u.id = uc.user_id
  JOIN categories c ON uc.category_id = c.id
  WHERE u.status = 'active'
  QUALIFY ROW_NUMBER() OVER (PARTITION BY category ORDER BY score DESC) <= 3
`, {
  type: QueryTypes.SELECT
});

2. 递归查询(CTE)

javascript
// 组织架构递归查询
const orgHierarchy = await sequelize.query(`
  WITH RECURSIVE org_tree AS (
    -- 根节点
    SELECT 
      id, 
      name, 
      parent_id, 
      0 as level,
      CAST(name AS VARCHAR(1000)) as path
    FROM departments 
    WHERE parent_id IS NULL
    
    UNION ALL
    
    -- 递归部分
    SELECT 
      d.id, 
      d.name, 
      d.parent_id, 
      ot.level + 1,
      CONCAT(ot.path, ' > ', d.name)
    FROM departments d
    JOIN org_tree ot ON d.parent_id = ot.id
  )
  SELECT * FROM org_tree
  ORDER BY level, name
`, {
  type: QueryTypes.SELECT
});

3. 复杂聚合查询

javascript
// 销售数据分析
const salesAnalysis = await sequelize.query(`
  SELECT 
    DATE_FORMAT(o.created_at, '%Y-%m') as month,
    COUNT(DISTINCT o.id) as order_count,
    COUNT(DISTINCT o.user_id) as unique_customers,
    SUM(o.total_amount) as total_revenue,
    AVG(o.total_amount) as avg_order_value,
    
    -- 同比增长
    LAG(SUM(o.total_amount)) OVER (ORDER BY DATE_FORMAT(o.created_at, '%Y-%m')) as prev_month_revenue,
    
    -- 计算增长率
    CASE 
      WHEN LAG(SUM(o.total_amount)) OVER (ORDER BY DATE_FORMAT(o.created_at, '%Y-%m')) > 0 
      THEN ROUND(
        (SUM(o.total_amount) - LAG(SUM(o.total_amount)) OVER (ORDER BY DATE_FORMAT(o.created_at, '%Y-%m'))) 
        / LAG(SUM(o.total_amount)) OVER (ORDER BY DATE_FORMAT(o.created_at, '%Y-%m')) * 100, 2
      )
      ELSE NULL 
    END as growth_rate
    
  FROM orders o
  WHERE o.status = 'completed'
    AND o.created_at >= :startDate
    AND o.created_at <= :endDate
  GROUP BY DATE_FORMAT(o.created_at, '%Y-%m')
  ORDER BY month
`, {
  replacements: {
    startDate: '2023-01-01',
    endDate: '2024-12-31'
  },
  type: QueryTypes.SELECT
});

事务中的原生查询

1. 在事务中执行原生查询

javascript
const transaction = await sequelize.transaction();

try {
  // 在事务中执行原生查询
  const [results] = await sequelize.query(
    'UPDATE users SET balance = balance - :amount WHERE id = :userId',
    {
      replacements: { amount: 100, userId: 1 },
      type: QueryTypes.UPDATE,
      transaction  // 传入事务
    }
  );

  await sequelize.query(
    'INSERT INTO transactions (user_id, amount, type) VALUES (:userId, :amount, :type)',
    {
      replacements: { userId: 1, amount: 100, type: 'debit' },
      type: QueryTypes.INSERT,
      transaction
    }
  );

  await transaction.commit();
} catch (error) {
  await transaction.rollback();
  throw error;
}

2. 批量操作

javascript
async function batchUpdateUsers(updates) {
  const transaction = await sequelize.transaction();
  
  try {
    // 构建批量更新 SQL
    const cases = updates.map((update, index) => 
      `WHEN id = :id${index} THEN :value${index}`
    ).join(' ');
    
    const replacements = {};
    const ids = [];
    
    updates.forEach((update, index) => {
      replacements[`id${index}`] = update.id;
      replacements[`value${index}`] = update.value;
      ids.push(update.id);
    });
    
    await sequelize.query(`
      UPDATE users 
      SET score = CASE ${cases} END
      WHERE id IN (${ids.map((_, i) => `:id${i}`).join(',')})
    `, {
      replacements,
      type: QueryTypes.UPDATE,
      transaction
    });
    
    await transaction.commit();
  } catch (error) {
    await transaction.rollback();
    throw error;
  }
}

数据库特定功能

1. MySQL 特定功能

javascript
// 全文搜索
const searchResults = await sequelize.query(`
  SELECT 
    *,
    MATCH(title, content) AGAINST(:keyword IN NATURAL LANGUAGE MODE) as relevance
  FROM articles
  WHERE MATCH(title, content) AGAINST(:keyword IN NATURAL LANGUAGE MODE)
  ORDER BY relevance DESC
`, {
  replacements: { keyword: '搜索关键词' },
  type: QueryTypes.SELECT
});

// JSON 字段查询
const users = await sequelize.query(`
  SELECT 
    id,
    username,
    JSON_EXTRACT(metadata, '$.preferences.theme') as theme,
    JSON_EXTRACT(metadata, '$.profile.age') as age
  FROM users
  WHERE JSON_EXTRACT(metadata, '$.profile.city') = :city
`, {
  replacements: { city: 'Beijing' },
  type: QueryTypes.SELECT
});

// 地理位置查询
const nearbyLocations = await sequelize.query(`
  SELECT 
    *,
    ST_Distance_Sphere(
      POINT(longitude, latitude),
      POINT(:userLng, :userLat)
    ) as distance
  FROM locations
  WHERE ST_Distance_Sphere(
    POINT(longitude, latitude),
    POINT(:userLng, :userLat)
  ) <= :radius
  ORDER BY distance
`, {
  replacements: {
    userLng: 116.404,
    userLat: 39.915,
    radius: 5000  // 5公里
  },
  type: QueryTypes.SELECT
});

2. PostgreSQL 特定功能

javascript
// 数组操作
const users = await sequelize.query(`
  SELECT *
  FROM users
  WHERE :tag = ANY(tags)
    AND array_length(skills, 1) > :minSkills
`, {
  replacements: {
    tag: 'javascript',
    minSkills: 3
  },
  type: QueryTypes.SELECT
});

// JSONB 查询
const products = await sequelize.query(`
  SELECT 
    id,
    name,
    attributes->>'color' as color,
    attributes->'specs'->>'weight' as weight
  FROM products
  WHERE attributes @> :filter
    AND attributes ? :key
`, {
  replacements: {
    filter: JSON.stringify({ category: 'electronics' }),
    key: 'warranty'
  },
  type: QueryTypes.SELECT
});

性能优化

1. 查询计划分析

javascript
// MySQL 查询计划
const explainResult = await sequelize.query(`
  EXPLAIN FORMAT=JSON
  SELECT u.*, COUNT(o.id) as order_count
  FROM users u
  LEFT JOIN orders o ON u.id = o.user_id
  WHERE u.status = 'active'
  GROUP BY u.id
`, {
  type: QueryTypes.SELECT
});

console.log('查询计划:', JSON.stringify(explainResult[0], null, 2));

// PostgreSQL 查询计划
const pgExplain = await sequelize.query(`
  EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)
  SELECT u.*, COUNT(o.id) as order_count
  FROM users u
  LEFT JOIN orders o ON u.id = o.user_id
  WHERE u.status = 'active'
  GROUP BY u.id
`, {
  type: QueryTypes.SELECT
});

2. 索引提示

javascript
// MySQL 索引提示
const users = await sequelize.query(`
  SELECT /*+ USE_INDEX(users, idx_status_created) */ *
  FROM users
  WHERE status = 'active'
    AND created_at > :date
  ORDER BY created_at DESC
`, {
  replacements: { date: '2024-01-01' },
  type: QueryTypes.SELECT
});

// 强制使用索引
const forceIndexQuery = await sequelize.query(`
  SELECT *
  FROM users FORCE INDEX (idx_email)
  WHERE email LIKE :pattern
`, {
  replacements: { pattern: '%@gmail.com' },
  type: QueryTypes.SELECT
});

结果处理和映射

1. 结果映射到模型

javascript
// 映射到模型实例
const users = await sequelize.query(
  'SELECT * FROM users WHERE status = :status',
  {
    replacements: { status: 'active' },
    type: QueryTypes.SELECT,
    model: User,  // 映射到 User 模型
    mapToModel: true  // 创建模型实例
  }
);

// 现在 users 是 User 实例数组,可以调用模型方法
users.forEach(user => {
  console.log(user.getFullName());  // 调用实例方法
});

2. 自定义结果处理

javascript
// 结果转换
const processedResults = await sequelize.query(`
  SELECT 
    id,
    username,
    email,
    created_at,
    DATEDIFF(NOW(), created_at) as days_since_registration
  FROM users
  WHERE status = 'active'
`, {
  type: QueryTypes.SELECT
}).then(results => {
  return results.map(user => ({
    ...user,
    isNewUser: user.days_since_registration <= 30,
    registrationCategory: user.days_since_registration <= 7 ? 'new' :
                         user.days_since_registration <= 30 ? 'recent' : 'established'
  }));
});

实战案例:数据报表系统

javascript
class ReportService {
  // 用户活跃度报表
  static async getUserActivityReport(startDate, endDate) {
    return await sequelize.query(`
      WITH daily_activity AS (
        SELECT 
          DATE(created_at) as activity_date,
          user_id,
          COUNT(*) as action_count
        FROM user_activities
        WHERE created_at BETWEEN :startDate AND :endDate
        GROUP BY DATE(created_at), user_id
      ),
      user_stats AS (
        SELECT 
          u.id,
          u.username,
          u.email,
          COUNT(DISTINCT da.activity_date) as active_days,
          SUM(da.action_count) as total_actions,
          AVG(da.action_count) as avg_daily_actions,
          MAX(da.action_count) as max_daily_actions
        FROM users u
        LEFT JOIN daily_activity da ON u.id = da.user_id
        WHERE u.status = 'active'
        GROUP BY u.id, u.username, u.email
      )
      SELECT 
        *,
        CASE 
          WHEN active_days >= 25 THEN 'highly_active'
          WHEN active_days >= 15 THEN 'moderately_active'
          WHEN active_days >= 5 THEN 'low_active'
          ELSE 'inactive'
        END as activity_level,
        ROUND(active_days * 100.0 / DATEDIFF(:endDate, :startDate), 2) as activity_percentage
      FROM user_stats
      ORDER BY total_actions DESC
    `, {
      replacements: { startDate, endDate },
      type: QueryTypes.SELECT
    });
  }

  // 销售漏斗分析
  static async getSalesFunnelReport() {
    return await sequelize.query(`
      WITH funnel_data AS (
        SELECT 
          'page_view' as stage,
          1 as stage_order,
          COUNT(DISTINCT session_id) as user_count
        FROM page_views
        WHERE page_type = 'product'
        
        UNION ALL
        
        SELECT 
          'add_to_cart' as stage,
          2 as stage_order,
          COUNT(DISTINCT user_id) as user_count
        FROM cart_items
        
        UNION ALL
        
        SELECT 
          'checkout_start' as stage,
          3 as stage_order,
          COUNT(DISTINCT user_id) as user_count
        FROM orders
        WHERE status != 'cart'
        
        UNION ALL
        
        SELECT 
          'payment_complete' as stage,
          4 as stage_order,
          COUNT(DISTINCT user_id) as user_count
        FROM orders
        WHERE status IN ('paid', 'completed')
      )
      SELECT 
        stage,
        stage_order,
        user_count,
        LAG(user_count) OVER (ORDER BY stage_order) as prev_stage_count,
        CASE 
          WHEN LAG(user_count) OVER (ORDER BY stage_order) > 0 
          THEN ROUND(user_count * 100.0 / LAG(user_count) OVER (ORDER BY stage_order), 2)
          ELSE 100.0
        END as conversion_rate,
        CASE 
          WHEN LAG(user_count) OVER (ORDER BY stage_order) > 0 
          THEN LAG(user_count) OVER (ORDER BY stage_order) - user_count
          ELSE 0
        END as drop_off_count
      FROM funnel_data
      ORDER BY stage_order
    `, {
      type: QueryTypes.SELECT
    });
  }

  // 商品推荐算法
  static async getProductRecommendations(userId, limit = 10) {
    return await sequelize.query(`
      WITH user_preferences AS (
        -- 用户购买历史
        SELECT 
          p.category_id,
          COUNT(*) as purchase_count,
          AVG(oi.price) as avg_price
        FROM order_items oi
        JOIN orders o ON oi.order_id = o.id
        JOIN products p ON oi.product_id = p.id
        WHERE o.user_id = :userId
          AND o.status = 'completed'
        GROUP BY p.category_id
      ),
      similar_users AS (
        -- 找到相似用户
        SELECT 
          o2.user_id,
          COUNT(DISTINCT p.category_id) as common_categories
        FROM orders o1
        JOIN order_items oi1 ON o1.id = oi1.order_id
        JOIN products p ON oi1.product_id = p.id
        JOIN order_items oi2 ON p.category_id = (
          SELECT p2.category_id 
          FROM products p2 
          WHERE p2.id = oi2.product_id
        )
        JOIN orders o2 ON oi2.order_id = o2.id
        WHERE o1.user_id = :userId
          AND o2.user_id != :userId
          AND o1.status = 'completed'
          AND o2.status = 'completed'
        GROUP BY o2.user_id
        HAVING common_categories >= 2
        ORDER BY common_categories DESC
        LIMIT 50
      ),
      recommended_products AS (
        SELECT 
          p.id,
          p.name,
          p.price,
          p.category_id,
          COUNT(DISTINCT su.user_id) as recommendation_score,
          AVG(pr.rating) as avg_rating
        FROM similar_users su
        JOIN orders o ON su.user_id = o.user_id
        JOIN order_items oi ON o.id = oi.order_id
        JOIN products p ON oi.product_id = p.id
        LEFT JOIN product_reviews pr ON p.id = pr.product_id
        WHERE o.status = 'completed'
          AND p.id NOT IN (
            -- 排除用户已购买的商品
            SELECT DISTINCT oi2.product_id
            FROM orders o2
            JOIN order_items oi2 ON o2.id = oi2.order_id
            WHERE o2.user_id = :userId
              AND o2.status = 'completed'
          )
        GROUP BY p.id, p.name, p.price, p.category_id
        HAVING AVG(pr.rating) >= 4.0 OR AVG(pr.rating) IS NULL
      )
      SELECT 
        rp.*,
        CASE 
          WHEN up.category_id IS NOT NULL THEN rp.recommendation_score * 2
          ELSE rp.recommendation_score
        END as final_score
      FROM recommended_products rp
      LEFT JOIN user_preferences up ON rp.category_id = up.category_id
      ORDER BY final_score DESC, avg_rating DESC
      LIMIT :limit
    `, {
      replacements: { userId, limit },
      type: QueryTypes.SELECT
    });
  }
}

总结

今天我们深入学习了 Sequelize 的原生 SQL 查询:

  • ✅ 基础原生查询语法和参数绑定
  • ✅ 复杂查询场景的实现方法
  • ✅ 事务中的原生查询处理
  • ✅ 数据库特定功能的使用
  • ✅ 性能优化和查询分析
  • ✅ 结果处理和映射技巧
  • ✅ 完整的数据报表系统案例

掌握了这些知识,你就能够:

  • 处理 ORM 无法满足的复杂查询需求
  • 充分利用数据库的特有功能
  • 优化查询性能
  • 构建复杂的数据分析系统

原生 SQL 查询是 Sequelize 的重要补充,合理使用能让你的应用更加强大和灵活!


相关文章推荐:

有问题欢迎留言讨论,我会及时回复大家!