Skip to content

Feathers.js 实时功能实战 - WebSocket 和事件系统

发布时间:2024-06-03
作者:一介布衣
标签:Feathers.js, WebSocket, 实时通信, 事件系统

前言

上一篇文章我们学习了认证与授权系统,今天咱们来重点学习 Feathers.js 最引以为傲的功能 - 实时通信。说实话,实时功能是 Feathers.js 的杀手锏,它让构建实时应用变得异常简单。

我记得以前用 Socket.io 做实时功能的时候,总是要写很多样板代码:监听连接、处理断线重连、同步数据状态等等。而且前端和后端的事件管理经常搞得一团糟。后来用了 Feathers.js 才发现,原来实时功能可以这么优雅 - 数据一变化,相关客户端自动收到通知,完全不需要手动处理。

今天我就带大家深入学习 Feathers.js 的实时系统,从基础概念到高级应用,让你的应用真正"活"起来。

实时系统架构

Feathers.js 实时通信原理

javascript
// 实时数据流
Service Method Call (create/update/patch/remove)

Service Execution

After Hooks

Event Generation (created/updated/patched/removed)

Channel System (决定发送给谁)

WebSocket/Socket.io (传输层)

Client Event Handlers (前端处理)

核心组件

  1. Events(事件) - 数据变化时自动生成
  2. Channels(频道) - 决定事件发送给哪些客户端
  3. Transports(传输层) - WebSocket、Socket.io 等
  4. Client(客户端) - 接收和处理实时事件

基础实时配置

1. 服务器端配置

javascript
// src/app.js
const feathers = require('@feathersjs/feathers');
const express = require('@feathersjs/express');
const socketio = require('@feathersjs/socketio');

const app = express(feathers());

// 配置 Socket.io
app.configure(socketio({
  cors: {
    origin: "http://localhost:3000",
    methods: ["GET", "POST"]
  },
  // Socket.io 配置选项
  pingTimeout: 60000,
  pingInterval: 25000,
  upgradeTimeout: 30000,
  maxHttpBufferSize: 1e6,
  allowEIO3: true
}));

// 或者配置原生 WebSocket
// app.configure(require('@feathersjs/socketio'));

module.exports = app;

2. 频道系统配置

javascript
// src/channels.js
module.exports = function(app) {
  if(typeof app.channel !== 'function') {
    // 如果没有实时传输配置,直接返回
    return;
  }

  app.on('connection', connection => {
    // 新连接加入匿名频道
    app.channel('anonymous').join(connection);
    console.log('新连接建立:', connection.id);
  });

  app.on('disconnect', connection => {
    console.log('连接断开:', connection.id);
  });

  app.on('login', (authResult, { connection }) => {
    const { user } = authResult;

    if(connection) {
      // 认证成功后的频道管理
      app.channel('anonymous').leave(connection);
      app.channel('authenticated').join(connection);
      
      // 用户专属频道
      app.channel(`user-\${user._id}`).join(connection);
      
      // 角色频道
      app.channel(`role-\${user.role}`).join(connection);
      
      console.log(`用户 \${user.username} 已登录`);
    }
  });

  app.on('logout', (authResult, { connection }) => {
    if(connection) {
      // 登出时移回匿名频道
      app.channel('authenticated').leave(connection);
      app.channel('anonymous').join(connection);
    }
  });

  // 全局事件发布策略
  app.publish((data, hook) => {
    // 默认发送给所有认证用户
    return app.channel('authenticated');
  });
};

3. 客户端配置

javascript
// client/feathers-client.js
import feathers from '@feathersjs/feathers';
import socketio from '@feathersjs/socketio-client';
import auth from '@feathersjs/authentication-client';
import io from 'socket.io-client';

// 建立连接
const socket = io('http://localhost:3030');
const client = feathers();

// 配置 Socket.io 客户端
client.configure(socketio(socket));

// 配置认证
client.configure(auth({
  storage: window.localStorage
}));

// 连接事件监听
socket.on('connect', () => {
  console.log('已连接到服务器');
});

socket.on('disconnect', () => {
  console.log('与服务器断开连接');
});

socket.on('reconnect', () => {
  console.log('重新连接成功');
});

export default client;

实时事件系统

1. 自动事件生成

