当前位置:   article > 正文

websocket之netty-socketio高性能实时消息推送框架

netty-socketio

简介

websocket协议的出现,打破http那种只能由客户端发起请求,服务器响应的单项数据传输局面,采用websocket之后,可以由客户端推送数据到服务端,也可以由服务器主动推送数据到客户端,从而同时进行双向通信,便于数据实时更新推送

netty-socketio便是为了方便我们进行websocket开发的框架,底层采用netty,提供了可靠高效的性能

netty-socketio的官方github地址:https://github.com/mrniko/netty-socketio

入门demo

咋们先来一个入门的websocket程序,带大家先简单了解下netty-socketio的使用

demo相关代码地址:https://gitee.com/xumengqq/work-websocketw.git

项目结构图

依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>com.corundumstudio.socketio</groupId>
  7. <artifactId>netty-socketio</artifactId>
  8. <version>1.7.19</version>
  9. </dependency>
  10. <dependency>
  11. <groupId>com.alibaba</groupId>
  12. <artifactId>fastjson</artifactId>
  13. <version>1.2.73</version>
  14. </dependency>

配置

  1. # 自定义socket服务端配置
  2. websocket:
  3. server:
  4. host: localhost
  5. port: 9999

服务端代码实现

netty-socketio框架采用的数据传输格式是["key","data"],这是一个数组,前面的key是信令,可以自定义,后面是传递的数据,理解为对"key"信令发送了一条"data"数据。也可以类比成对某个接口发送了一条数据。后面的代码编写思路也是监听这个key信令上传的数据,或者主动对这个key信令下发数据

实体类和常量类
  1. @Data
  2. public class Message {
  3. private String userId;
  4. private String room;
  5. private String data;
  6. }
  7. public class EventConstant {
  8. public static final String TEST_EVENT = "test";
  9. public static final String MESSAGE = "message";
  10. public static final String USER_ID = "userId";
  11. public static final String JOIN_ROOM = "joinRoom";
  12. public static final String LEAVE_ROOM = "leaveRoom";
  13. /**
  14. * 获取该用户所有加入的房间
  15. */
  16. public static final String GET_MY_ROOMS = "getMyRooms";
  17. /**
  18. * 房间内广播消息
  19. * */
  20. public static final String ROOM_BROADCAST = "roomBroadcast";
  21. /**
  22. * 全局广播消息
  23. * */
  24. public static final String GLOBAL_BROADCAST = "globalBroadcast";
  25. public static final String SEND_USER_DATA = "sendUserData";
  26. }
socket配置类编写
  1. @Configuration
  2. @Log4j2
  3. public class WebsocketConfig {
  4. @Value("${websocket.server.host:localhost}")
  5. private String host;
  6. @Value("${websocket.server.port:9999}")
  7. private Integer websocketPort;
  8. /**
  9. * 设置socket service基本配置
  10. * @return
  11. */
  12. @Bean
  13. public SocketIOServer getSocketIOServer(){
  14. com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
  15. configuration.setPort(websocketPort);
  16. configuration.setHostname(host);
  17. configuration.getSocketConfig().setTcpNoDelay(true);
  18. configuration.getSocketConfig().setReuseAddress(true);
  19. return new SocketIOServer(configuration);
  20. }
  21. /**
  22. * 扫描socket相关的注解,例如@OnConnect@DisConnect@OnEvent
  23. * @return
  24. */
  25. @Bean
  26. public SpringAnnotationScanner springAnnotationScanner(){
  27. return new SpringAnnotationScanner(getSocketIOServer());
  28. }
  29. }

netty-socketio还提供了命名空间的概念,简单来说就是一个netty-socketio应用程序,里面有n个namespace,一个namespace里面又有n个room,用户默认会加入一个空字符""名称 的namespace和一个空字符""名称的room,下面有获取默认的namespace和room的代码和运行结果。绝大多数情况下使用默认的namespace就可以了,这里不做展开。

 

 其它的配置可以参考github上的说明:https://github.com/mrniko/netty-socketio/wiki/Configuration-details

  • setHostname If not set then bind address will be 0.0.0.0 or ::0

  • setPort The port the socket.io server will listen to

  • setJsonTypeFieldName defaults to "@class"

  • setJsonSupport Allows to setup custom implementation of JSON serialization/deserialization. See JsonSupport

  • setAuthorizationListener Authorization listener invoked on every handshake. Accepts all clients by default.

  • setStoreFactory Client store and pubsub factory, used to store session data and implements distributed pubsub. Defaults to MemoryStoreFactory, but RedisStoreFactory and HazelcastStoreFactory also implemented.

  • setPreferDirectBuffer (added in 1.5 version) Buffer allocation method used during packet encoding. Defaults to true.

  • setBossThreads (added in 1.5 version) boss-threads amount for netty

  • setWorkerThreads (added in 1.5 version) worker-threads amount for netty

  • setHeartbeatInterval Heartbeat interval (in seconds), defaults to 25

  • setHeartbeatTimeout Heartbeat timeout (in seconds), defaults to 60. Use 0 to disable it

  • setCloseTimeout Channel close timeout (in seconds) due to inactivity, defaults to 60

  • setContext Namespace, defaults to "/socket.io"

  • setAllowCustomRequests Allow to service custom requests that differ from socket.io protocol, defaults to false.

    • If true, add own handler which handle custom requests in order to avoid hang connections.
  • setPollingDuration Polling interval for XHR transport (in seconds), defaults to 20

  • setKeyStorePassword SSL key store password (for secure connections)

  • setKeyStore SSL key store stream, maybe appointed to any source

  • setMaxHttpContentLength Set maximum HTTP content length limit, defaults to 64KB.

    • If the length of the aggregated content exceeds this value, a TooLongFrameException will be raised.
  • setTransports Transports supported by server, defaults to [Transport.WEBSOCKET, Transport.FLASHSOCKET, Transport.XHRPOLLING]. Cannot be empty list

