当前位置:   article > 正文

Springboot 整合 WebSocket ,使用STOMP协议+Redis 解决负载场景问题(二)_sprongboot 整合 stripte

sprongboot 整合 stripte

前言

上一篇,简单给大家整合了一下websocket,使用stomp方式。

这篇,就是考虑到单体的服务使用websocket ,按照上一篇的整合,确实没问题。

但是如果一旦是负载多台服务的时候,那么就会出现丢失问题。

什么?没有想过这个问题? 

没关系,看图学东西:

一贯作风,我瞎画了一张简图,大致讲一下前后端使用websocket通讯的场面。


 简析:

 后端某个服务起了,整合了websocket作为 server,开放了一些节点endpoints ;

 前端服务也起了,也整合了websocket作为 client,连接server的websocket ;

 后端server 将每个 前端client 连接的 websocket session 都存起来, 确保 知道谁是谁。

 这样在server给client推送消息的时候,能保证推送,数据不丢失。

websocket session map  是存在于 后端服务 的内存里面的 ,单一台后端 server,貌似没啥大问题。

ok,我们简单也了解了一下大致的场面,接下来继续看图 ,多台websocket server 负载场景:

 简析:

现在作为同一个后端微服务,整合websocket,起了两台。

如上图所示,如果 红色的client 跟 websocket打交道的时候,连接的是上面的 浅蓝色websocket server;

浅蓝色websocket server 推送消息给  红色的client ,通讯没问题。

因为websocket server 的本地服务session map里面存放着 红色client的连接websocket session;

消息丢失情况出现:

我们负载了两台websocket server ,如果触发 websocket server 给红色client继续推送通知消息, nginx/网关 根据我们往常的负载均衡配置规则,分发到了 绿色的 websocket server。

此时,绿色的 websocket server 的本地服务session map里面 并没有  红色client的连接websocket session ,所以会导致  通知消息 丢失 。

解决方案:

既然问题出现了,那么我们就解决它,本篇就是介绍怎么通过 整合消息中间件去解决这个消息丢失的问题。

我们采取的方案 是 将整合websocket 的 server服务 (多台) 都整合 redis作为消息中间件;

在websocket  server 推送消息给websocket  client时, 先把消息丢到 redis里面 。

然后所有的websocket  server (不管多少台服务) 都会订阅此主题,获取到需要推送的数据,接下来 再推送给到对应的 destination 节点 (这时候只有真正与当前client有连接关系的 server扶服务才能推送成功,其余都没有推送)。

做个简图:

也许很多人看到这心里面多多少少有些疑惑, 其实根本原因就是 多台 server 没办法共享 连接session,如果能把session 保持起来共享,岂不是解决了?

是的,思路是对的,可惜的是 websocekt session 是没有实现序列化接口的,无法使用类似redis去存储起来,然后反序列化获取。(但是其实可以通过redis存储相关websocket sessionkey 与节点的IP地址、端口,强行把请求再次分发到正确的websocket server上面去。但是个人感觉这种方式不是很好,所以本文还是介绍redis的订阅/推送方式来解决这个问题)

话不多说,进入代码环节。

正文

基于上一篇的基础,开始魔改把redis加入进来。

 贴代码:


pom.xml :

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-data-redis</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.alibaba</groupId>
  8. <artifactId>fastjson</artifactId>
  9. <version>1.2.68</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-websocket</artifactId>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.springframework.boot</groupId>
  17. <artifactId>spring-boot-starter</artifactId>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.springframework.boot</groupId>
  21. <artifactId>spring-boot-starter-test</artifactId>
  22. <scope>test</scope>
  23. </dependency>
  24. </dependencies>

application.yml :

  1. server:
  2. port: 9908
  3. spring:
  4. redis:
  5. host: 127.0.0.1
  6. port: 6379
  7. password: 123456

