赞
踩
目录
3.2.方案2:采用mongo生命周期的AbstractMongoEventListener
4.基于change stream方式实现分布式下websocket的消息推送
5.mongodb的单机模式下也可采用AbstractMongoEventListener方式
就一句话:在多个springboot进程和多节点Mongo副本集集群的基础上,解决websocket的session共享问题。
1.多节点的Mongo副本集集群
2.单一的springboot服务的多个副本,也就是多个相同的springboot进程
3.要求实时性高,拒绝定时器(主要是有窗口期,消耗的资源伤不起),无论是前端定时还是后端定时。
方案不可行
原因:
Session无法采用Redis进行存储, 因为不能对Session进行序列化
那为什么Session无法进行序列化呢?
因为其中包含了一些不可序列化的对象,比如底层网络连接和线程相关的信息。这些信息无法被简单地序列化和反序列化,因此直接对 WebSocket session 进行序列化会导致一些问题,比如无法正确地恢复网络连接状态和线程状态。
方案不可行
原因:
AbstractMongoEventListener内部采用的是利用MongoDB的事件通知机制,在文档操作时触发相应的事件,然后监听器将收到事件通知并执行预定义的操作。
本质还是在Spring应用程序中注册监听器,从而实现对MongoDB文档生命周期事件的监听和处理,是基于单线程的,在多个springboot线程下,不能实现数据共享。
原理:
多进程同时监听消费MQ,当有消息需要推送的时候,发送MQ广播消息,每个springboot进程都能收到广播消息,然后检查进程缓存中的websocket session连接信息,推送消息。
方案可行
缺点:
1.需要引入第三方组件,增加代码复杂性
2.高可用环境下,增加运维难度,需要维护搭建MQ集群
MongoDB 从 3.6 版本开始提供订阅数据变更的功能,但仅限于mongo集群,包括mongo副本集模式和mongo分片模式
原理:
基于 oplog 实现的,提供推送实时增量的推送功能。它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数
方案可行
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-mongodb</artifactId>
- </dependency>
- import com.mongodb.client.model.changestream.FullDocument;
- import com.xxx.listener.AlarmMessageListener;
- import com.xxx.util.AESUtil;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.ApplicationEventPublisher;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.data.mongodb.MongoDatabaseFactory;
- import org.springframework.data.mongodb.core.MongoTemplate;
- import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
- import org.springframework.data.mongodb.core.aggregation.Aggregation;
- import org.springframework.data.mongodb.core.mapping.Document;
- import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
- import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
- import org.springframework.data.mongodb.core.messaging.MessageListener;
- import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
- import org.springframework.data.mongodb.core.query.Criteria;
-
- import java.util.concurrent.Executor;
- import java.util.concurrent.Executors;
- @Configuration
- public class MongoDBConfig {
-
- @Bean
- public MongoTemplate mongoTemplate() throws Exception {
- return new MongoTemplate(mongoDatabaseFactory());
- }
-
- @Autowired
- public ApplicationEventPublisher eventPublisher;
-
- @Bean
- @ConditionalOnProperty(name = "db.is-stand-alone", havingValue = "false")
- public MessageListenerContainer alarmMessageListenerContainer(MongoTemplate mongoTemplate) {
- AlarmMessageListener messageListener = new AlarmMessageListener(mongoTemplate, eventPublisher);
- return customMessageListenerContainer("alarm", mongoTemplate, messageListener);
- }
-
- private MessageListenerContainer customMessageListenerContainer(String collectionName,MongoTemplate template, MessageListener messageListener) {
-
- MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, Executors.newFixedThreadPool(5)) {
- @Override
- public boolean isAutoStartup() {
- return true;
- }
- };
- ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(messageListener)
- .collection(collectionName) // 需要监听的集合名
- // 过滤需要监听的操作类型,可以根据需求指定过滤条件
- .filter(Aggregation.newAggregation(Aggregation.match(
- Criteria.where("operationType").in("insert", "update"))))
- // 不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息
- .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
- .build();
- messageListenerContainer.register(request, Document.class);
- return messageListenerContainer;
- }
- }
- import com.mongodb.client.model.changestream.ChangeStreamDocument;
- import com.xxx.alarm.model.Alarm;
- import com.xxx.notification.event.AlarmEventPublisher;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang.StringUtils;
- import org.bson.Document;
- import org.springframework.context.ApplicationEventPublisher;
- import org.springframework.data.mongodb.core.MongoTemplate;
- import org.springframework.data.mongodb.core.messaging.Message;
- import org.springframework.data.mongodb.core.messaging.MessageListener;
-
- @Slf4j
- public class AlarmMessageListener implements MessageListener<ChangeStreamDocument<Document>, Object> {
-
- private MongoTemplate mongoTemplate;
- private ApplicationEventPublisher eventPublisher;
-
- public AlarmMessageListener(MongoTemplate mongoTemplate, ApplicationEventPublisher eventPublisher) {
- this.mongoTemplate = mongoTemplate;
- this.eventPublisher = eventPublisher;
- }
-
- @Override
- public void onMessage(Message<ChangeStreamDocument<Document>, Object> message) {
- ChangeStreamDocument<Document> changeStreamDocument = message.getRaw();
- log.info("changestream操作为 :" + changeStreamDocument);
- Document document = changeStreamDocument.getFullDocument();
- Alarm alarm = mongoTemplate.getConverter().read(Alarm.class, document);
- if(alarm == null) return;
- //实时消息推送
- AlarmEventPublisher.publishDeviceAlarmEvent(alarm, eventPublisher);
- }
- }
增加自定义listener监听器即可
- import com.xxx.alarm.model.Alarm;
- import com.xxx.notification.event.AlarmEventPublisher;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang.StringUtils;
- import org.springframework.context.ApplicationEventPublisher;
- import org.springframework.data.mongodb.core.mapping.event.AbstractMongoEventListener;
- import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
-
- /**
- * 告警监听
- */
- @Slf4j
- @Component
- public class AlarmMongoDataEventListener extends AbstractMongoEventListener<Alarm> {
-
- private ApplicationEventPublisher eventPublisher;
-
- public AlarmMongoDataEventListener(ApplicationEventPublisher eventPublisher) {
- this.eventPublisher = eventPublisher;
- }
-
-
- @Override
- public void onBeforeConvert(BeforeConvertEvent<Alarm> alarmEvent) {
- Alarm alarm =alarmEvent.getSource();
- if(alarm == null) return;
- String notificationType = alarm.getNotificationType();
- if(StringUtils.isBlank(notificationType)) return;
- log.info("Before Convert alarm Event: " + alarm);
- //实时消息推送
- AlarmEventPublisher.publishDeviceAlarmEvent(alarm, eventPublisher);
- }
- }
mongodb生命周期
mongodb集群的change streams
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。