socket服务启动和销毁
  1. @SpringBootApplication
  2. @Log4j2
  3. public class SingleApplication implements CommandLineRunner, DisposableBean {
  4. @Autowired
  5. private SocketIOServer socketIOServer;
  6. public static void main(String[] args) {
  7. SpringApplication.run(SingleApplication.class, args);
  8. }
  9. /**
  10. * 启动socket服务器
  11. * @param args
  12. * @throws Exception
  13. */
  14. @Override
  15. public void run(String... args) throws Exception {
  16. log.info("socket start ...");
  17. socketIOServer.start();
  18. log.info("socket started");
  19. }
  20. /**
  21. * spring容器关闭时将socket服务也停止
  22. * @throws Exception
  23. */
  24. @Override
  25. public void destroy() throws Exception {
  26. socketIOServer.stop();
  27. log.info("socket server stop");
  28. }
  29. }
客户端连接和断开
  1. @Component
  2. @Log4j2
  3. public class ConnectHandel {
  4. public static final String USER_ID = "userId";
  5. /**
  6. * 客户端socket连接后触发
  7. * @param client
  8. */
  9. @OnConnect
  10. public void connect(SocketIOClient client){
  11. // 获取登录信息,也可以在这里面进行权限判断
  12. String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
  13. log.info("connect userId {}, sessionUuid {}",userId,client.getSessionId());
  14. }
  15. /**
  16. * 客户端断开服务器时触发
  17. * @param client
  18. */
  19. @OnDisconnect
  20. public void disConnect(SocketIOClient client){
  21. // 获取登录信息
  22. String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
  23. log.info("disConnect userId {}, sessionUuid {}",userId,client.getSessionId());
  24. }
  25. }
业务代码
客户端和服务器简单交互
  1. @Autowired
  2. private SocketIOServer socketIOServer;
  3. @OnEvent(MESSAGE)
  4. public void messageEvent(SocketIOClient client,String message) throws InterruptedException {
  5. String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
  6. log.info("testEvent userId {}, sessionUuid {}, message {}",userId,client.getSessionId(),message);
  7. // 业务处理,这里写一个模拟demo
  8. // 向客户端发消息
  9. client.sendEvent(MESSAGE,"服务处理中...");
  10. Thread.sleep(1000);
  11. // 业务处理结束后再向客户端发消息
  12. client.sendEvent(MESSAGE,"服务处理完成!");
  13. }
加入房间
  1. @OnEvent(JOIN_ROOM)
  2. public void joinRoom(SocketIOClient client,String room){
  3. client.joinRoom(room);
  4. }
离开房间
  1. @OnEvent(LEAVE_ROOM)
  2. public void leaveRoom(SocketIOClient client,String room){
  3. client.leaveRoom(room);
  4. }
房间内广播消息
  1. @OnEvent(ROOM_BROADCAST)
  2. public void roomBroadcast(SocketIOClient client, String data){
  3. Message message = JSON.parseObject(data, Message.class);
  4. roomBroadcastData(MESSAGE,message.getRoom(),message.getData(),null);
  5. }
  6. public void roomBroadcastData(String eventKey,String room,Object data,SocketIOClient excludeClient){
  7. if (Objects.isNull(excludeClient)){
  8. socketIOServer.getRoomOperations(room).sendEvent(eventKey,data);
  9. }else {
  10. socketIOServer.getRoomOperations(room).sendEvent(eventKey,excludeClient,data);
  11. }
  12. }