RedisConfig.java :
 

  1. import com.fasterxml.jackson.annotation.JsonAutoDetect;
  2. import com.fasterxml.jackson.annotation.PropertyAccessor;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.stomp.stomptest.listener.RedisListener;
  5. import org.springframework.cache.annotation.CachingConfigurerSupport;
  6. import org.springframework.cache.annotation.EnableCaching;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.context.annotation.Primary;
  10. import org.springframework.data.redis.connection.RedisConnectionFactory;
  11. import org.springframework.data.redis.core.RedisTemplate;
  12. import org.springframework.data.redis.listener.PatternTopic;
  13. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  14. import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
  15. import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
  16. import org.springframework.data.redis.serializer.StringRedisSerializer;
  17. /**
  18. * @Author JCccc
  19. * @Description redis配置
  20. * @Date 2021/6/30 8:53
  21. */
  22. @Configuration
  23. @EnableCaching
  24. public class RedisConfig extends CachingConfigurerSupport {
  25. @Bean
  26. @Primary
  27. public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
  28. RedisTemplate<String, Object> template = new RedisTemplate<>();
  29. template.setConnectionFactory(factory);
  30. Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);
  31. StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
  32. ObjectMapper om = new ObjectMapper();
  33. om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
  34. om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
  35. jacksonSeial.setObjectMapper(om);
  36. template.setValueSerializer(jacksonSeial);
  37. template.setKeySerializer(stringRedisSerializer);
  38. template.setHashKeySerializer(stringRedisSerializer);
  39. template.setHashValueSerializer(jacksonSeial);
  40. template.afterPropertiesSet();
  41. return template;
  42. }
  43. @Bean
  44. RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
  45. MessageListenerAdapter topicAdapter) {
  46. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  47. container.setConnectionFactory(connectionFactory);
  48. //订阅了主题 webSocketMsgPush
  49. container.addMessageListener(topicAdapter, new PatternTopic("webSocketMsgPush"));
  50. return container;
  51. }
  52. /**
  53. * 消息监听器适配器,绑定消息处理器
  54. *
  55. * @return
  56. */
  57. @Bean
  58. MessageListenerAdapter topicAdapter() {
  59. return new MessageListenerAdapter(new RedisListener());
  60. }
  61. }

简析:

 WebSocketConfig.java :

  1. import org.springframework.context.annotation.Configuration;
  2. import org.springframework.messaging.simp.config.MessageBrokerRegistry;
  3. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  4. import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
  5. import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
  6. import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
  7. import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
  8. /**
  9. * @Author JCccc
  10. * @Description EnableWebSocketMessageBroker-注解开启STOMP协议来传输基于代理的消息,此时控制器支持使用@MessageMapping
  11. * @Date 2021/6/30 8:53
  12. */
  13. @Configuration
  14. @EnableWebSocketMessageBroker
  15. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  16. private static long HEART_BEAT = 10000;
  17. @Override
  18. public void configureMessageBroker(MessageBrokerRegistry config) {
  19. ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
  20. te.setPoolSize(1);
  21. te.setThreadNamePrefix("wss-heartbeat-thread-");
  22. te.initialize();
  23. config.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT, HEART_BEAT}).setTaskScheduler(te);
  24. }
  25. /**
  26. * 开放节点
  27. * @param registry
  28. */
  29. @Override
  30. public void registerStompEndpoints(StompEndpointRegistry registry) {
  31. //注册两个STOMP的endpoint,分别用于广播和点对点
  32. //广播
  33. registry.addEndpoint("/publicServer").withSockJS();
  34. //点对点
  35. registry.addEndpoint("/privateServer").withSockJS();
  36. }
  37. @Override
  38. public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
  39. registration.setMessageSizeLimit(500 * 1024 * 1024);
  40. registration.setSendBufferSizeLimit(1024 * 1024 * 1024);
  41. registration.setSendTimeLimit(200000);
  42. }
  43. }

InjectServiceUtil.java :

  1. import com.stomp.stomptest.producer.PushMessage;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. import javax.annotation.PostConstruct;
  5. /**
  6. * @Author JCccc
  7. * @Description pushMessage (单例)
  8. * @Date 2021/6/30 8:53
  9. */
  10. @Component
  11. public class InjectServiceUtil {
  12. @Autowired
  13. private PushMessage pushMessage;
  14. @PostConstruct
  15. public void init(){
  16. InjectServiceUtil.getInstance().pushMessage = this.pushMessage;
  17. }
  18. /**
  19. * 实现单例 start
  20. */
  21. private static class SingletonHolder {
  22. private static final InjectServiceUtil INSTANCE = new InjectServiceUtil();
  23. }
  24. private InjectServiceUtil (){}
  25. public static final InjectServiceUtil getInstance() {
  26. return SingletonHolder.INSTANCE;
  27. }
  28. /**
  29. * 实现单例 end
  30. */
  31. public PushMessage pushMessage(){
  32. return InjectServiceUtil.getInstance().pushMessage;
  33. }
  34. }

