当前位置:   article > 正文

WebSocket+Redis实现集群消息_websocket集群redis

websocket集群redis

1、在pom.xml中引入相关依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-websocket</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-data-redis</artifactId>
  8. </dependency>

2、创建WebSocketServer用于处理websocket请求

  1. package com.hdy.manage.web.socket;
  2. import com.alibaba.fastjson.JSON;
  3. import com.hdy.manage.configuration.WebSocketConfiguration;
  4. import com.hdy.manage.data.base.JsonResult;
  5. import com.hdy.manage.exception.WebsocketUnauthorizedException;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Component;
  9. import javax.websocket.*;
  10. import javax.websocket.server.ServerEndpoint;
  11. import java.io.IOException;
  12. import java.util.concurrent.CopyOnWriteArraySet;
  13. /**
  14. * WebSocket请求
  15. */
  16. @Component
  17. @ServerEndpoint(value = "/websocket", configurator = WebSocketConfiguration.class)
  18. @Slf4j
  19. public class WebSocketServer {
  20. //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
  21. private static CopyOnWriteArraySet<WebSocketServer> webSocketServers = new CopyOnWriteArraySet<>();
  22. //与某个客户端的连接会话,需要通过它来给客户端发送数据
  23. private Session session;
  24. // 用户id
  25. private Integer userId;
  26. // 用户token:用于挤出登录通知,不能使用userId
  27. private String token;
  28. /**
  29. * 连接建立成功调用的方法
  30. */
  31. @OnOpen
  32. public void onOpen(Session session) {
  33. Object userIdObj = session.getUserProperties().get("userId");
  34. Object tokenObj = session.getUserProperties().get("token");
  35. if (null == userIdObj || null == tokenObj) {
  36. // 未进行认证,获取不到用户信息
  37. log.info("用户未登录,主动报错关闭连接");
  38. sendMessage(session, JSON.toJSONString(JsonResult.unauthorized()));
  39. throw new WebsocketUnauthorizedException("用户未登录");
  40. } else {
  41. Integer userId = (Integer) userIdObj;
  42. String token = (String) tokenObj;
  43. this.session = session;
  44. this.userId = userId;
  45. this.token = token;
  46. webSocketServers.add(this); //加入set
  47. log.info("用户 [" + userId + "] 加入websocket,当前在线人数为:" + onlineCount());
  48. }
  49. }
  50. /**
  51. * 连接关闭调用的方法
  52. */
  53. @OnClose
  54. public void onClose() {
  55. webSocketServers.remove(this);//set中删除
  56. log.info("用户 [" + this.userId + "] 关闭websocket,当前在线人数为:" + onlineCount());
  57. }
  58. /**
  59. * 收到客户端消息后调用的方法
  60. */
  61. @OnMessage
  62. public void onMessage(String message, Session session) {
  63. log.info(this.userId + "发送消息: " + message);
  64. System.out.println(message);
  65. }
  66. /**
  67. * 发生错误时调用
  68. */
  69. @OnError
  70. public void onError(Session session, Throwable error) {
  71. error.printStackTrace();
  72. log.info("WebSocket连接发生错误,连接已被断开...");
  73. }
  74. /**
  75. * 发送消息
  76. */
  77. public void sendMessage(String message) throws IOException {
  78. this.session.getBasicRemote().sendText(message);
  79. //this.session.getAsyncRemote().sendText(message);
  80. }
  81. /**
  82. * 根据Session进行推送
  83. * @param session
  84. * @param message
  85. */
  86. private static void sendMessage(Session session, String message) {
  87. try {
  88. session.getBasicRemote().sendText(message);
  89. } catch (IOException e) {
  90. e.printStackTrace();
  91. }
  92. }
  93. /**
  94. * 按照用户id推送消息
  95. * @param userId
  96. * @param jsonResult
  97. */
  98. public static void sendMessage(Integer userId, JsonResult jsonResult) {
  99. for (WebSocketServer webSocketServer : webSocketServers) {
  100. try {
  101. if (webSocketServer.userId.equals(userId)) {
  102. webSocketServer.sendMessage(JSON.toJSONString(jsonResult));
  103. }
  104. } catch (IOException e) {
  105. continue;
  106. }
  107. }
  108. }
  109. /**
  110. * 根据用户token推送消息
  111. * @param token
  112. * @param jsonResult
  113. */
  114. public static void sendMessage(String token, JsonResult jsonResult) {
  115. for (WebSocketServer webSocketServer : webSocketServers) {
  116. try {
  117. if (webSocketServer.token.equals(token)) {
  118. webSocketServer.sendMessage(JSON.toJSONString(jsonResult));
  119. }
  120. } catch (IOException e) {
  121. continue;
  122. }
  123. }
  124. }
  125. public static synchronized int onlineCount() {
  126. return webSocketServers.size();
  127. }
  128. }
  • JSON格式化使用的是阿里的fastjson,可自主引入依赖
  • JsonResult是自定义的返回对象
  • token是用户登录成功后返回的用户登录唯一标识,类似于sessionId
  • WebsocketUnauthorizedException是自定义异常,用于处理websocket连接未登录情况

 3、创建WebSocketConfiguration,开启WebSocket支持

  1. package com.hdy.manage.configuration;
  2. import com.hdy.manage.data.entity.SysUser;
  3. import org.apache.shiro.SecurityUtils;
  4. import org.apache.shiro.subject.Subject;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  8. import javax.websocket.HandshakeResponse;
  9. import javax.websocket.server.HandshakeRequest;
  10. import javax.websocket.server.ServerEndpointConfig;
  11. /**
  12. * 开启WebSocket支持
  13. */
  14. @Configuration
  15. public class WebSocketConfiguration extends ServerEndpointConfig.Configurator {
  16. @Override
  17. public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
  18. Subject subject = SecurityUtils.getSubject();
  19. if (subject.isAuthenticated()) {
  20. SysUser sysUser = (SysUser) subject.getPrincipal();
  21. sec.getUserProperties().put("userId", sysUser.getId());
  22. sec.getUserProperties().put("token", subject.getSession().getId());
  23. }
  24. super.modifyHandshake(sec, request, response);
  25. }
  26. @Bean
  27. public ServerEndpointExporter serverEndpointExporter() {
  28. return new ServerEndpointExporter();
  29. }
  30. }
  • 项目集成了Shiro框架做权限认证,Subject subject = SecurityUtils.getSubject()可以获取当前登录用户
  • 在modifyHandshake获取到登录用户保存userId和token传递到WebSocketServer进行获取