全局广播消息
  1. @OnEvent(GLOBAL_BROADCAST)
  2. public void globalBroadcast(SocketIOClient client, String data){
  3. globalBroadcastData(MESSAGE,data,null);
  4. }
  5. public void globalBroadcastData(String eventKey,Object data,SocketIOClient excludeClient){
  6. if(Objects.isNull(excludeClient)){
  7. socketIOServer.getBroadcastOperations().sendEvent(eventKey,data);
  8. }else {
  9. socketIOServer.getBroadcastOperations().sendEvent(eventKey,excludeClient,data);
  10. }
  11. }
 点对点消息发送

这个需要存储userId和client的对应关系,知道需要发送的目标userId,就能获取到client,然后直接发消息既可

我们在@OnConnect和@DisConnect中来绑定和解除userId和client的关系

客户端

客户端代码我是在网上找的,大家可按自己的方式实现既可

js方式
  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4. <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
  5. <title>SocketIO客户端测试环境</title>
  6. <base>
  7. <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
  8. <script src="https://cdn.bootcss.com/socket.io/2.1.1/socket.io.js"></script>
  9. <style>
  10. body {
  11. padding: 20px;
  12. }
  13. #console {
  14. height: 450px;
  15. overflow: auto;
  16. }
  17. .connect-msg {
  18. color: green;
  19. }
  20. .disconnect-msg {
  21. color: red;
  22. }
  23. </style>
  24. </head>
  25. <body>
  26. <h1>客户端测试环境</h1>
  27. <hr style="height:1px;border:none;border-top:1px solid black;" />
  28. <div style="width: 700px; float: left">
  29. <h3>SocketClient建立连接</h3>
  30. <div style="border: 1px;">
  31. <label>socketio服务端地址:</label>
  32. <!--
  33. http://localhost 服务端ip
  34. 9999 服务端socket端口(服务端提供)
  35. test或socketIO 命名空间(可自定义)如果不定义命名空间,默认是/ 比如:http://localhost:9999?userId=12345
  36. userId 用户id参数(可自定义)
  37. ps:因为我定义了命名空间/test和/socketIO,所以我这里也可以用
  38. http://localhost:9999/test?userId=12345
  39. http://localhost:9999/socketIO?userId=12345
  40. 这里我用http://localhost:9999?userId=12345建立连接,因为这里还不涉及到请求不同命名空间的方法
  41. -->
  42. <input type="text" id="url" value="ws://localhost:9999?userId=12345" style="width: 500px;">
  43. <br>
  44. <br>
  45. <button id="connect" style="width: 100px;">建立连接</button>
  46. <button id="disconnect" style="width: 100px;">断开连接</button>
  47. </div>
  48. <hr style="height:1px;border:none;border-top:1px solid black;" />
  49. <h3>SocketClient发送消息</h3>
  50. <div style="border: 1px;">
  51. <label>socketEvent名称:</label><input type="text" id="socketEvent" value="getUserRooms">
  52. <br><br>
  53. <textarea id="content" maxlength="1000" cols="40" rows="5" placeholder="请输入内容"></textarea>
  54. <button id="send" style="width: 100px;">发送消息</button>
  55. </div>
  56. <hr style="height:1px;border:none;border-top:1px solid black;" />
  57. </div>
  58. <div style="float: left;margin-left: 50px;">
  59. <h3>SocketIO互动消息</h3>
  60. <button id="clean" style="width: 100px;">清理输出</button>
  61. <div id="console" class="well"></div>
  62. </div>
  63. </body>
  64. <script type="text/javascript">
  65. var socket ;
  66. var errorCount = 0;
  67. var isConnected = false;
  68. var maxError = 5;
  69. //连接
  70. function connect(url) {
  71. //var opts = {
  72. // query: 'userId='+userId
  73. //};
  74. //socket = io.connect(url, opts);
  75. socket = io.connect(url);
  76. //socket.nsp = "/socketIO";//定义命名空间
  77. console.log(socket)
  78. //监听本次连接回调函数
  79. socket.on('connect', function () {
  80. isConnected =true;
  81. console.log("连接成功");
  82. serverOutput('<span class="connect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>连接成功</span>');
  83. errorCount=0;
  84. });
  85. //监听消息
  86. socket.on('message', function (data) {
  87. output('<span class="connect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>' + data + ' </span>');
  88. console.log(data);
  89. });
  90. //监听断开
  91. socket.on('disconnect', function () {
  92. isConnected =false;
  93. console.log("连接断开");
  94. serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>' + '已下线! </span>');
  95. });
  96. //监听断开错误
  97. socket.on('connect_error', function(data){
  98. serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>;' + '连接错误-'+data+' </span>');
  99. errorCount++;
  100. if(errorCount>=maxError){
  101. socket.disconnect();
  102. }
  103. });
  104. //监听连接超时
  105. socket.on('connect_timeout', function(data){
  106. serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>' + '连接超时-'+data+' </span>');
  107. errorCount++;
  108. if(errorCount>=maxError){
  109. socket.disconnect();
  110. }
  111. });
  112. //监听错误
  113. socket.on('error', function(data){
  114. serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>' + '系统错误-'+data+' </span>');
  115. errorCount++;
  116. if(errorCount>=maxError){
  117. socket.disconnect();
  118. }
  119. });
  120. /*socket.on('ack', function(data){
  121. console.log("ack:"+data)
  122. var str = '消息发送失败';
  123. if(data==1){
  124. str = '消息发送成功';
  125. }
  126. serverOutput('<span class="connect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>' + str+' </span>');
  127. });*/
  128. }
  129. function output(message) {
  130. var element = $("<div>" + " " + message + "</div>");
  131. $('#console').prepend(element);
  132. }
  133. function serverOutput(message) {
  134. var element = $("<div>" + message + "</div>");
  135. $('#console').prepend(element);
  136. }
  137. //连接
  138. $("#connect").click(function(){
  139. if(!isConnected){
  140. var url = $("#url").val();
  141. connect(url);
  142. }else {
  143. serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>' + '已经成功建立连接,不要重复建立!!! </span>');
  144. }
  145. })
  146. //断开连接
  147. $("#disconnect").click(function(){
  148. if(isConnected){
  149. socket.disconnect();
  150. }
  151. })
  152. //发送消息
  153. $("#send").click(function(){
  154. var socketEvent = $("#socketEvent").val();//自定义的事件名称
  155. var content = $("#content").val();//发送的内容
  156. socket.emit(socketEvent,content,function(data1,data2){
  157. console.log("ack1:"+data1);
  158. console.log("ack2:"+data2);
  159. });
  160. })
  161. //清理消息
  162. $("#clean").click(function(){
  163. $('#console').html("");
  164. })
  165. function getNowTime(){
  166. var date=new Date();
  167. var year=date.getFullYear(); //获取当前年份
  168. var mon=date.getMonth()+1; //获取当前月份
  169. var da=date.getDate(); //获取当前日
  170. var h=date.getHours(); //获取小时
  171. var m=date.getMinutes(); //获取分钟
  172. var s=date.getSeconds(); //获取秒
  173. var ms=date.getMilliseconds();
  174. var d=document.getElementById('Date');
  175. var date =year+'/'+mon+'/'+da+' '+h+':'+m+':'+s+':'+ms;
  176. return date;
  177. }
  178. </script>
  179. </html>