javascript
// Feathers.js 自动生成的事件
const messagesService = app.service('messages');

// 当调用 create 方法时,自动发送 'created' 事件
await messagesService.create({
  text: 'Hello World',
  userId: user.id
});
// → 触发 'created' 事件

// 当调用 patch 方法时,自动发送 'patched' 事件
await messagesService.patch(messageId, {
  text: 'Updated message'
});
// → 触发 'patched' 事件

// 当调用 remove 方法时,自动发送 'removed' 事件
await messagesService.remove(messageId);
// → 触发 'removed' 事件

2. 客户端事件监听

javascript
// client/message-handler.js
import client from './feathers-client';

const messagesService = client.service('messages');

// 监听新消息
messagesService.on('created', (message) => {
  console.log('新消息:', message);
  addMessageToUI(message);
});

// 监听消息更新
messagesService.on('patched', (message) => {
  console.log('消息更新:', message);
  updateMessageInUI(message);
});

// 监听消息删除
messagesService.on('removed', (message) => {
  console.log('消息删除:', message);
  removeMessageFromUI(message);
});

// 监听所有事件
messagesService.on('*', (eventName, data) => {
  console.log(`消息事件 \${eventName}:`, data);
});

// 取消监听
const handler = (message) => console.log(message);
messagesService.on('created', handler);
messagesService.off('created', handler);  // 取消监听

3. 自定义事件

javascript
// 服务器端发送自定义事件
app.service('messages').create = async function(data, params) {
  const message = await this._super(data, params);
  
  // 发送自定义事件
  this.emit('message-sent', {
    message,
    sender: params.user,
    timestamp: new Date()
  });
  
  return message;
};

// 客户端监听自定义事件
messagesService.on('message-sent', (data) => {
  console.log('消息发送事件:', data);
  showNotification(`\${data.sender.username} 发送了新消息`);
});

高级频道管理

1. 动态频道系统