RedisListener.java :

  1. import org.springframework.data.redis.connection.Message;
  2. import org.springframework.data.redis.connection.MessageListener;
  3. /**
  4. * @Author JCccc
  5. * @Description redis监听消息
  6. * @Date 2021/6/30 8:53
  7. */
  8. public class RedisListener implements MessageListener {
  9. @Override
  10. public void onMessage(Message message, byte[] bytes) {
  11. System.out.println("步骤1.监听到需要进行负载转发的消息:" + message.toString());
  12. InjectServiceUtil.getInstance().pushMessage().send(message.toString());
  13. }
  14. }

简析:

Message.java :

  1. /**
  2. * @Author JCccc
  3. * @Description
  4. * @Date 2021/8/20 9:26
  5. */
  6. public class Message {
  7. /**
  8. * 消息编码
  9. */
  10. private String code;
  11. /**
  12. * 来自(保证唯一)
  13. */
  14. private String form;
  15. /**
  16. * 去自(保证唯一)
  17. */
  18. private String to;
  19. /**
  20. * 内容
  21. */
  22. private String content;
  23. public String getCode() {
  24. return code;
  25. }
  26. public void setCode(String code) {
  27. this.code = code;
  28. }
  29. public String getForm() {
  30. return form;
  31. }
  32. public void setForm(String form) {
  33. this.form = form;
  34. }
  35. public String getTo() {
  36. return to;
  37. }
  38. public void setTo(String to) {
  39. this.to = to;
  40. }
  41. public String getContent() {
  42. return content;
  43. }
  44. public void setContent(String content) {
  45. this.content = content;
  46. }
  47. }

PushMessage.java  :

  1. import com.alibaba.fastjson.JSON;
  2. import com.stomp.stomptest.pojo.Message;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.messaging.simp.SimpMessagingTemplate;
  5. import org.springframework.stereotype.Service;
  6. /**
  7. * @Author JCccc
  8. * @Description 消息发送
  9. * @Date 2021/6/30 8:53
  10. */
  11. @Service
  12. public class PushMessage {
  13. @Autowired
  14. private SimpMessagingTemplate template;
  15. public void send(String msgJson){
  16. System.out.println("步骤2.即将推送给websocket server:");
  17. Message message = JSON.parseObject(msgJson, Message.class);
  18. System.out.println("步骤2.1 消息发给:"+message.getTo());
  19. System.out.println("步骤2.2 发送内容是:"+message.getContent());
  20. template.convertAndSendToUser(message.getTo(), "/message", message.getContent());
  21. System.out.println("----------消息发送完毕----------");
  22. //广播
  23. // template.convertAndSend("/topic/public",chatMessage);
  24. }
  25. }

最后是 TestController.java (测试接口。模拟触发系统推送消息):

  1. @Controller
  2. public class TestController {
  3. @Autowired
  4. public SimpMessagingTemplate template;
  5. @ResponseBody
  6. @RequestMapping("/pushToOneTest")
  7. public String sendMessage(@RequestBody Message message) {
  8. String messageJson = JSON.toJSONString(message);
  9. System.out.println("!!!系统准备做消息推送!!!");
  10. stringRedisTemplate.convertAndSend("webSocketMsgPush",messageJson);
  11. return "发送成功";
  12. }
  13. }

ok,项目跑起来,调用一下测试接口,看看整个流程:

可以看到消息正常推送:

看下代码里面打印的信息(大家初学的可以之间打debug看下流程会更好):

可以看到,这种场景,不管负载了多少台, 消息都先到 redis里面去。

每一台server都通过redisListener 监听主题,获取到相关 消息 , 然后才开始使用本地的websocket session 去找到各自服务器内是否存在当前用户的连接数据,然后推送出去。

那些找不到的无法进行推送出去,虽然黯然失色罢了,但是 我们的负载问题解决了。

恭喜我们自己。

ok,该篇就到这。

最后还有就是之前的一篇,使用rabbitmq作为消息代理的:

《Springboot 整合Websocket+Stomp协议+RabbitMQ做消息代理 实例教程》

https://blog.csdn.net/qq_35387940/article/details/108276136

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/817870
推荐阅读
相关标签
  

闽ICP备14008679号