java 实现客户端
  1. package com.corundumstudio.socketio.demo.client;
  2. import io.socket.client.IO;
  3. import io.socket.client.Socket;
  4. import io.socket.emitter.Emitter;
  5. import java.net.URISyntaxException;
  6. public class DemoSocketClient {
  7. public static void main(String[] args) throws URISyntaxException, InterruptedException {
  8. IO.Options options = new IO.Options();
  9. options.transports = new String[]{"websocket"};
  10. options.reconnectionAttempts = 2;
  11. options.reconnectionDelay = 1000;//失败重连的时间间隔
  12. options.timeout = 500;//连接超时时间(ms)
  13. final Socket socket = IO.socket("http://localhost:9999/?userId=12345", options);
  14. socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
  15. @Override
  16. public void call(Object... args) {
  17. socket.send("hello");
  18. }
  19. });
  20. socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
  21. @Override
  22. public void call(Object... args) {
  23. System.out.println("连接关闭");
  24. }
  25. });
  26. socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
  27. @Override
  28. public void call(Object... args) {
  29. System.out.println("sessionId:" + socket.id());
  30. for (Object obj : args) {
  31. System.out.println(obj);
  32. }
  33. System.out.println("收到服务器应答,将要断开连接...");
  34. socket.disconnect();
  35. }
  36. });
  37. socket.connect();
  38. }
  39. }

 

生产集群方案

上面的demo是单机版本,但是,生产上服务都是集群存在的,至少也要两台,双节点防止单点故障;可是websocket服务搭建集群会有个问题,就是服务间通信的问题;比如,Aclient连接上Aserver,Bclient连接上了Bserver,这时候,Aclient就没法对Bclient进行发送消息和接受 消息。

这里提供几种解决方案

Redis存储方案

redis是官方推荐的解决方案,这个在Github上可以看到。配置之后,利用redis这个中介,可以通过广播的方式对其他客户端进行通信,底层是用了redis的发布订阅功能。有两种通信方式,可以通过全局广播通知到其他客户端,也可以对加入同一个房间内的客户端进行广播。但是这个有局限性,netty-socketio只提供了广播的redis发布订阅方案,点对点方式并没有提供。

pom依赖
  1. <!-- 添加redisson依赖-->
  2. <dependency>
  3. <groupId>org.redisson</groupId>
  4. <artifactId>redisson-spring-boot-starter</artifactId>
  5. <version>3.15.4</version>
  6. </dependency>
