当前位置:   article > 正文

Websocket只看这一篇就够了 SpringBoot+Websocket+Redis 实现websocket集群_springboot+websocket基于redis订阅发布实现集群化

springboot+websocket基于redis订阅发布实现集群化

关于websocket做一次全面的总结。

实现的难点在什么地方?

  1. WebSocket Session 是不能放在redis中共享的。所以共享Session的方法是不可行的。
  2. WebSocket是长链接所以只能通过保持连接的服务器通知对应的客户端。一旦需要发送消息的服务器和保持连接的服务器不是同一台服务器时就有问题了。

实现WebSocket集群的2种方式

  1. 用redis的订阅/推送功能实现的。(推荐)
  2. 用redis存客户端连接对应的服务器的IP+端口,再用http去调用对应服务器的接口。

用redis的订阅/推送功能实现WebSocket

SpringBoot添加maven依赖

  1. <!-- 引入 websocket 依赖类-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-websocket</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-configuration-processor</artifactId>
  9. <optional>true</optional>
  10. </dependency>
  11. <!-- redisson -->
  12. <dependency>
  13. <groupId>org.redisson</groupId>
  14. <artifactId>redisson-spring-boot-starter</artifactId>
  15. <version>2.15.1</version>
  16. </dependency>
  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  4. /**
  5. * @author
  6. */
  7. @Configuration
  8. public class WebSocketConfig {
  9. @Bean
  10. public ServerEndpointExporter serverEndpointExporter () {
  11. return new ServerEndpointExporter();
  12. }
  13. }

存储WebSocket Session

  1. mport javax.websocket.Session;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. /**
  4. * @author
  5. */
  6. public class WebSocketBean {
  7. /**
  8. * 连接session对象
  9. */
  10. private Session session;
  11. /**
  12. * 连接错误次数
  13. */
  14. private AtomicInteger erroerLinkCount = new AtomicInteger(0);
  15. public int getErroerLinkCount() {
  16. // 线程安全,以原子方式将当前值加1,注意:这里返回的是自增前的值
  17. return erroerLinkCount.getAndIncrement();
  18. }
  19. public void cleanErrorNum() {
  20. // 清空计数
  21. erroerLinkCount = new AtomicInteger(0);
  22. }
  23. public Session getSession() {
  24. return session;
  25. }
  26. public void setSession(Session session) {
  27. this.session = session;
  28. }
  29. }