javascript
// src/channels.js - 高级频道管理
module.exports = function(app) {
  if(typeof app.channel !== 'function') return;

  // 连接管理
  app.on('connection', connection => {
    app.channel('anonymous').join(connection);
    
    // 存储连接信息
    connection.user = null;
    connection.rooms = [];
    connection.lastActivity = new Date();
  });

  // 登录后的频道分配
  app.on('login', (authResult, { connection }) => {
    const { user } = authResult;

    if(connection) {
      connection.user = user;
      
      // 基础频道
      app.channel('anonymous').leave(connection);
      app.channel('authenticated').join(connection);
      app.channel(`user-\${user._id}`).join(connection);
      
      // 角色频道
      app.channel(`role-\${user.role}`).join(connection);
      
      // 部门频道(如果有)
      if (user.department) {
        app.channel(`department-\${user.department}`).join(connection);
      }
      
      // 项目频道(用户参与的项目)
      if (user.projects && user.projects.length > 0) {
        user.projects.forEach(projectId => {
          app.channel(`project-\${projectId}`).join(connection);
        });
      }
    }
  });

  // 房间管理服务
  app.use('/room-actions', {
    async create(data, params) {
      const { action, roomId, roomType = 'chat' } = data;
      const { connection, user } = params;

      if (!connection || !user) {
        throw new Error('需要认证连接');
      }

      switch (action) {
        case 'join':
          // 加入房间
          app.channel(`\${roomType}-\${roomId}`).join(connection);
          connection.rooms.push(`\${roomType}-\${roomId}`);
          
          // 通知房间内其他用户
          app.channel(`\${roomType}-\${roomId}`).send({
            type: 'user-joined',
            user: user.toSafeObject(),
            roomId,
            roomType,
            timestamp: new Date()
          });
          
          break;

        case 'leave':
          // 离开房间
          app.channel(`\${roomType}-\${roomId}`).leave(connection);
          connection.rooms = connection.rooms.filter(room => room !== `\${roomType}-\${roomId}`);
          
          // 通知房间内其他用户
          app.channel(`\${roomType}-\${roomId}`).send({
            type: 'user-left',
            user: user.toSafeObject(),
            roomId,
            roomType,
            timestamp: new Date()
          });
          
          break;

        default:
          throw new Error('无效的房间操作');
      }

      return { success: true, action, roomId, roomType };
    }
  });

  // 在线状态管理
  app.use('/presence', {
    async find(params) {
      const { user } = params;
      
      if (!user) {
        throw new Error('需要认证');
      }
      
      // 获取在线用户列表
      const onlineUsers = [];
      
      app.channel('authenticated').connections.forEach(connection => {
        if (connection.user) {
          onlineUsers.push({
            id: connection.user._id,
            username: connection.user.username,
            avatar: connection.user.avatar,
            lastActivity: connection.lastActivity,
            status: 'online'
          });
        }
      });
      
      return {
        total: onlineUsers.length,
        data: onlineUsers
      };
    },

    async patch(id, data, params) {
      const { connection, user } = params;
      const { status, activity } = data;
      
      if (!connection || !user) {
        throw new Error('需要认证连接');
      }
      
      // 更新连接状态
      connection.lastActivity = new Date();
      
      // 广播状态变化
      app.channel('authenticated').send({
        type: 'presence-update',
        user: {
          id: user._id,
          username: user.username,
          status: status || 'online',
          activity: activity,
          lastActivity: connection.lastActivity
        }
      });
      
      return { success: true };
    }
  });

  // 服务级别的事件发布
  app.publish((data, hook) => {
    const { service, method, result } = hook;
    
    switch (service) {
      case 'messages':
        return publishMessageEvents(data, hook, app);
      case 'posts':
        return publishPostEvents(data, hook, app);
      case 'notifications':
        return publishNotificationEvents(data, hook, app);
      default:
        return app.channel('authenticated');
    }
  });

  // 消息事件发布策略
  function publishMessageEvents(data, hook, app) {
    const { method, params } = hook;
    
    switch (method) {
      case 'created':
        // 新消息发送给房间内的所有用户
        if (data.roomId) {
          return app.channel(`chat-\${data.roomId}`);
        }
        // 私聊消息
        if (data.recipientId) {
          return [
            app.channel(`user-\${data.senderId}`),
            app.channel(`user-\${data.recipientId}`)
          ];
        }
        break;
        
      case 'patched':
      case 'removed':
        // 消息更新/删除通知相关用户
        if (data.roomId) {
          return app.channel(`chat-\${data.roomId}`);
        }
        break;
    }
    
    return [];
  }

  // 帖子事件发布策略
  function publishPostEvents(data, hook, app) {
    const { method } = hook;
    
    switch (method) {
      case 'created':
        // 新帖子通知所有用户
        return app.channel('authenticated');
        
      case 'patched':
        // 帖子更新只通知作者和管理员
        return [
          app.channel(`user-\${data.authorId}`),
          app.channel('role-admin')
        ];
        
      case 'removed':
        // 帖子删除通知管理员
        return app.channel('role-admin');
    }
    
    return [];
  }

  // 通知事件发布策略
  function publishNotificationEvents(data, hook, app) {
    const { method } = hook;
    
    if (method === 'created') {
      switch (data.type) {
        case 'system':
          // 系统通知发送给所有用户
          return app.channel('authenticated');
          
        case 'personal':
          // 个人通知发送给特定用户
          return app.channel(`user-\${data.userId}`);
          
        case 'role':
          // 角色通知发送给特定角色
          return app.channel(`role-\${data.targetRole}`);
          
        case 'department':
          // 部门通知发送给特定部门
          return app.channel(`department-\${data.departmentId}`);
      }
    }
    
    return [];
  }
};

2. 频道权限控制

javascript
// src/hooks/channel-permissions.js
const checkChannelPermission = (channelType) => {
  return async (context) => {
    const { user, connection } = context.params;
    const { roomId, action } = context.data;
    
    if (!user || !connection) {
      throw new Error('需要认证连接');
    }
    
    switch (channelType) {
      case 'private-chat':
        // 私聊权限检查
        const participants = await context.app.service('chat-participants').find({
          query: { roomId, userId: user._id }
        });
        
        if (participants.total === 0) {
          throw new Error('无权限访问此聊天室');
        }
        break;
        
      case 'project':
        // 项目权限检查
        const projectMember = await context.app.service('project-members').find({
          query: { projectId: roomId, userId: user._id }
        });
        
        if (projectMember.total === 0) {
          throw new Error('无权限访问此项目');
        }
        break;
        
      case 'admin':
        // 管理员权限检查
        if (user.role !== 'admin') {
          throw new Error('需要管理员权限');
        }
        break;
    }
    
    return context;
  };
};

