Feathers.js 实时功能实战 - WebSocket 和事件系统
发布时间:2024-06-03
作者:一介布衣
标签:Feathers.js, WebSocket, 实时通信, 事件系统
前言
上一篇文章我们学习了认证与授权系统,今天咱们来重点学习 Feathers.js 最引以为傲的功能 - 实时通信。说实话,实时功能是 Feathers.js 的杀手锏,它让构建实时应用变得异常简单。
我记得以前用 Socket.io 做实时功能的时候,总是要写很多样板代码:监听连接、处理断线重连、同步数据状态等等。而且前端和后端的事件管理经常搞得一团糟。后来用了 Feathers.js 才发现,原来实时功能可以这么优雅 - 数据一变化,相关客户端自动收到通知,完全不需要手动处理。
今天我就带大家深入学习 Feathers.js 的实时系统,从基础概念到高级应用,让你的应用真正"活"起来。
实时系统架构
Feathers.js 实时通信原理
javascript
// 实时数据流
Service Method Call (create/update/patch/remove)
↓
Service Execution
↓
After Hooks
↓
Event Generation (created/updated/patched/removed)
↓
Channel System (决定发送给谁)
↓
WebSocket/Socket.io (传输层)
↓
Client Event Handlers (前端处理)
核心组件
- Events(事件) - 数据变化时自动生成
- Channels(频道) - 决定事件发送给哪些客户端
- Transports(传输层) - WebSocket、Socket.io 等
- Client(客户端) - 接收和处理实时事件
基础实时配置
1. 服务器端配置
javascript
// src/app.js
const feathers = require('@feathersjs/feathers');
const express = require('@feathersjs/express');
const socketio = require('@feathersjs/socketio');
const app = express(feathers());
// 配置 Socket.io
app.configure(socketio({
cors: {
origin: "http://localhost:3000",
methods: ["GET", "POST"]
},
// Socket.io 配置选项
pingTimeout: 60000,
pingInterval: 25000,
upgradeTimeout: 30000,
maxHttpBufferSize: 1e6,
allowEIO3: true
}));
// 或者配置原生 WebSocket
// app.configure(require('@feathersjs/socketio'));
module.exports = app;
2. 频道系统配置
javascript
// src/channels.js
module.exports = function(app) {
if(typeof app.channel !== 'function') {
// 如果没有实时传输配置,直接返回
return;
}
app.on('connection', connection => {
// 新连接加入匿名频道
app.channel('anonymous').join(connection);
console.log('新连接建立:', connection.id);
});
app.on('disconnect', connection => {
console.log('连接断开:', connection.id);
});
app.on('login', (authResult, { connection }) => {
const { user } = authResult;
if(connection) {
// 认证成功后的频道管理
app.channel('anonymous').leave(connection);
app.channel('authenticated').join(connection);
// 用户专属频道
app.channel(`user-${user._id}`).join(connection);
// 角色频道
app.channel(`role-${user.role}`).join(connection);
console.log(`用户 ${user.username} 已登录`);
}
});
app.on('logout', (authResult, { connection }) => {
if(connection) {
// 登出时移回匿名频道
app.channel('authenticated').leave(connection);
app.channel('anonymous').join(connection);
}
});
// 全局事件发布策略
app.publish((data, hook) => {
// 默认发送给所有认证用户
return app.channel('authenticated');
});
};
3. 客户端配置
javascript
// client/feathers-client.js
import feathers from '@feathersjs/feathers';
import socketio from '@feathersjs/socketio-client';
import auth from '@feathersjs/authentication-client';
import io from 'socket.io-client';
// 建立连接
const socket = io('http://localhost:3030');
const client = feathers();
// 配置 Socket.io 客户端
client.configure(socketio(socket));
// 配置认证
client.configure(auth({
storage: window.localStorage
}));
// 连接事件监听
socket.on('connect', () => {
console.log('已连接到服务器');
});
socket.on('disconnect', () => {
console.log('与服务器断开连接');
});
socket.on('reconnect', () => {
console.log('重新连接成功');
});
export default client;
实时事件系统
1. 自动事件生成
javascript
// Feathers.js 自动生成的事件
const messagesService = app.service('messages');
// 当调用 create 方法时,自动发送 'created' 事件
await messagesService.create({
text: 'Hello World',
userId: user.id
});
// → 触发 'created' 事件
// 当调用 patch 方法时,自动发送 'patched' 事件
await messagesService.patch(messageId, {
text: 'Updated message'
});
// → 触发 'patched' 事件
// 当调用 remove 方法时,自动发送 'removed' 事件
await messagesService.remove(messageId);
// → 触发 'removed' 事件
2. 客户端事件监听
javascript
// client/message-handler.js
import client from './feathers-client';
const messagesService = client.service('messages');
// 监听新消息
messagesService.on('created', (message) => {
console.log('新消息:', message);
addMessageToUI(message);
});
// 监听消息更新
messagesService.on('patched', (message) => {
console.log('消息更新:', message);
updateMessageInUI(message);
});
// 监听消息删除
messagesService.on('removed', (message) => {
console.log('消息删除:', message);
removeMessageFromUI(message);
});
// 监听所有事件
messagesService.on('*', (eventName, data) => {
console.log(`消息事件 ${eventName}:`, data);
});
// 取消监听
const handler = (message) => console.log(message);
messagesService.on('created', handler);
messagesService.off('created', handler); // 取消监听
3. 自定义事件
javascript
// 服务器端发送自定义事件
app.service('messages').create = async function(data, params) {
const message = await this._super(data, params);
// 发送自定义事件
this.emit('message-sent', {
message,
sender: params.user,
timestamp: new Date()
});
return message;
};
// 客户端监听自定义事件
messagesService.on('message-sent', (data) => {
console.log('消息发送事件:', data);
showNotification(`${data.sender.username} 发送了新消息`);
});
高级频道管理
1. 动态频道系统
javascript
// src/channels.js - 高级频道管理
module.exports = function(app) {
if(typeof app.channel !== 'function') return;
// 连接管理
app.on('connection', connection => {
app.channel('anonymous').join(connection);
// 存储连接信息
connection.user = null;
connection.rooms = [];
connection.lastActivity = new Date();
});
// 登录后的频道分配
app.on('login', (authResult, { connection }) => {
const { user } = authResult;
if(connection) {
connection.user = user;
// 基础频道
app.channel('anonymous').leave(connection);
app.channel('authenticated').join(connection);
app.channel(`user-${user._id}`).join(connection);
// 角色频道
app.channel(`role-${user.role}`).join(connection);
// 部门频道(如果有)
if (user.department) {
app.channel(`department-${user.department}`).join(connection);
}
// 项目频道(用户参与的项目)
if (user.projects && user.projects.length > 0) {
user.projects.forEach(projectId => {
app.channel(`project-${projectId}`).join(connection);
});
}
}
});
// 房间管理服务
app.use('/room-actions', {
async create(data, params) {
const { action, roomId, roomType = 'chat' } = data;
const { connection, user } = params;
if (!connection || !user) {
throw new Error('需要认证连接');
}
switch (action) {
case 'join':
// 加入房间
app.channel(`${roomType}-${roomId}`).join(connection);
connection.rooms.push(`${roomType}-${roomId}`);
// 通知房间内其他用户
app.channel(`${roomType}-${roomId}`).send({
type: 'user-joined',
user: user.toSafeObject(),
roomId,
roomType,
timestamp: new Date()
});
break;
case 'leave':
// 离开房间
app.channel(`${roomType}-${roomId}`).leave(connection);
connection.rooms = connection.rooms.filter(room => room !== `${roomType}-${roomId}`);
// 通知房间内其他用户
app.channel(`${roomType}-${roomId}`).send({
type: 'user-left',
user: user.toSafeObject(),
roomId,
roomType,
timestamp: new Date()
});
break;
default:
throw new Error('无效的房间操作');
}
return { success: true, action, roomId, roomType };
}
});
// 在线状态管理
app.use('/presence', {
async find(params) {
const { user } = params;
if (!user) {
throw new Error('需要认证');
}
// 获取在线用户列表
const onlineUsers = [];
app.channel('authenticated').connections.forEach(connection => {
if (connection.user) {
onlineUsers.push({
id: connection.user._id,
username: connection.user.username,
avatar: connection.user.avatar,
lastActivity: connection.lastActivity,
status: 'online'
});
}
});
return {
total: onlineUsers.length,
data: onlineUsers
};
},
async patch(id, data, params) {
const { connection, user } = params;
const { status, activity } = data;
if (!connection || !user) {
throw new Error('需要认证连接');
}
// 更新连接状态
connection.lastActivity = new Date();
// 广播状态变化
app.channel('authenticated').send({
type: 'presence-update',
user: {
id: user._id,
username: user.username,
status: status || 'online',
activity: activity,
lastActivity: connection.lastActivity
}
});
return { success: true };
}
});
// 服务级别的事件发布
app.publish((data, hook) => {
const { service, method, result } = hook;
switch (service) {
case 'messages':
return publishMessageEvents(data, hook, app);
case 'posts':
return publishPostEvents(data, hook, app);
case 'notifications':
return publishNotificationEvents(data, hook, app);
default:
return app.channel('authenticated');
}
});
// 消息事件发布策略
function publishMessageEvents(data, hook, app) {
const { method, params } = hook;
switch (method) {
case 'created':
// 新消息发送给房间内的所有用户
if (data.roomId) {
return app.channel(`chat-${data.roomId}`);
}
// 私聊消息
if (data.recipientId) {
return [
app.channel(`user-${data.senderId}`),
app.channel(`user-${data.recipientId}`)
];
}
break;
case 'patched':
case 'removed':
// 消息更新/删除通知相关用户
if (data.roomId) {
return app.channel(`chat-${data.roomId}`);
}
break;
}
return [];
}
// 帖子事件发布策略
function publishPostEvents(data, hook, app) {
const { method } = hook;
switch (method) {
case 'created':
// 新帖子通知所有用户
return app.channel('authenticated');
case 'patched':
// 帖子更新只通知作者和管理员
return [
app.channel(`user-${data.authorId}`),
app.channel('role-admin')
];
case 'removed':
// 帖子删除通知管理员
return app.channel('role-admin');
}
return [];
}
// 通知事件发布策略
function publishNotificationEvents(data, hook, app) {
const { method } = hook;
if (method === 'created') {
switch (data.type) {
case 'system':
// 系统通知发送给所有用户
return app.channel('authenticated');
case 'personal':
// 个人通知发送给特定用户
return app.channel(`user-${data.userId}`);
case 'role':
// 角色通知发送给特定角色
return app.channel(`role-${data.targetRole}`);
case 'department':
// 部门通知发送给特定部门
return app.channel(`department-${data.departmentId}`);
}
}
return [];
}
};
2. 频道权限控制
javascript
// src/hooks/channel-permissions.js
const checkChannelPermission = (channelType) => {
return async (context) => {
const { user, connection } = context.params;
const { roomId, action } = context.data;
if (!user || !connection) {
throw new Error('需要认证连接');
}
switch (channelType) {
case 'private-chat':
// 私聊权限检查
const participants = await context.app.service('chat-participants').find({
query: { roomId, userId: user._id }
});
if (participants.total === 0) {
throw new Error('无权限访问此聊天室');
}
break;
case 'project':
// 项目权限检查
const projectMember = await context.app.service('project-members').find({
query: { projectId: roomId, userId: user._id }
});
if (projectMember.total === 0) {
throw new Error('无权限访问此项目');
}
break;
case 'admin':
// 管理员权限检查
if (user.role !== 'admin') {
throw new Error('需要管理员权限');
}
break;
}
return context;
};
};
// 使用权限检查
app.service('room-actions').hooks({
before: {
create: [
authenticate('jwt'),
checkChannelPermission('private-chat')
]
}
});
实时应用实例
1. 实时聊天系统
javascript
// 聊天消息服务
class ChatService {
constructor(options, app) {
this.options = options || {};
this.app = app;
}
async create(data, params) {
const { user } = params;
const { roomId, content, type = 'text' } = data;
// 验证用户是否在房间中
const roomMember = await this.app.service('room-members').find({
query: { roomId, userId: user._id }
});
if (roomMember.total === 0) {
throw new Error('您不在此聊天室中');
}
const message = {
id: generateId(),
roomId,
senderId: user._id,
content,
type,
timestamp: new Date(),
edited: false,
reactions: []
};
// 保存消息
await this.saveMessage(message);
// 更新房间最后活动时间
await this.app.service('chat-rooms').patch(roomId, {
lastActivity: new Date(),
lastMessage: {
content: content.substring(0, 100),
sender: user.username,
timestamp: message.timestamp
}
});
return message;
}
async patch(id, data, params) {
const { user } = params;
const message = await this.getMessage(id);
// 只有发送者可以编辑消息
if (message.senderId !== user._id.toString()) {
throw new Error('只能编辑自己的消息');
}
// 检查编辑时间限制(比如5分钟内)
const editTimeLimit = 5 * 60 * 1000;
if (Date.now() - new Date(message.timestamp).getTime() > editTimeLimit) {
throw new Error('消息编辑时间已过期');
}
const updatedMessage = {
...message,
...data,
edited: true,
editedAt: new Date()
};
await this.saveMessage(updatedMessage);
return updatedMessage;
}
// 消息反应(点赞、表情等)
async addReaction(messageId, reaction, userId) {
const message = await this.getMessage(messageId);
// 检查是否已经有相同反应
const existingReaction = message.reactions.find(
r => r.userId === userId && r.type === reaction.type
);
if (existingReaction) {
// 移除反应
message.reactions = message.reactions.filter(
r => !(r.userId === userId && r.type === reaction.type)
);
} else {
// 添加反应
message.reactions.push({
userId,
type: reaction.type,
timestamp: new Date()
});
}
await this.saveMessage(message);
// 发送反应事件
this.app.channel(`chat-${message.roomId}`).send({
type: 'message-reaction',
messageId,
reaction: {
userId,
type: reaction.type,
action: existingReaction ? 'removed' : 'added'
}
});
return message;
}
}
// 客户端聊天组件
class ChatComponent {
constructor(roomId) {
this.roomId = roomId;
this.messages = [];
this.setupEventListeners();
}
setupEventListeners() {
const messagesService = client.service('messages');
// 监听新消息
messagesService.on('created', (message) => {
if (message.roomId === this.roomId) {
this.addMessage(message);
this.scrollToBottom();
this.playNotificationSound();
}
});
// 监听消息编辑
messagesService.on('patched', (message) => {
if (message.roomId === this.roomId) {
this.updateMessage(message);
}
});
// 监听消息删除
messagesService.on('removed', (message) => {
if (message.roomId === this.roomId) {
this.removeMessage(message.id);
}
});
// 监听自定义事件
client.on('message-reaction', (data) => {
if (this.messages.find(m => m.id === data.messageId)) {
this.updateMessageReaction(data);
}
});
// 监听用户状态
client.on('user-joined', (data) => {
if (data.roomId === this.roomId) {
this.showUserJoinedNotification(data.user);
}
});
client.on('user-left', (data) => {
if (data.roomId === this.roomId) {
this.showUserLeftNotification(data.user);
}
});
}
async sendMessage(content, type = 'text') {
try {
await client.service('messages').create({
roomId: this.roomId,
content,
type
});
} catch (error) {
this.showError('发送消息失败: ' + error.message);
}
}
async editMessage(messageId, newContent) {
try {
await client.service('messages').patch(messageId, {
content: newContent
});
} catch (error) {
this.showError('编辑消息失败: ' + error.message);
}
}
async addReaction(messageId, reactionType) {
try {
await client.service('message-reactions').create({
messageId,
type: reactionType
});
} catch (error) {
this.showError('添加反应失败: ' + error.message);
}
}
}
2. 实时协作编辑
javascript
// 协作文档服务
class CollaborativeDocumentService {
constructor(options, app) {
this.options = options || {};
this.app = app;
this.documents = new Map(); // 文档状态缓存
}
async patch(id, data, params) {
const { user } = params;
const { operation, cursor } = data;
// 获取文档当前状态
let document = this.documents.get(id);
if (!document) {
document = await this.loadDocument(id);
this.documents.set(id, document);
}
// 验证用户权限
if (!this.canUserEdit(document, user)) {
throw new Error('无权限编辑此文档');
}
// 应用操作
const result = this.applyOperation(document, operation, user);
// 更新文档状态
document.content = result.content;
document.version += 1;
document.lastModified = new Date();
document.lastModifiedBy = user._id;
// 保存到数据库
await this.saveDocument(document);
// 广播变更给其他协作者
this.app.channel(`document-${id}`).send({
type: 'document-operation',
documentId: id,
operation: result.operation,
user: user.toSafeObject(),
version: document.version,
cursor: cursor
});
return {
success: true,
version: document.version,
operation: result.operation
};
}
applyOperation(document, operation, user) {
switch (operation.type) {
case 'insert':
return this.applyInsert(document, operation);
case 'delete':
return this.applyDelete(document, operation);
case 'format':
return this.applyFormat(document, operation);
default:
throw new Error('未知的操作类型');
}
}
applyInsert(document, operation) {
const { position, text } = operation;
const content = document.content;
const newContent = content.slice(0, position) + text + content.slice(position);
return {
content: newContent,
operation: {
type: 'insert',
position,
text,
length: text.length
}
};
}
applyDelete(document, operation) {
const { position, length } = operation;
const content = document.content;
const deletedText = content.slice(position, position + length);
const newContent = content.slice(0, position) + content.slice(position + length);
return {
content: newContent,
operation: {
type: 'delete',
position,
length,
deletedText
}
};
}
}
// 客户端协作编辑器
class CollaborativeEditor {
constructor(documentId, editorElement) {
this.documentId = documentId;
this.editor = editorElement;
this.version = 0;
this.pendingOperations = [];
this.cursors = new Map(); // 其他用户的光标位置
this.setupEventListeners();
this.loadDocument();
}
setupEventListeners() {
const documentService = client.service('documents');
// 监听文档操作
client.on('document-operation', (data) => {
if (data.documentId === this.documentId) {
this.applyRemoteOperation(data);
this.updateUserCursor(data.user.id, data.cursor);
}
});
// 监听用户加入/离开
client.on('user-joined', (data) => {
if (data.roomId === `document-${this.documentId}`) {
this.addUserCursor(data.user);
}
});
client.on('user-left', (data) => {
if (data.roomId === `document-${this.documentId}`) {
this.removeUserCursor(data.user.id);
}
});
// 编辑器事件
this.editor.addEventListener('input', (event) => {
this.handleLocalEdit(event);
});
this.editor.addEventListener('selectionchange', () => {
this.broadcastCursorPosition();
});
}
async handleLocalEdit(event) {
const operation = this.createOperationFromEvent(event);
if (operation) {
// 立即应用到本地
this.applyLocalOperation(operation);
// 发送到服务器
try {
await client.service('documents').patch(this.documentId, {
operation,
cursor: this.getCursorPosition()
});
} catch (error) {
console.error('同步操作失败:', error);
// 回滚本地操作
this.rollbackOperation(operation);
}
}
}
applyRemoteOperation(data) {
const { operation, version, user } = data;
// 检查版本冲突
if (version !== this.version + 1) {
console.warn('版本冲突,需要同步');
this.syncDocument();
return;
}
// 应用远程操作
this.applyOperationToEditor(operation);
this.version = version;
// 显示操作提示
this.showOperationIndicator(user, operation);
}
broadcastCursorPosition() {
const position = this.getCursorPosition();
// 节流发送光标位置
clearTimeout(this.cursorTimeout);
this.cursorTimeout = setTimeout(() => {
client.service('document-cursors').patch(this.documentId, {
position,
selection: this.getSelection()
});
}, 100);
}
}
性能优化
1. 连接管理优化
javascript
// src/hooks/connection-optimization.js
const connectionLimiter = (maxConnections = 1000) => {
let connectionCount = 0;
return (app) => {
app.on('connection', (connection) => {
connectionCount++;
if (connectionCount > maxConnections) {
console.warn(`连接数超过限制: ${connectionCount}/${maxConnections}`);
connection.disconnect();
return;
}
connection.on('disconnect', () => {
connectionCount--;
});
});
};
};
// 连接清理
const connectionCleaner = (idleTimeout = 30 * 60 * 1000) => {
return (app) => {
setInterval(() => {
const now = Date.now();
app.channel('authenticated').connections.forEach(connection => {
if (connection.lastActivity && now - connection.lastActivity.getTime() > idleTimeout) {
console.log(`清理空闲连接: ${connection.id}`);
connection.disconnect();
}
});
}, 60000); // 每分钟检查一次
};
};
2. 事件优化
javascript
// 事件批处理
const batchEvents = (batchSize = 10, batchTimeout = 100) => {
const eventBatches = new Map();
return (data, hook) => {
const { service, method } = hook;
const batchKey = `${service}-${method}`;
if (!eventBatches.has(batchKey)) {
eventBatches.set(batchKey, []);
// 设置批处理超时
setTimeout(() => {
const batch = eventBatches.get(batchKey);
if (batch && batch.length > 0) {
// 发送批量事件
app.channel('authenticated').send({
type: 'batch-update',
service,
method,
data: batch
});
eventBatches.delete(batchKey);
}
}, batchTimeout);
}
const batch = eventBatches.get(batchKey);
batch.push(data);
// 如果达到批处理大小,立即发送
if (batch.length >= batchSize) {
app.channel('authenticated').send({
type: 'batch-update',
service,
method,
data: batch
});
eventBatches.delete(batchKey);
}
// 返回空数组阻止单独事件发送
return [];
};
};
总结
通过这篇文章,我们深入学习了 Feathers.js 的实时功能:
✅ 实时系统基础:
- 实时通信架构和原理
- WebSocket/Socket.io 配置
- 事件系统和频道管理
✅ 高级实时功能:
- 动态频道系统
- 权限控制和安全
- 自定义事件处理
✅ 实战应用:
- 实时聊天系统
- 协作编辑功能
- 在线状态管理
✅ 性能优化:
- 连接管理优化
- 事件批处理
- 资源清理机制
掌握了这些知识,你就能够构建强大的实时应用,包括:
- 即时通讯系统
- 协作工具
- 实时仪表板
- 在线游戏
下一篇文章,我们将进入第三阶段,学习 Feathers.js 的数据库适配器系统。
相关文章推荐:
有问题欢迎留言讨论,我会及时回复大家!