当前位置:   article > 正文

分布式websocket实时通讯的session共享问题

分布式websocket实时通讯的session共享问题

目录

1.需求

2.前置条件和要求

3.方案分析

3.1.方案1:session共享存储到redis数据库

3.2.方案2:采用mongo生命周期的AbstractMongoEventListener

3.3.方案3:引入redis等MQ组件,发送广播消息

3.4.方案4:采用change stream方式同步数据

4.基于change stream方式实现分布式下websocket的消息推送

4.1.pom.xml引入mongodb启动器

4.2.配置类增加自定义bean

4.3.实现监听中的回调函数

5.mongodb的单机模式下也可采用AbstractMongoEventListener方式

6.文档推荐


1.需求

就一句话:在多个springboot进程和多节点Mongo副本集集群的基础上,解决websocket的session共享问题。

2.前置条件和要求

1.多节点的Mongo副本集集群

2.单一的springboot服务的多个副本,也就是多个相同的springboot进程

3.要求实时性高,拒绝定时器(主要是有窗口期,消耗的资源伤不起),无论是前端定时还是后端定时。

3.方案分析

3.1.方案1:session共享存储到redis数据库

方案不可行

原因:

Session无法采用Redis进行存储, 因为不能对Session进行序列化

那为什么Session无法进行序列化呢?

因为其中包含了一些不可序列化的对象,比如底层网络连接和线程相关的信息。这些信息无法被简单地序列化和反序列化,因此直接对 WebSocket session 进行序列化会导致一些问题,比如无法正确地恢复网络连接状态和线程状态。

3.2.方案2:采用mongo生命周期的AbstractMongoEventListener

方案不可行

原因:

AbstractMongoEventListener内部采用的是利用MongoDB的事件通知机制,在文档操作时触发相应的事件,然后监听器将收到事件通知并执行预定义的操作。

本质还是在Spring应用程序中注册监听器,从而实现对MongoDB文档生命周期事件的监听和处理,是基于单线程的,在多个springboot线程下,不能实现数据共享。

3.3.方案3:引入redis等MQ组件,发送广播消息

原理:

多进程同时监听消费MQ,当有消息需要推送的时候,发送MQ广播消息,每个springboot进程都能收到广播消息,然后检查进程缓存中的websocket session连接信息,推送消息。

方案可行

缺点:

1.需要引入第三方组件,增加代码复杂性

2.高可用环境下,增加运维难度,需要维护搭建MQ集群

3.4.方案4:采用change stream方式同步数据

MongoDB 从 3.6 版本开始提供订阅数据变更的功能,但仅限于mongo集群,包括mongo副本集模式和mongo分片模式

原理:

基于 oplog 实现的,提供推送实时增量的推送功能。它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数

方案可行

4.基于change stream方式实现分布式下websocket的消息推送

4.1.pom.xml引入mongodb启动

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-mongodb</artifactId>
  4. </dependency>

4.2.配置类增加自定义bean

  1. import com.mongodb.client.model.changestream.FullDocument;
  2. import com.xxx.listener.AlarmMessageListener;
  3. import com.xxx.util.AESUtil;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.ApplicationEventPublisher;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.data.mongodb.MongoDatabaseFactory;
  10. import org.springframework.data.mongodb.core.MongoTemplate;
  11. import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
  12. import org.springframework.data.mongodb.core.aggregation.Aggregation;
  13. import org.springframework.data.mongodb.core.mapping.Document;
  14. import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
  15. import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
  16. import org.springframework.data.mongodb.core.messaging.MessageListener;
  17. import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
  18. import org.springframework.data.mongodb.core.query.Criteria;
  19. import java.util.concurrent.Executor;
  20. import java.util.concurrent.Executors;
  21. @Configuration
  22. public class MongoDBConfig {
  23. @Bean
  24. public MongoTemplate mongoTemplate() throws Exception {
  25. return new MongoTemplate(mongoDatabaseFactory());
  26. }
  27. @Autowired
  28. public ApplicationEventPublisher eventPublisher;
  29. @Bean
  30. @ConditionalOnProperty(name = "db.is-stand-alone", havingValue = "false")
  31. public MessageListenerContainer alarmMessageListenerContainer(MongoTemplate mongoTemplate) {
  32. AlarmMessageListener messageListener = new AlarmMessageListener(mongoTemplate, eventPublisher);
  33. return customMessageListenerContainer("alarm", mongoTemplate, messageListener);
  34. }
  35. private MessageListenerContainer customMessageListenerContainer(String collectionName,MongoTemplate template, MessageListener messageListener) {
  36. MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, Executors.newFixedThreadPool(5)) {
  37. @Override
  38. public boolean isAutoStartup() {
  39. return true;
  40. }
  41. };
  42. ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(messageListener)
  43. .collection(collectionName) // 需要监听的集合名
  44. // 过滤需要监听的操作类型,可以根据需求指定过滤条件
  45. .filter(Aggregation.newAggregation(Aggregation.match(
  46. Criteria.where("operationType").in("insert", "update"))))
  47. // 不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息
  48. .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
  49. .build();
  50. messageListenerContainer.register(request, Document.class);
  51. return messageListenerContainer;
  52. }
  53. }