// 使用权限检查
app.service('room-actions').hooks({
  before: {
    create: [
      authenticate('jwt'),
      checkChannelPermission('private-chat')
    ]
  }
});

实时应用实例

1. 实时聊天系统

javascript
// 聊天消息服务
class ChatService {
  constructor(options, app) {
    this.options = options || {};
    this.app = app;
  }

  async create(data, params) {
    const { user } = params;
    const { roomId, content, type = 'text' } = data;
    
    // 验证用户是否在房间中
    const roomMember = await this.app.service('room-members').find({
      query: { roomId, userId: user._id }
    });
    
    if (roomMember.total === 0) {
      throw new Error('您不在此聊天室中');
    }
    
    const message = {
      id: generateId(),
      roomId,
      senderId: user._id,
      content,
      type,
      timestamp: new Date(),
      edited: false,
      reactions: []
    };
    
    // 保存消息
    await this.saveMessage(message);
    
    // 更新房间最后活动时间
    await this.app.service('chat-rooms').patch(roomId, {
      lastActivity: new Date(),
      lastMessage: {
        content: content.substring(0, 100),
        sender: user.username,
        timestamp: message.timestamp
      }
    });
    
    return message;
  }

  async patch(id, data, params) {
    const { user } = params;
    const message = await this.getMessage(id);
    
    // 只有发送者可以编辑消息
    if (message.senderId !== user._id.toString()) {
      throw new Error('只能编辑自己的消息');
    }
    
    // 检查编辑时间限制(比如5分钟内)
    const editTimeLimit = 5 * 60 * 1000;
    if (Date.now() - new Date(message.timestamp).getTime() > editTimeLimit) {
      throw new Error('消息编辑时间已过期');
    }
    
    const updatedMessage = {
      ...message,
      ...data,
      edited: true,
      editedAt: new Date()
    };
    
    await this.saveMessage(updatedMessage);
    
    return updatedMessage;
  }

  // 消息反应(点赞、表情等)
  async addReaction(messageId, reaction, userId) {
    const message = await this.getMessage(messageId);
    
    // 检查是否已经有相同反应
    const existingReaction = message.reactions.find(
      r => r.userId === userId && r.type === reaction.type
    );
    
    if (existingReaction) {
      // 移除反应
      message.reactions = message.reactions.filter(
        r => !(r.userId === userId && r.type === reaction.type)
      );
    } else {
      // 添加反应
      message.reactions.push({
        userId,
        type: reaction.type,
        timestamp: new Date()
      });
    }
    
    await this.saveMessage(message);
    
    // 发送反应事件
    this.app.channel(`chat-\${message.roomId}`).send({
      type: 'message-reaction',
      messageId,
      reaction: {
        userId,
        type: reaction.type,
        action: existingReaction ? 'removed' : 'added'
      }
    });
    
    return message;
  }
}

// 客户端聊天组件
class ChatComponent {
  constructor(roomId) {
    this.roomId = roomId;
    this.messages = [];
    this.setupEventListeners();
  }

  setupEventListeners() {
    const messagesService = client.service('messages');
    
    // 监听新消息
    messagesService.on('created', (message) => {
      if (message.roomId === this.roomId) {
        this.addMessage(message);
        this.scrollToBottom();
        this.playNotificationSound();
      }
    });
    
    // 监听消息编辑
    messagesService.on('patched', (message) => {
      if (message.roomId === this.roomId) {
        this.updateMessage(message);
      }
    });
    
    // 监听消息删除
    messagesService.on('removed', (message) => {
      if (message.roomId === this.roomId) {
        this.removeMessage(message.id);
      }
    });
    
    // 监听自定义事件
    client.on('message-reaction', (data) => {
      if (this.messages.find(m => m.id === data.messageId)) {
        this.updateMessageReaction(data);
      }
    });
    
    // 监听用户状态
    client.on('user-joined', (data) => {
      if (data.roomId === this.roomId) {
        this.showUserJoinedNotification(data.user);
      }
    });
    
    client.on('user-left', (data) => {
      if (data.roomId === this.roomId) {
        this.showUserLeftNotification(data.user);
      }
    });
  }