yaml配置
  1. # redis
  2. spring:
  3. redis:
  4. host: 121.196.111.111 #地址
  5. port: 6379 #端口号
  6. database: 0 #0号库
  7. password: password #密码
配置类
  1. // redisson的配置类
  2. @Configuration
  3. public class RedissonConfig {
  4. @Value("${spring.redis.host}")
  5. private String host;
  6. @Value("${spring.redis.password}")
  7. private String password;
  8. @Value("${spring.redis.port}")
  9. private Integer port;
  10. @Value("${spring.redis.database:0}")
  11. private Integer database;
  12. @Value("${spring.redis.timeout:3000}")
  13. private Integer timeout;
  14. @Bean
  15. public RedissonClient redissonClient(){
  16. Config config = new Config();
  17. // config.useClusterServers().addNodeAddress("redis://").setPassword("");
  18. config.useSingleServer().setAddress("redis://"+ host +":" + port).setPassword(password).setDatabase(database).setTimeout(timeout);
  19. RedissonClient redissonClient = Redisson.create(config);
  20. return redissonClient;
  21. }
  22. }
  23. // 在SocketIOServer配置上redisson既可使用
  24. @Autowired
  25. private RedissonClient redissonClient;
  26. @Bean
  27. public SocketIOServer getSocketIOServer(){
  28. com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
  29. configuration.setPort(websocketPort);
  30. configuration.setHostname(host);
  31. configuration.getSocketConfig().setTcpNoDelay(true);
  32. configuration.getSocketConfig().setReuseAddress(true);
  33. // 设置redis存储方式
  34. configuration.setStoreFactory(new RedissonStoreFactory(redissonClient));
  35. return new SocketIOServer(configuration);
  36. }
全局广播和房间内广播
  1. @OnEvent(ROOM_BROADCAST)
  2. public void roomBroadcast(SocketIOClient client, String data){
  3. Message message = JSON.parseObject(data, Message.class);
  4. roomBroadcastData(MESSAGE,message.getRoom(),message.getData(),null);
  5. }
  6. @OnEvent(GLOBAL_BROADCAST)
  7. public void globalBroadcast(SocketIOClient client, String data){
  8. globalBroadcastData(MESSAGE,data,null);
  9. }
  10. public void globalBroadcastData(String eventKey,Object data,SocketIOClient excludeClient){
  11. if(Objects.isNull(excludeClient)){
  12. socketIOServer.getBroadcastOperations().sendEvent(eventKey,data);
  13. }else {
  14. socketIOServer.getBroadcastOperations().sendEvent(eventKey,excludeClient,data);
  15. }
  16. }
  17. public void roomBroadcastData(String eventKey,String room,Object data,SocketIOClient excludeClient){
  18. if (Objects.isNull(excludeClient)){
  19. socketIOServer.getRoomOperations(room).sendEvent(eventKey,data);
  20. }else {
  21. socketIOServer.getRoomOperations(room).sendEvent(eventKey,excludeClient,data);
  22. }
  23. }
生产灰度发布

nginx重新加载配置会影响原有的websocket连接吗?可不可以通过更新路由配置实现websocket服务的灰度发布?

经过测试,在更改nginx配置信息后,比如将其中一个服务设置成down,输入nginx  -s reload命令,原有的websocket连接不会受到影响,但是新的连接不会再进入设置为down的服务,随时间推移,连接数就会越来越少,直至为0。因此,我们可以将需要发布的服务设置为down,然后再编写一个获取此服务连接client的数量的接口,当数量为0的时候,代表没有websocket连接了,此时,既可对这台服务器进行发布了。

获取某台服务器连接客户端数量

  1. @Autowired
  2. private SocketIOServer socketIOServer;
  3. @GetMapping("/getClientCount")
  4. public Object getClientCount(){
  5. Collection<SocketIOClient> allClients = socketIOServer.getAllClients();
  6. int size = allClients.size();
  7. return size;
  8. }

nginx配置websocket服务路由

这里需要采用hash的路由方式;不知道是框架还是其他的什么原因,客户端会发起三次的普通http请求和一次http升级成websocket的请求,采用其他的路由方式会导致这些请求不到同一个服务器从而导致websocket连接报错。这需要注意一下

  1. worker_processes 1;
  2. events {
  3. worker_connections 1024;
  4. }
  5. http {
  6. map $http_upgrade $connection_upgrade {
  7. default upgrade;
  8. '' close;
  9. }
  10. upstream websocket {
  11. hash $remote_addr consistent;
  12. server 127.0.0.1:9999 down;
  13. server 127.0.0.1:9998;
  14. }
  15. server {
  16. listen 80;
  17. location / {
  18. proxy_pass http://websocket;
  19. proxy_http_version 1.1;
  20. proxy_set_header Upgrade $http_upgrade;
  21. proxy_set_header Connection "$connection_upgrade";
  22. #proxy_set_header Host $host;
  23. }
  24. }
  25. }

