当前位置:   article > 正文

WebSocket 集群解决方案

websocket 集群解决方案

前言

WebSocket是一种在网络应用程序中,使客户度端和服务器之间可以进行双向通信的协议。它允许数据可以在建立连接后进行实时交换,而不必依赖传统的HTTP请求-响应模式。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

方案一:广播方案

图片

  • Step1: 客户端连接到某个Websocket Server,在该websocket Server中建立userid和session的绑定关系

  • Step2: 其它服务或者客户端通过MQ广播消息所有Websocket Server(消息体中带有userid)

  • Step3: 所有Websocket Server 根据客户端userid找到对应session, 只有存在userid和session的绑定关系的Websocket Server才发送消息到客户端

代码演示

1.Websocket Server 建立userid和session的绑定关系

  1. @ServerEndpoint("/websocket/{businessType}/{userId}")
  2. @Component
  3. public class WebSocketServer {
  4.     /**
  5.      * 若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
  6.      * 注意:allSession 只记录当前机器的 客户端连接,不是所有session连接
  7.      */
  8.     public static ConcurrentHashMap<String, Session> allSession = new ConcurrentHashMap<>();
  9.     @Resource
  10.     private RedisService redisService;
  11.     /**
  12.      * 连接建立成功调用的方法
  13.      *
  14.      * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
  15.      */
  16.     @OnOpen
  17.     public void onOpen(@PathParam(value = "businessType"String businessType, @PathParam(value = "userId"String userId, Session session, EndpointConfig config) {
  18.         if (StringUtils.isEmpty(userId)) {
  19.             return;
  20.         }
  21.         /**
  22.          * 加入到本地map
  23.          */
  24.         allSession.put(userId, session);
  25.     }
  26.     /**
  27.      * 连接关闭调用的方法
  28.      */
  29.     @OnClose
  30.     public void onClose(@PathParam(value = "userId"String userId, Session session) {
  31.         if (StringUtils.isNotEmpty(userId)) {
  32.             allSession.remove(userId);
  33.         }
  34.     }
  35.     /**
  36.      * 发生错误时调用
  37.      *
  38.      * @param
  39.      * @param
  40.      */
  41.     @OnError
  42.     public void onError(@PathParam(value = "userId"String userId, Session session, Throwable error) {
  43.     }
  44.     /**
  45.      * 用户id
  46.      *
  47.      * @param userId
  48.      * @param message
  49.      */
  50.     public void sendMessageToOneUser(Integer userId, String message, String msgId) {
  51.         if (userId == null) {
  52.             return;
  53.         }
  54.         Session session = allSession.get(String.valueOf(userId));
  55.         if (session != null) {
  56.          //所有Websocket Server 根据客户端userid找到对应session, 只有存在userid和session的绑定关系的Websocket Server才发送消息到客户端
  57.           session.getAsyncRemote().sendText(message);
  58.         } else {
  59.             System.err.println("session为空");
  60.             allSession.remove(userId + "");
  61.         }
  62.     }
  63. }

2.所有Websocket Server 接收消息并处理

  1. @Component
  2. @RequiredArgsConstructor
  3. public class CreateOrderConsumer implements BaseConsumer {
  4.     private final WebSocketServer webSocketServer;
  5.     @Override
  6.     public Action handleMessage(Message message) {
  7.         CreateOrderMessage createOrderMessage = JSON.parseObject(message.getBody(), LinkCreateOrderMessage.class);
  8.         try {
  9.            //业务校验省略...
  10.            //调用WebSocketServer的sendMessageToOneUser方法,里面根据客户端userid找到对应session, 只有存在userid和session的绑定关系的Websocket Server才发送消息到客户端
  11.             webSocketServer.sendMessageToOneUser(createOrderMessage.getUserId(), JSON.toJSONString(linkActionRes),message.getMsgID());
  12.         } catch (Exception e) {
  13.             e.printStackTrace();
  14.             return Action.ReconsumeLater;
  15.         }
  16.         return Action.CommitMessage;
  17.     }
  18. }    

方案二:目标询址方案(推荐)

图片

Id标识有两种实现形式:

  • 为唯一的服务名:每一个WebSocketServer生成唯一的服务名(serviceName="XXX-" + IdUtil.oneId())并注册到naocs服务组册中心,uesrid与其绑定,服务适用方使用Feign 或其它RPC调用http://{serviceName}/xxx/xxx到指定WebSocketServer

  • 为唯一的IP+端口:每一个WebSocketServer 获取自己IP+端口,uesrid与其绑定,服务调用方使用该IP+端口

代码演示(唯一Id为唯一的服务名的形式)

1.绑定userid和服务名唯一Id的关系(以ApplicationName形式为例)

  1. @SpringBootApplication
  2. public class WsApplication  {
  3.     public static void main(String[] args) {
  4.         //动态服务名
  5.         System.setProperty("myApplicationName""WS-" + IdUtil.oneId());
  6.         SpringApplication.run(WsApplication.class, args);
  7.     }
  8. }
  1. spring:
  2.   application:
  3.     #随机名字,做ws集群使用
  4.     name: ${myApplicationName}
  1. @ServerEndpoint("/websocket/{businessType}/{userId}")
  2. @Component
  3. public class WebSocketServer {
  4.     /**
  5.      * 若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
  6.      * 注意:allSession 只记录当前机器的 客户端连接,不是所有session连接
  7.      */
  8.     public static ConcurrentHashMap<String, Session> allSession = new ConcurrentHashMap<>();
  9.     /**
  10.      *
  11.      */
  12.     private String myApplicationName = System.getProperty("myApplicationName");
  13.     @Resource
  14.     private RedisService redisService;
  15.     /**
  16.      * 连接建立成功调用的方法
  17.      * 关键代码
  18.      * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
  19.      */
  20.     @OnOpen
  21.     public void onOpen(@PathParam(value = "businessType"String businessType, @PathParam(value = "userId"String userId, Session session, EndpointConfig config) {
  22.         if (StringUtils.isEmpty(userId)) {
  23.             return;
  24.         }
  25.         /**
  26.          * 加入到本地map
  27.          */
  28.         allSession.put(userId, session);
  29.         //绑定userid和服务名唯一Id的关系
  30.         redisService.hset(WS_MAPPING, userId + "", myApplicationName);
  31.     }
  32.     /**
  33.      * 连接关闭调用的方法
  34.      */
  35.     @OnClose
  36.     public void onClose(@PathParam(value = "userId"String userId, Session session) {
  37.         if (StringUtils.isNotEmpty(userId)) {
  38.             allSession.remove(userId);
  39.         }
  40.     }
  41.     /**
  42.      * 发生错误时调用
  43.      *
  44.      * @param
  45.      * @param
  46.      */
  47.     @OnError
  48.     public void onError(@PathParam(value = "userId"String userId, Session session, Throwable error) {
  49.     }
  50.     /**
  51.      * 用户id
  52.      *
  53.      * @param userId
  54.      * @param message
  55.      */
  56.     public void sendMessageToOneUser(Integer userId, String message) {
  57.         if (userId == null) {
  58.             return;
  59.         }
  60.         Session session = allSession.get(String.valueOf(userId));
  61.         if (session != null) {
  62.          //所有Websocket Server 根据客户端userid找到对应session, 只有存在userid和session的绑定关系的Websocket Server才发送消息到客户端
  63.           session.getAsyncRemote().sendText(message);
  64.         } else {
  65.             System.err.println("session为空");
  66.             allSession.remove(userId + "");
  67.         }
  68.     }
  69. }

2.Websocket Server提供的调用接口

  1. @RestController
  2. @RequestMapping("push")
  3. public class  WebSocketPushController {
  4.     @PostMapping("{userId}")
  5.     public void pushMessage(@PathVariable Long userId, @RequestBody Object message) {
  6.             webSocketServer.sendMessageToOneUser(userId, message);
  7.     }
  8. }

3.调用方通过nacos调用目标Websocket Server

  1. //业省略
  2. MyApplicationName myApplicationName =  redisService.hget(WS_MAPPING, userId + "");
  3. Feign:
  4. http://${myApplicationName}/push/{userId}

图片

来源:juejin.cn/post/7306451559928348709

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/894045
推荐阅读
相关标签
  

闽ICP备14008679号