  async sendMessage(content, type = 'text') {
    try {
      await client.service('messages').create({
        roomId: this.roomId,
        content,
        type
      });
    } catch (error) {
      this.showError('发送消息失败: ' + error.message);
    }
  }

  async editMessage(messageId, newContent) {
    try {
      await client.service('messages').patch(messageId, {
        content: newContent
      });
    } catch (error) {
      this.showError('编辑消息失败: ' + error.message);
    }
  }

  async addReaction(messageId, reactionType) {
    try {
      await client.service('message-reactions').create({
        messageId,
        type: reactionType
      });
    } catch (error) {
      this.showError('添加反应失败: ' + error.message);
    }
  }
}

2. 实时协作编辑

javascript
// 协作文档服务
class CollaborativeDocumentService {
  constructor(options, app) {
    this.options = options || {};
    this.app = app;
    this.documents = new Map(); // 文档状态缓存
  }

  async patch(id, data, params) {
    const { user } = params;
    const { operation, cursor } = data;
    
    // 获取文档当前状态
    let document = this.documents.get(id);
    if (!document) {
      document = await this.loadDocument(id);
      this.documents.set(id, document);
    }
    
    // 验证用户权限
    if (!this.canUserEdit(document, user)) {
      throw new Error('无权限编辑此文档');
    }
    
    // 应用操作
    const result = this.applyOperation(document, operation, user);
    
    // 更新文档状态
    document.content = result.content;
    document.version += 1;
    document.lastModified = new Date();
    document.lastModifiedBy = user._id;
    
    // 保存到数据库
    await this.saveDocument(document);
    
    // 广播变更给其他协作者
    this.app.channel(`document-\${id}`).send({
      type: 'document-operation',
      documentId: id,
      operation: result.operation,
      user: user.toSafeObject(),
      version: document.version,
      cursor: cursor
    });
    
    return {
      success: true,
      version: document.version,
      operation: result.operation
    };
  }

  applyOperation(document, operation, user) {
    switch (operation.type) {
      case 'insert':
        return this.applyInsert(document, operation);
      case 'delete':
        return this.applyDelete(document, operation);
      case 'format':
        return this.applyFormat(document, operation);
      default:
        throw new Error('未知的操作类型');
    }
  }

  applyInsert(document, operation) {
    const { position, text } = operation;
    const content = document.content;
    
    const newContent = content.slice(0, position) + text + content.slice(position);
    
    return {
      content: newContent,
      operation: {
        type: 'insert',
        position,
        text,
        length: text.length
      }
    };
  }

  applyDelete(document, operation) {
    const { position, length } = operation;
    const content = document.content;
    
    const deletedText = content.slice(position, position + length);
    const newContent = content.slice(0, position) + content.slice(position + length);
    
    return {
      content: newContent,
      operation: {
        type: 'delete',
        position,
        length,
        deletedText
      }
    };
  }
}

// 客户端协作编辑器
class CollaborativeEditor {
  constructor(documentId, editorElement) {
    this.documentId = documentId;
    this.editor = editorElement;
    this.version = 0;
    this.pendingOperations = [];
    this.cursors = new Map(); // 其他用户的光标位置
    
    this.setupEventListeners();
    this.loadDocument();
  }

  setupEventListeners() {
    const documentService = client.service('documents');
    
    // 监听文档操作
    client.on('document-operation', (data) => {
      if (data.documentId === this.documentId) {
        this.applyRemoteOperation(data);
        this.updateUserCursor(data.user.id, data.cursor);
      }
    });
    
    // 监听用户加入/离开
    client.on('user-joined', (data) => {
      if (data.roomId === `document-\${this.documentId}`) {
        this.addUserCursor(data.user);
      }
    });
    
    client.on('user-left', (data) => {
      if (data.roomId === `document-\${this.documentId}`) {
        this.removeUserCursor(data.user.id);
      }
    });
    
    // 编辑器事件
    this.editor.addEventListener('input', (event) => {
      this.handleLocalEdit(event);
    });
    
    this.editor.addEventListener('selectionchange', () => {
      this.broadcastCursorPosition();
    });
  }