发送消息的实体类(群发)我是用2个类去做的可以根据自己的需求来。

  1. **
  2. * 推送全部
  3. * @author
  4. */
  5. public class SendMsgAll {
  6. /**
  7. * websocket业务数据(json)
  8. */
  9. private String msg;
  10. /**
  11. *业务模块类型
  12. */
  13. private String type;
  14. /**
  15. * 项目ID
  16. */
  17. private String projectId;
  18. public String getMsg() {
  19. return msg;
  20. }
  21. public void setMsg(String msg) {
  22. this.msg = msg;
  23. }
  24. public String getType() {
  25. return type;
  26. }
  27. public void setType(String type) {
  28. this.type = type;
  29. }
  30. public String getProjectId() {
  31. return projectId;
  32. }
  33. public void setProjectId(String projectId) {
  34. this.projectId = projectId;
  35. }

单发

  1. /**
  2. * 按用户推送
  3. * @author
  4. */
  5. public class SendMsg extends SendMsgAll{
  6. /**
  7. * 用户ID
  8. */
  9. private String userId;
  10. public String getUserId() {
  11. return userId;
  12. }
  13. public void setUserId(String userId) {
  14. this.userId = userId;
  15. }
  16. }

WebSocket server

  1. import com.app.domain.websocket.model.WebSocketBean;
  2. import com.app.domain.websocket.service.WebsocketEndpoint;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.stereotype.Component;
  6. import org.springframework.stereotype.Service;
  7. import javax.websocket.*;
  8. import javax.websocket.server.PathParam;
  9. import javax.websocket.server.ServerEndpoint;
  10. import java.util.Map;
  11. import java.util.Set;
  12. import java.util.concurrent.ConcurrentHashMap;
  13. /**
  14. * @author
  15. */
  16. @Service
  17. @ServerEndpoint(value = "/web/ws/{projectId}/{userId}")
  18. @Component
  19. public class WebsocketEndpointImpl implements WebsocketEndpoint {
  20. private static Logger log = LoggerFactory.getLogger(WebsocketEndpointImpl.class);
  21. /**
  22. * 错误最大重试次数
  23. */
  24. private static final int MAX_ERROR_NUM = 3;
  25. /**
  26. * 用来存放每个客户端对应的webSocket对象。
  27. */
  28. private static Map<String, Map<String, WebSocketBean>> webSocketInfo;
  29. static {
  30. // concurrent包的线程安全map
  31. webSocketInfo = new ConcurrentHashMap<String, Map<String, WebSocketBean>>();
  32. }
  33. @OnOpen
  34. public void onOpen(Session session, EndpointConfig config, @PathParam("userId") String userId,@PathParam("projectId") String projectId) {
  35. WebSocketBean bean = new WebSocketBean();
  36. bean.setSession(session);
  37. Map<String,WebSocketBean> concurrentHashMap = new ConcurrentHashMap();
  38. concurrentHashMap.put(userId,bean);
  39. webSocketInfo.put(projectId, concurrentHashMap);
  40. log.info("ws项目:"+projectId+",客户端连接服务器userId :" + userId + "当前连接数:" + countUser(projectId));
  41. }
  42. @OnClose
  43. public void onClose(Session session, @PathParam("userId") String userId,@PathParam("projectId") String projectId) {
  44. // 客户端断开连接移除websocket对象
  45. Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
  46. if(concurrentHashMap != null){concurrentHashMap.remove(userId);}
  47. log.info("ws项目:"+projectId+",客户端断开连接,当前连接数:" + countUser(projectId));
  48. }
  49. @OnMessage
  50. public void onMessage(Session session, String message, @PathParam("userId") String userId,@PathParam("projectId") String projectId) {
  51. log.info("ws项目:"+projectId+",客户端 userId: " + userId + ",消息:" + message);
  52. }
  53. @OnError
  54. public void onError(Session session, Throwable throwable) {
  55. // log.error("ws发生错误" + throwable.getMessage(), throwable);
  56. }
  57. public void sendMessage(Session session, String message, String projectId, String userId) {
  58. log.info("ws项目:"+projectId+",连接数:"+countUser(projectId)+",发送消息 " + session);
  59. try {
  60. // 发送消息
  61. synchronized (session) {
  62. if (session.isOpen()) {
  63. session.getBasicRemote().sendText(message);
  64. }
  65. }
  66. // 清空错误计数
  67. this.cleanErrorNum(projectId, userId);
  68. } catch (Exception e) {
  69. log.error("ws项目:"+projectId+",用户:"+userId+",发送消息失败" + e.getMessage(), e);
  70. int errorNum = this.getErroerLinkCount(projectId, userId);
  71. // 小于最大重试次数重发
  72. if (errorNum <= MAX_ERROR_NUM) {
  73. sendMessage(session, message, projectId, userId);
  74. } else {
  75. log.error("ws发送消息失败超过最大次数");
  76. // 清空错误计数
  77. this.cleanErrorNum(projectId, userId);
  78. }
  79. }
  80. }
  81. @Override
  82. public void batchSendMessage(String projectId,String message) {
  83. Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
  84. if(concurrentHashMap != null){
  85. Set<Map.Entry<String, WebSocketBean>> set = concurrentHashMap.entrySet();
  86. for(Map.Entry<String, WebSocketBean> map: set ){
  87. sendMessage(map.getValue().getSession(), message,projectId, map.getKey());
  88. }
  89. }
  90. }
  91. @Override
  92. public void sendMessageById(String projectId,String userId, String message) {
  93. Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
  94. if(concurrentHashMap != null){
  95. WebSocketBean webSocketBean = concurrentHashMap.get(userId);
  96. if (webSocketBean != null) {
  97. sendMessage(webSocketBean.getSession(), message, projectId,userId);
  98. }
  99. }
  100. }
  101. /**
  102. * 清空错误计数
  103. */
  104. private void cleanErrorNum(String projectId, String userId){
  105. Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
  106. if(concurrentHashMap != null){
  107. WebSocketBean webSocketBean = concurrentHashMap.get(userId);
  108. if (webSocketBean != null) {
  109. webSocketBean.cleanErrorNum();
  110. }
  111. }
  112. }
  113. /**
  114. * 获取错误计数
  115. */
  116. private int getErroerLinkCount(String projectId, String userId){
  117. int errorNum = 0;
  118. Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
  119. if(concurrentHashMap != null){
  120. WebSocketBean webSocketBean = concurrentHashMap.get(userId);
  121. if (webSocketBean != null) {
  122. errorNum = webSocketBean.getErroerLinkCount();
  123. }
  124. }
  125. return errorNum;
  126. }
  127. private Integer countUser (String projectId){
  128. int size = 0;
  129. Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
  130. if(concurrentHashMap != null) {
  131. size = concurrentHashMap.size();
  132. }
  133. return size;
  134. }
  135. }

webSocketInfo这Map你们根据需求设计一定要才用线程安全的Map,AtomicInteger这个计数器也是线程安全的。

  1. /**
  2. * 给客户端发送消息
  3. * @author
  4. */
  5. public interface WebsocketEndpoint {
  6. /**
  7. * 向所有在线用户群发消息
  8. * @param projectId 项目ID
  9. * @param message 发送给客户端的消息
  10. */
  11. void batchSendMessage(String projectId,String message);
  12. /**
  13. * 发送给对应的用户
  14. * @param userId 用户的ID
  15. * @param projectId 项目ID
  16. * @param message 发送的消息
  17. */
  18. void sendMessageById(String projectId,String userId, String message);
  19. }

redis订阅/推送

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.data.redis.connection.RedisConnectionFactory;
  5. import org.springframework.data.redis.listener.PatternTopic;
  6. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  7. import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
  8. /**
  9. * @author
  10. */
  11. @Configuration
  12. public class RedisMessageListenerConfig {
  13. @Autowired
  14. private RedisReceiver redisReceiver;
  15. /**
  16. * 监听redis中的订阅信息
  17. * @param redisConnectionFactory
  18. * @return
  19. */
  20. @Bean
  21. public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
  22. RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
  23. redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
  24. //添加redis消息队列监听,监听im-topic消息主题的消息,使用messageListenerAdapter()中设置的类和方法处理消息。
  25. redisMessageListenerContainer.addMessageListener(messageListenerAdapter(), new PatternTopic("ptp-topic"));
  26. //同上一样
  27. redisMessageListenerContainer.addMessageListener(messageAllListenerAdapter(), new PatternTopic("mh-topic"));
  28. return redisMessageListenerContainer;
  29. }
  30. /**
  31. * 添加订阅消息处理类,通过反射获取处理类中的处理方法
  32. * 即使用RedisReceiver类中的sendMsg方法处理消息
  33. * @return
  34. */
  35. @Bean
  36. public MessageListenerAdapter messageListenerAdapter() {
  37. return new MessageListenerAdapter(redisReceiver, "sendMsg");
  38. }
  39. @Bean
  40. public MessageListenerAdapter messageAllListenerAdapter(){
  41. return new MessageListenerAdapter(redisReceiver, "sendAllMsg");
  42. }
  43. }