Redis存储-点对点完善

之前的redis方案是netty-websocketio官方提供的集群解决方案,但此方案平没有解决点对点信息传输的问题,我们现在在此基础上还是通过redis的发布订阅来完善点对点信息推送。

当然,你也可以通过mq的方式来完成,个人觉得用redis的话,中小型项目就足够了,成本也低

思路:用户连上服务器,将用户和用户所连接的机器信息(这里指ip和端口号)保存在redis,断开连接则将redis上的记录清除掉;每台服务器订阅当前ip和端口号的topic,知道用户连接到那台服务器之后,发布消息则是发布到这个用户对应服务器ip和端口号的topic,这样,经过redis的订阅和发布,既可完成点对点的通信。(多台服务器分配多个topic方案是为了优化。如果采用一个topic,所有的机器都订阅同一个topic,然后每台机器再去赛选要发送的目标客户端是否连接在此机器上,这种方式会让其他非目标机器都进行一次赛选处理,略微有些影响性能,照成服务器不必要的开销)

ClientHub类

在之前代码的基础上,编写一个clientHub类,用于存储用户连接之前的信息

  1. @Component
  2. @Log4j2
  3. public class ClientHub implements CommandLineRunner {
  4. @Autowired
  5. private SocketIOServer socketIOServer;
  6. @Autowired
  7. private RedissonClient redissonClient;
  8. public static final String REDIS_TOPIC = "redis_topic";
  9. public static final String ONLINE_USER = "online_user_";
  10. /**
  11. * 本机存储userId与客户端之间的连接关系
  12. */
  13. private static Map<String, SocketIOClient> userIdToClient = new HashMap<>();
  14. /**
  15. * 集群模式下,点对点发送,一律采取这个方法,不能直接通过SocketClient发送
  16. * @param fromUserId
  17. * @param targetUserId
  18. * @param key
  19. * @param data
  20. */
  21. public void sendMessageByUserId(String fromUserId,String targetUserId,String key,String data){
  22. Message message = new Message();
  23. message.setFromUserId(fromUserId);
  24. message.setTargetUserId(targetUserId);
  25. message.setKey(key);
  26. message.setData(data);
  27. SocketIOClient socketIOClient = userIdToClient.get(targetUserId);
  28. // 如果是本机直连的,直接发送,不是则通过redis发布对应的机器订阅
  29. if(!Objects.isNull(socketIOClient)){
  30. socketIOClient.sendEvent(message.getKey(),message);
  31. }else {
  32. RBucket<String> bucket = redissonClient.getBucket(ONLINE_USER + targetUserId);
  33. if(bucket.isExists()){
  34. String topic = bucket.get();
  35. this.pubMessage(topic,message);
  36. }
  37. }
  38. }
  39. /**
  40. * 用户连接时调用,存储用户连接之间的信息
  41. * @param userId
  42. * @param client
  43. */
  44. public void addUser(String userId,SocketIOClient client){
  45. userIdToClient.put(userId,client);
  46. // 存储在redis上
  47. RBucket<String> bucket = redissonClient.getBucket(ONLINE_USER + userId);
  48. bucket.set(getRedisTopic());
  49. }
  50. /**
  51. * 用户断开连接时调用,清楚用户和客户端之前的关系
  52. * @param userId
  53. */
  54. public void deleteUser(String userId){
  55. userIdToClient.remove(userId);
  56. // 从redis上删除掉
  57. RBucket<String> bucket = redissonClient.getBucket(ONLINE_USER + userId);
  58. bucket.delete();
  59. }
  60. /**
  61. * 获取本socket服务的ip和端口号
  62. * @return
  63. */
  64. private String getHostAndPort(){
  65. String hostAddress = "";
  66. try {
  67. hostAddress = InetAddress.getLocalHost().getHostAddress();
  68. } catch (UnknownHostException e) {
  69. throw new RuntimeException(e);
  70. }
  71. int port = socketIOServer.getConfiguration().getPort();
  72. return hostAddress+"_"+port;
  73. }
  74. /**
  75. * 获取这台机器redis订阅的topic
  76. * @return
  77. */
  78. private String getRedisTopic(){
  79. String hostAndPort = getHostAndPort();
  80. return REDIS_TOPIC + "_" + hostAndPort;
  81. }
  82. /**
  83. * redis发布消息
  84. * @param topic
  85. * @param message
  86. */
  87. private void pubMessage(String topic, Message message){
  88. redissonClient.getTopic(topic).publish(message);
  89. log.info("pubMessage topic {} data {}",topic,JSON.toJSONString(message));
  90. }
  91. /**
  92. * redis订阅消息
  93. * @param topic
  94. */
  95. public void subMessage(String topic){
  96. redissonClient.getTopic(topic).addListener(Message.class,(channel,message)->{
  97. log.info("topic {} submessage {}",topic, JSON.toJSONString(message));
  98. String userId = message.getTargetUserId();
  99. SocketIOClient socketIOClient = userIdToClient.get(userId);
  100. if(Objects.nonNull(socketIOClient)){
  101. socketIOClient.sendEvent(message.getKey(),JSON.toJSONString(message));
  102. }
  103. });
  104. }
  105. /**
  106. * 服务器 对应的组件加载完成后,需要开启redis订阅功能
  107. * @param args
  108. * @throws Exception
  109. */
  110. @Override
  111. public void run(String... args) throws Exception {
  112. String redisTopic = getRedisTopic();
  113. subMessage(redisTopic);
  114. log.info("topic {} 已订阅",redisTopic);
  115. }
  116. }