4、创建Redis消息队列发布者

  1. package com.hdy.manage.configuration.redis;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.hdy.manage.data.base.JsonResult;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.data.redis.core.StringRedisTemplate;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class RedisPublication {
  9. @Autowired
  10. private StringRedisTemplate redisTemplate;
  11. /**
  12. * 根据userId推送消息
  13. * @param userId
  14. * @param jsonResult
  15. */
  16. public void sendMessage(Integer userId, JsonResult jsonResult) {
  17. JSONObject messageObject = new JSONObject();
  18. messageObject.put("userId", userId);
  19. messageObject.put("content", jsonResult);
  20. redisTemplate.convertAndSend("channel:message", messageObject.toJSONString());
  21. }
  22. /**
  23. * 根据token推送消息
  24. * @param token
  25. * @param jsonResult
  26. */
  27. public void sendMessage(String token, JsonResult jsonResult) {
  28. JSONObject messageObject = new JSONObject();
  29. messageObject.put("token", token);
  30. messageObject.put("content", jsonResult);
  31. redisTemplate.convertAndSend("channel:message", messageObject.toJSONString());
  32. }
  33. }

5、创建Redis消息队列消费者

  1. package com.hdy.manage.configuration.redis;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.hdy.manage.data.base.JsonResult;
  5. import com.hdy.manage.web.socket.WebSocketServer;
  6. import org.springframework.data.redis.connection.Message;
  7. import org.springframework.data.redis.connection.MessageListener;
  8. import org.springframework.stereotype.Component;
  9. /**
  10. * 设置redis订阅发布接收者
  11. */
  12. @Component
  13. public class RedisSubscription implements MessageListener {
  14. @Override
  15. public void onMessage(Message message, byte[] bytes) {
  16. String messageBody = new String(message.getBody());
  17. JSONObject messageObject = JSON.parseObject(messageBody);
  18. Integer userId = messageObject.getInteger("userId");
  19. JsonResult jsonResult = messageObject.getJSONObject("content").toJavaObject(JsonResult.class);
  20. if (null == userId) {
  21. String token = messageObject.getString("token");
  22. WebSocketServer.sendMessage(token, jsonResult);
  23. } else {
  24. WebSocketServer.sendMessage(userId, jsonResult);
  25. }
  26. }
  27. }

6、对Redis消息队列进行系统配置

  1. package com.hdy.manage.configuration.redis;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.data.redis.connection.RedisConnectionFactory;
  5. import org.springframework.data.redis.core.StringRedisTemplate;
  6. import org.springframework.data.redis.listener.PatternTopic;
  7. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  8. import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
  9. @Configuration
  10. public class RedisConfiguration {
  11. @Bean
  12. RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) {
  13. RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
  14. redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
  15. // 可以添加多个 messageListener,配置不同的交换机
  16. redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic("channel:message"));
  17. //redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic("channel:notice"));
  18. return redisMessageListenerContainer;
  19. }
  20. @Bean
  21. MessageListenerAdapter messageListenerAdapter(RedisSubscription redisSubscription) {
  22. return new MessageListenerAdapter(redisSubscription, "onMessage");
  23. }
  24. @Bean
  25. StringRedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
  26. return new StringRedisTemplate(redisConnectionFactory);
  27. }
  28. }

7、发送消息调用

  1. @Autowired
  2. private RedisPublication redisPublication;
  •  在类中注入RedisPublication,可以直接调用sendMessage方法即可进行消息发送
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/817862
推荐阅读
相关标签
  

闽ICP备14008679号