处理从redis中取出来的消息 

  1. import com.alibaba.fastjson.JSONObject;
  2. import com.app.domain.websocket.model.SendMsg;
  3. import com.app.domain.websocket.model.SendMsgAll;
  4. import com.app.domain.websocket.service.WebsocketEndpoint;
  5. import org.springframework.stereotype.Component;
  6. import javax.annotation.Resource;
  7. /**
  8. * 处理订阅redis的消息
  9. * @author
  10. */
  11. @Component
  12. public class RedisReceiver {
  13. @Resource
  14. WebsocketEndpoint websocketEndpoint;
  15. /**
  16. * 处理一对一消息
  17. * @param message 消息队列中的消息
  18. */
  19. public void sendMsg(String message) {
  20. SendMsg msg = JSONObject.parseObject(message, SendMsg.class);
  21. websocketEndpoint.sendMessageById(msg.getProjectId(),msg.getUserId(),msg.getMsg());
  22. }
  23. /**
  24. * 处理广播消息
  25. * @param message
  26. */
  27. public void sendAllMsg(String message){
  28. SendMsgAll msg = JSONObject.parseObject(message, SendMsgAll.class);
  29. websocketEndpoint.batchSendMessage(msg.getProjectId(),msg.getMsg());
  30. }
  31. }

还有关于项目中需要推送的消息存入队列中我也提供了公共的方法。实际操作按需求来。

  1. /**
  2. * 往Redis中存入消息
  3. * @author
  4. */
  5. public interface WebsocketService {
  6. /**
  7. * 向所有在线用户群发消息
  8. * @param message 发送给客户端的消息
  9. */
  10. void sendMessageAll(String projectId,String message);
  11. /**
  12. * 发送给对应的用户
  13. * @param userId 用户的ID
  14. * @param message 发送的消息
  15. */
  16. void sendMessageById(String projectId,String userId, String message);
  17. }