  async handleLocalEdit(event) {
    const operation = this.createOperationFromEvent(event);
    
    if (operation) {
      // 立即应用到本地
      this.applyLocalOperation(operation);
      
      // 发送到服务器
      try {
        await client.service('documents').patch(this.documentId, {
          operation,
          cursor: this.getCursorPosition()
        });
      } catch (error) {
        console.error('同步操作失败:', error);
        // 回滚本地操作
        this.rollbackOperation(operation);
      }
    }
  }

  applyRemoteOperation(data) {
    const { operation, version, user } = data;
    
    // 检查版本冲突
    if (version !== this.version + 1) {
      console.warn('版本冲突,需要同步');
      this.syncDocument();
      return;
    }
    
    // 应用远程操作
    this.applyOperationToEditor(operation);
    this.version = version;
    
    // 显示操作提示
    this.showOperationIndicator(user, operation);
  }

  broadcastCursorPosition() {
    const position = this.getCursorPosition();
    
    // 节流发送光标位置
    clearTimeout(this.cursorTimeout);
    this.cursorTimeout = setTimeout(() => {
      client.service('document-cursors').patch(this.documentId, {
        position,
        selection: this.getSelection()
      });
    }, 100);
  }
}

性能优化

1. 连接管理优化

javascript
// src/hooks/connection-optimization.js
const connectionLimiter = (maxConnections = 1000) => {
  let connectionCount = 0;
  
  return (app) => {
    app.on('connection', (connection) => {
      connectionCount++;
      
      if (connectionCount > maxConnections) {
        console.warn(`连接数超过限制: \${connectionCount}/\${maxConnections}`);
        connection.disconnect();
        return;
      }
      
      connection.on('disconnect', () => {
        connectionCount--;
      });
    });
  };
};

// 连接清理
const connectionCleaner = (idleTimeout = 30 * 60 * 1000) => {
  return (app) => {
    setInterval(() => {
      const now = Date.now();
      
      app.channel('authenticated').connections.forEach(connection => {
        if (connection.lastActivity && now - connection.lastActivity.getTime() > idleTimeout) {
          console.log(`清理空闲连接: \${connection.id}`);
          connection.disconnect();
        }
      });
    }, 60000); // 每分钟检查一次
  };
};

2. 事件优化

javascript
// 事件批处理
const batchEvents = (batchSize = 10, batchTimeout = 100) => {
  const eventBatches = new Map();
  
  return (data, hook) => {
    const { service, method } = hook;
    const batchKey = `\${service}-\${method}`;
    
    if (!eventBatches.has(batchKey)) {
      eventBatches.set(batchKey, []);
      
      // 设置批处理超时
      setTimeout(() => {
        const batch = eventBatches.get(batchKey);
        if (batch && batch.length > 0) {
          // 发送批量事件
          app.channel('authenticated').send({
            type: 'batch-update',
            service,
            method,
            data: batch
          });
          
          eventBatches.delete(batchKey);
        }
      }, batchTimeout);
    }
    
    const batch = eventBatches.get(batchKey);
    batch.push(data);
    
    // 如果达到批处理大小,立即发送
    if (batch.length >= batchSize) {
      app.channel('authenticated').send({
        type: 'batch-update',
        service,
        method,
        data: batch
      });
      
      eventBatches.delete(batchKey);
    }
    
    // 返回空数组阻止单独事件发送
    return [];
  };
};

总结

通过这篇文章,我们深入学习了 Feathers.js 的实时功能:

实时系统基础

  • 实时通信架构和原理
  • WebSocket/Socket.io 配置
  • 事件系统和频道管理

高级实时功能

  • 动态频道系统
  • 权限控制和安全
  • 自定义事件处理

实战应用

  • 实时聊天系统
  • 协作编辑功能
  • 在线状态管理

性能优化

  • 连接管理优化
  • 事件批处理
  • 资源清理机制

掌握了这些知识,你就能够构建强大的实时应用,包括:

  • 即时通讯系统
  • 协作工具
  • 实时仪表板
  • 在线游戏

下一篇文章,我们将进入第三阶段,学习 Feathers.js 的数据库适配器系统。


相关文章推荐:

有问题欢迎留言讨论,我会及时回复大家!