连接记录

建立连接和断开连接分别进行记录和清除工作

  1. @Autowired
  2. private ClientHub clientHub;
  3. /**
  4. * 客户端socket连接后触发
  5. * @param client
  6. */
  7. @OnConnect
  8. public void connect(SocketIOClient client){
  9. // 获取登录信息,也可以在这里面进行权限判断
  10. String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
  11. log.info("connect userId {}, sessionUuid {}",userId,client.getSessionId());
  12. String url = client.getHandshakeData().getUrl();
  13. log.info("url :{}",url);
  14. // 保存userId和client的对应关系
  15. clientHub.addUser(userId,client);
  16. }
  17. /**
  18. * 客户端断开服务器时触发
  19. * @param client
  20. */
  21. @OnDisconnect
  22. public void disConnect(SocketIOClient client){
  23. // 获取登录信息
  24. String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
  25. log.info("disConnect userId {}, sessionUuid {}",userId,client.getSessionId());
  26. // 解除userId和client的对应关系
  27. clientHub.deleteUser(userId);
  28. }
点对点发送消息业务使用
  1. /**
  2. * 点对点发送消息
  3. * @param client
  4. * @param data
  5. * 注意:如果接受的参数是一个对象,那么前端页面发送数据的时候也必须是一个对象,不能用json字符串代替
  6. * example:js代码 socket.emit('sendUserData', {data: "你好啊", targetUserId: "2233"});
  7. */
  8. @OnEvent(SEND_USER_DATA)
  9. public void sendUserData(SocketIOClient client, AckRequest ackRequest,Message data){
  10. // 获取登录信息
  11. String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
  12. log.info("sendUserData {}",JSON.toJSONString(data));
  13. clientHub.sendMessageByUserId(userId,data.getTargetUserId(),MESSAGE,data.getData());
  14. }

 这里只是简单的编写集群的思想,实际情况还有一些问题需要注意和解决,比如当某台机器挂了,这台机器连接的用户数据还是在redis上,其它机器就以为这些用户还是在线,但其实状态不对。这个就需要进行额外的处理了,比如存redis的时候设置过期时间,然后通过定时任务不断在redis上刷新在线用户信息等。

特殊场景集群方案

之前的redis方式,是通用的集群解决方案,基本上大部分的场景都适用,但有些时候业务场景比较特殊,也可以采取特殊的方式解决

特殊场景

用户之前的通信只在同一个房间内,比如视频会议,上课直播,游戏房间之类的,用户之间只和在同一个房间内的人进行通信,不需要对跨房间或者对房间外的用户进行实时消息推送。

思路

将房间看做一个单独的整体,只能分配在某一台服务器上,只要是这个房间的人需要连接服务器,就先获取该房间的服务器地址,客户端直接连上对应服务器,再进入房间,那么,该房间内的所有人都是在同一台服务器上,从而避免了跨服务器之间的通信场景,就相当于单机版。不过这个得手动控制房间的负载均衡,每台websocket服务器的访问路径也要自己控制好,生产的灰度发布也需要自行写代码控制。

spring事件模型

简述事件模型

spring提供的事件模型,其实也相当于一个发布订阅的工具,只不过是在单个程序内进行的,这里可以借助spring的事件,配合上多线程异步,可以更方便地进行websocket功能编码开发。从而达到,主要业务逻辑消息迅速实时推送,附加业务逻辑并发执行。

spring事件是spring自带的功能,不需要再额外引入依赖

定义事件

直接继承抽象类ApplicationEvent,然后自定义属性,自定义的属性用于在事件传播过程中记录自己需要的数据

  1. public class ConnectEvent extends ApplicationEvent {
  2. private SocketIOClient socketIOClient;
  3. public ConnectEvent(Object source,SocketIOClient socketIOClient) {
  4. super(source);
  5. this.socketIOClient = socketIOClient;
  6. }
  7. public SocketIOClient getSocketIOClient() {
  8. return socketIOClient;
  9. }
  10. }