4.3.实现监听中的回调函数

  1. import com.mongodb.client.model.changestream.ChangeStreamDocument;
  2. import com.xxx.alarm.model.Alarm;
  3. import com.xxx.notification.event.AlarmEventPublisher;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.commons.lang.StringUtils;
  6. import org.bson.Document;
  7. import org.springframework.context.ApplicationEventPublisher;
  8. import org.springframework.data.mongodb.core.MongoTemplate;
  9. import org.springframework.data.mongodb.core.messaging.Message;
  10. import org.springframework.data.mongodb.core.messaging.MessageListener;
  11. @Slf4j
  12. public class AlarmMessageListener implements MessageListener<ChangeStreamDocument<Document>, Object> {
  13. private MongoTemplate mongoTemplate;
  14. private ApplicationEventPublisher eventPublisher;
  15. public AlarmMessageListener(MongoTemplate mongoTemplate, ApplicationEventPublisher eventPublisher) {
  16. this.mongoTemplate = mongoTemplate;
  17. this.eventPublisher = eventPublisher;
  18. }
  19. @Override
  20. public void onMessage(Message<ChangeStreamDocument<Document>, Object> message) {
  21. ChangeStreamDocument<Document> changeStreamDocument = message.getRaw();
  22. log.info("changestream操作为 :" + changeStreamDocument);
  23. Document document = changeStreamDocument.getFullDocument();
  24. Alarm alarm = mongoTemplate.getConverter().read(Alarm.class, document);
  25. if(alarm == null) return;
  26. //实时消息推送
  27. AlarmEventPublisher.publishDeviceAlarmEvent(alarm, eventPublisher);
  28. }
  29. }

5.mongodb的单机模式下也可采用AbstractMongoEventListener方式

增加自定义listener监听器即可

  1. import com.xxx.alarm.model.Alarm;
  2. import com.xxx.notification.event.AlarmEventPublisher;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.commons.lang.StringUtils;
  5. import org.springframework.context.ApplicationEventPublisher;
  6. import org.springframework.data.mongodb.core.mapping.event.AbstractMongoEventListener;
  7. import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
  8. /**
  9. * 告警监听
  10. */
  11. @Slf4j
  12. @Component
  13. public class AlarmMongoDataEventListener extends AbstractMongoEventListener<Alarm> {
  14. private ApplicationEventPublisher eventPublisher;
  15. public AlarmMongoDataEventListener(ApplicationEventPublisher eventPublisher) {
  16. this.eventPublisher = eventPublisher;
  17. }
  18. @Override
  19. public void onBeforeConvert(BeforeConvertEvent<Alarm> alarmEvent) {
  20. Alarm alarm =alarmEvent.getSource();
  21. if(alarm == null) return;
  22. String notificationType = alarm.getNotificationType();
  23. if(StringUtils.isBlank(notificationType)) return;
  24. log.info("Before Convert alarm Event: " + alarm);
  25. //实时消息推送
  26. AlarmEventPublisher.publishDeviceAlarmEvent(alarm, eventPublisher);
  27. }
  28. }

6.文档推荐

mongodb生命周期

AbstractMongoEventListener (Spring Data MongoDB 4.2.5 API)

Lifecycle Events :: Spring Data MongoDB

mongodb集群的change streams

Change Streams - MongoDB Manual v7.0

ChangeStreams 使用及原理(二)|学习笔记-阿里云开发者社区

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/593326
推荐阅读
相关标签
  

闽ICP备14008679号