当前位置:   article > 正文

springboot2.x 建立websocket服务端和客户端(前后端)使用,支持socket集群_websocket服务器端搭建

websocket服务器端搭建

如果觉得有用的话,麻烦点赞+关注!

 进入下面小程序可以体验效果:

 

websocket是一个全双工通信协议,允许socket客户端和socket服务端双向推送数据进行交互。大部分都是在后端做socket服务端的搭建,前端作为socket客户端进行访问。但是也会有一种情况,需要来自后端内部的消息发送给socket服务端然后再推给其他socket客户端,在springboot的实现方式中,并不需要自身去实现过程,有本身已封装完善的方案。如下:

一、服务端搭建

需要引入maven 依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-websocket</artifactId>
  4. </dependency>

配置socket 服务端bean对象管理

  1. import lombok.extern.log4j.Log4j2;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  5. import javax.websocket.server.ServerEndpointConfig;
  6. @Log4j2
  7. @Configuration
  8. public class WebSocketConfig extends ServerEndpointConfig.Configurator {
  9. @Bean
  10. public ServerEndpointExporter serverEndpointExporter(){
  11. return new ServerEndpointExporter();
  12. }
  13. }

服务端消息处理

ICacheService 使用的是redis发布订阅功能,使用的是redission技术实现。

redis 使用redisson缓存api模板_Garc的博客-CSDN博客

  1. import com.fusionfintrade.cache.ICacheService;
  2. import com.fusionfintrade.config.WebSocketConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.stereotype.Component;
  5. import org.springframework.util.CollectionUtils;
  6. import javax.annotation.PostConstruct;
  7. import javax.annotation.Resource;
  8. import javax.websocket.*;
  9. import javax.websocket.server.PathParam;
  10. import javax.websocket.server.ServerEndpoint;
  11. import java.util.concurrent.ConcurrentHashMap;
  12. import java.util.concurrent.CopyOnWriteArraySet;
  13. /**
  14. * @author Garcia
  15. * 集群原理:
  16. * 使用reids队列发布消息至各个集群节点
  17. * 各个节点监听redis对应的key消息,并将消息散发至各个节点对应的客户端中。
  18. */
  19. @Component
  20. @ServerEndpoint(value = "/ws/{from}", configurator = WebSocketConfig.class)
  21. @Slf4j
  22. public class SocketServer {
  23. @Resource
  24. private ICacheService cacheService;
  25. private static ConcurrentHashMap<String, CopyOnWriteArraySet<Session>> socketMap = new ConcurrentHashMap<>();
  26. private SocketServer socketServer;
  27. /**
  28. * 监听redis消息,并将对应的key消息使用socket发散至前端对应的功能
  29. */
  30. @PostConstruct
  31. private void init(){
  32. socketServer = this;
  33. cacheService.addListener(SocketClientEnum.RISK_WEB.getKey(), String.class,(channel, msg) ->socketServer.singleSendMessage(SocketClientEnum.RISK_WEB.getKey(),msg));
  34. }
  35. @OnOpen
  36. public void onOpen(Session session, @PathParam("from") String from){
  37. //将session会话保存,根据来源存。
  38. CopyOnWriteArraySet<Session> sessionSet = socketMap.get(from);
  39. if (CollectionUtils.isEmpty(sessionSet)){
  40. sessionSet = new CopyOnWriteArraySet<>();
  41. sessionSet.add(session);
  42. socketMap.put(from,sessionSet);
  43. }else {
  44. sessionSet.add(session);
  45. }
  46. log.info("连接:{},[{}]集当前连接数:{}",session.getRequestURI(),from,sessionSet.size());
  47. }
  48. @OnMessage
  49. public void onMessage(String message,Session session){
  50. //接收其他客户端的socket消息,并发给指定客户端socket,key可以通过再message中携带进来
  51. // sendMessage(key,message);
  52. }
  53. @OnClose
  54. public void onClose(Session session, @PathParam("from") String from){
  55. CopyOnWriteArraySet<Session> sessions = socketMap.get(from);
  56. sessions.remove(session);
  57. }
  58. @OnError
  59. public void onError(Throwable e, @PathParam("from") String from){
  60. log.error("{}socket连接异常",from,e);
  61. }
  62. /**
  63. * 集群节点发布消息
  64. * key:对应功能的客户端
  65. * message:消息内容
  66. * @param key
  67. * @param message
  68. */
  69. public void clusterSendMessage(String key,String message){
  70. cacheService.publish(key,message);
  71. }
  72. /**
  73. * 单节点发布消息
  74. * key:对应功能的客户端
  75. * message:消息内容
  76. * @param message
  77. */
  78. public void singleSendMessage(String key,String message){
  79. try {
  80. CopyOnWriteArraySet<Session> session = socketMap.get(key);
  81. if (CollectionUtils.isEmpty(session)){
  82. return;
  83. }
  84. session.stream().parallel().forEach(s ->{
  85. try {
  86. s.getBasicRemote().sendText(message);
  87. } catch (Exception e) {
  88. log.error("websocket消息推送异常",e);
  89. }
  90. });
  91. }catch (Exception e){
  92. log.error("socket发送消息失败",e);
  93. }
  94. }
  95. }

二、后端socket客户端搭建

客户端需要创建连接后才能使用,但是只需要创建一次即可,所以可以使用spring管理这个对象,或者跟我这个一样,写成单例模式,随时随地抽取使用。真正使用的方法是sendMessage

  1. import lombok.extern.log4j.Log4j2;
  2. import javax.websocket.*;
  3. import java.io.IOException;
  4. import java.net.URI;
  5. @Log4j2
  6. @ClientEndpoint
  7. public class SocketClient {
  8. private static SocketClient instance;
  9. private SocketClient(){}
  10. private Session session;
  11. public static synchronized SocketClient getInstance(){
  12. if (instance == null){
  13. instance = new SocketClient();
  14. }
  15. instance.create();
  16. return instance;
  17. }
  18. private void create(){
  19. if (session==null||!session.isOpen()){
  20. try {
  21. WebSocketContainer container = ContainerProvider.getWebSocketContainer();
  22. container.connectToServer(instance,new URI("ws://127.0.0.1:18110/g-alarm/ws/system"));
  23. }catch (Exception e){
  24. log.error("socket client init error",e);
  25. }
  26. }
  27. }
  28. @OnOpen
  29. public void onOpen(Session session){
  30. this.session = session;
  31. }
  32. @OnMessage
  33. public void onMessage(String message,Session session){
  34. }
  35. @OnClose
  36. public void onClose(Session session, CloseReason closeReason){
  37. }
  38. @OnError
  39. public void onError(Throwable e){
  40. log.error("socket连接异常",e);
  41. }
  42. public void sendMessage(String message){
  43. synchronized (session){
  44. try {
  45. session.getBasicRemote().sendText(message);
  46. } catch (IOException e) {
  47. log.error("socket客户端发送消息异常");
  48. }
  49. }
  50. }
  51. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/817858
推荐阅读
相关标签
  

闽ICP备14008679号