事件发布

通过ApplicationEventPublisher的实例既可发布自定义的事件

  1. @Autowired
  2. private ApplicationEventPublisher applicationEventPublisher;
  3. @OnConnect
  4. public void connect(SocketIOClient client){
  5. // 获取登录信息,也可以在这里面进行权限判断
  6. String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
  7. log.info("connect userId {}, sessionUuid {}",userId,client.getSessionId());
  8. String url = client.getHandshakeData().getUrl();
  9. log.info("url :{}",url);
  10. // 也可以在这一步进行权限鉴定,鉴定不通过就发给客户端一个权限鉴定失败的消息,然后再断开连接
  11. // client.sendEvent(MESSAGE,"权限不足");
  12. // client.disconnect();
  13. // 保存userId和client的对应关系
  14. clientHub.addUser(userId,client);
  15. // 发布一个用户连接事件
  16. applicationEventPublisher.publishEvent(new ConnectEvent(this,client));
  17. }
事件监听(订阅)

事件监听有两种书写方式,一种是直接实现ApplicationListener接口,泛型里面放入自定义事件

  1. @Component
  2. @Log4j2
  3. public class UserLoginListenerDemo implements ApplicationListener<ConnectEvent> {
  4. @Override
  5. public void onApplicationEvent(ConnectEvent event) {
  6. SocketIOClient socketIOClient = event.getSocketIOClient();
  7. String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
  8. {
  9. // 业务代码。。。
  10. log.info("UserLoginListenerDemo userId {}",userId);
  11. }
  12. }
  13. }

 第二中有点类似于controller的方式,在方法上用@EventListener注解来标识事件监听,在方法的参数中传入自定义的事件,并且如果有顺序需求的话,可以通过@Order来注明那个方法先调用(值小的先调用)

  1. @Component
  2. @Log4j2
  3. public class UserLoginListener {
  4. @EventListener
  5. @Order(3)
  6. public void connectListenerLog(ConnectEvent connectEvent){
  7. SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
  8. String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
  9. {
  10. log.info("user {} 登录的日志操作",userId);
  11. }
  12. }
  13. @EventListener
  14. @Order(2)
  15. public void connectListenerStore(ConnectEvent connectEvent){
  16. SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
  17. String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
  18. {
  19. log.info("user {} 登录的行为存表",userId);
  20. }
  21. }
  22. @EventListener
  23. @Order(1)
  24. public void connectListenerState(ConnectEvent connectEvent){
  25. SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
  26. String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
  27. {
  28. log.info("user {} 登录的状态更改",userId);
  29. }
  30. }
引入多线程异步处理

 上面的事件监听的方式是单线程的,如果仅仅是这样,感觉还不如直接定义方法进行调用,事件模型到显得有点多余。我们可以引入线程池和@Async,通过多线程并行处理事件,实现真正的发布订阅

创建一个spring提供的线程池

  1. @Configuration
  2. public class ThreadPoolConfig {
  3. @Bean
  4. public ThreadPoolTaskExecutor autoLoginExecutor() {
  5. ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
  6. threadPoolTaskExecutor.setCorePoolSize(20);
  7. threadPoolTaskExecutor.setMaxPoolSize(100);
  8. threadPoolTaskExecutor.setQueueCapacity(200);
  9. threadPoolTaskExecutor.setKeepAliveSeconds(60);
  10. threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
  11. threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
  12. threadPoolTaskExecutor.setThreadNamePrefix("event-task-");
  13. threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
  14. threadPoolTaskExecutor.initialize();
  15. return threadPoolTaskExecutor;
  16. }
  17. }

启动类上加上@EnableAsync注解

  1. @SpringBootApplication
  2. @Log4j2
  3. @EnableAsync
  4. public class ClusterEventApplication implements CommandLineRunner, DisposableBean {

 event监听方法上加上@Async表示异步执行

  1. @Component
  2. @Log4j2
  3. public class UserLoginListener {
  4. @EventListener
  5. // @Order(3)
  6. @Async
  7. public void connectListenerLog(ConnectEvent connectEvent){
  8. SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
  9. String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
  10. {
  11. log.info("user {} 登录的日志操作",userId);
  12. }
  13. }
  14. @EventListener
  15. // @Order(2)
  16. @Async
  17. public void connectListenerStore(ConnectEvent connectEvent){
  18. SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
  19. String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
  20. {
  21. log.info("user {} 登录的行为存表",userId);
  22. }
  23. }
  24. @EventListener
  25. // @Order(1)
  26. @Async
  27. public void connectListenerState(ConnectEvent connectEvent){
  28. SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
  29. String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
  30. {
  31. log.info("user {} 登录的状态更改",userId);
  32. }
  33. }
  34. }

效果截图:

 

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/973913
推荐阅读
相关标签
  

闽ICP备14008679号