使用这个convertAndSend方法加入队列会被监听到

  1. import com.alibaba.fastjson.JSON;
  2. import com.app.domain.websocket.model.SendMsg;
  3. import com.app.domain.websocket.model.SendMsgAll;
  4. import com.app.domain.websocket.service.WebsocketService;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.data.redis.core.RedisTemplate;
  7. import org.springframework.stereotype.Service;
  8. /**
  9. * @author
  10. */
  11. @Service
  12. public class WebSocketServerImpl implements WebsocketService {
  13. @Autowired
  14. RedisTemplate<String,String> redisTemplate;
  15. @Override
  16. public void sendMessageAll(String projectId ,String message) {
  17. SendMsgAll sendMsgAll = new SendMsgAll();
  18. sendMsgAll.setProjectId(projectId);
  19. sendMsgAll.setMsg(message);
  20. redisTemplate.convertAndSend("mh-topic", JSON.toJSONString(sendMsgAll));
  21. }
  22. @Override
  23. public void sendMessageById(String projectId,String userId, String message) {
  24. SendMsg sendMsg = new SendMsg();
  25. sendMsg.setProjectId(projectId);
  26. sendMsg.setUserId(userId);
  27. sendMsg.setMsg(message);
  28. redisTemplate.convertAndSend("ptp-topic",JSON.toJSONString(sendMsg));
  29. }
  30. }

后端到这里代码就结束了

  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4. <meta charset="utf-8">
  5. <title>websocket通讯</title>
  6. </head>
  7. <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
  8. <script>
  9. var socket;
  10. function openSocket() {
  11. if(typeof(WebSocket) == "undefined") {
  12. console.log("您的浏览器不支持WebSocket");
  13. }else{
  14. console.log("您的浏览器支持WebSocket");
  15. //实现化WebSocket对象,指定要连接的服务器地址与端口 建立连接
  16. //url注意要加上自己的项目发布名称
  17. var socketUrl="http://localhost:8080/demo/web/ws/projectId/"+$("#userId").val();
  18. socketUrl=socketUrl.replace("https","wss").replace("http","ws");
  19. console.log(socketUrl)
  20. socket = new WebSocket(socketUrl);
  21. //打开事件
  22. socket.onopen = function() {
  23. console.log("websocket已打开");
  24. //socket.send("这是来自客户端的消息" + location.href + new Date());
  25. };
  26. //获得消息事件
  27. socket.onmessage = function(msg) {
  28. console.log(msg.data);
  29. };
  30. //关闭事件
  31. socket.onclose = function() {
  32. console.log("websocket已关闭");
  33. };
  34. //发生了错误事件
  35. socket.onerror = function() {
  36. console.log("websocket发生了错误");
  37. }
  38. }
  39. }
  40. function sendMessage() {
  41. if(typeof(WebSocket) == "undefined") {
  42. console.log("您的浏览器不支持WebSocket");
  43. }else {
  44. console.log("您的浏览器支持WebSocket");
  45. console.log('[{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}]');
  46. socket.send('[{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}]');
  47. }
  48. }
  49. </script>
  50. <body>
  51. <p>【userId】:<div><input id="userId" name="userId" type="text" value="25"></div>
  52. <p>【Message】:<div><input id="contentText" name="contentText" type="text" value="客户端"></div>
  53. <p>【操作】:<div><a onclick="openSocket()"></a>openSocket</div>
  54. <p>【操作】:<div><a onclick="sendMessage()">sendMessage</a></div>
  55. </body>
  56. </html>

第二种方法我就不去实现了(如果有人想看给我留言,和第一种比要复杂一点。还没有第一种好用。)

nginx配置(如果使用nginx代理了,下面是必须的要不WebSocket连不上)

  1. location /chat/ {
  2. proxy_pass http://backend;
  3. proxy_http_version 1.1;
  4. proxy_set_header Upgrade $http_upgrade;
  5. proxy_set_header Connection "upgrade";
  6. }

参考:http://nginx.org/en/docs/http/websocket.html

下面就说一下代码中会出现的问题

  1. [TEXT_FULL_WRITING]
  2. java.io.EOFException(websocket自动断开连接问题)

[TEXT_FULL_WRITING]

  1. onMessage方法有返回值,导致onMessage无法被sync block控制。
  2. session没有被同步控制,导致多线程情况下,出现IllegalStateException
  3. session.getAsyncRemote()可能是tomcat有bug引起,依旧会出现TEXT_FULL_WRITING

java.io.EOFException(websocket自动断开连接问题)

原因:使用了nginx服务,nginx配置:proxy_read_timeout(Default: 60s;),如果一直没有数据传输,连接会在过了这个时间之后自动关闭

解决方法:

  • 客户端每过小于60s的时间就给,服务器发送一个消息,服务器不用做处理维持心跳。
  • proxy_read_timeout时间设置大一点(就放在上面nginx配置中就可以了)

分享不易,记得。。。。

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号