赞
踩
websocket协议的出现,打破http那种只能由客户端发起请求,服务器响应的单项数据传输局面,采用websocket之后,可以由客户端推送数据到服务端,也可以由服务器主动推送数据到客户端,从而同时进行双向通信,便于数据实时更新推送
netty-socketio便是为了方便我们进行websocket开发的框架,底层采用netty,提供了可靠高效的性能
netty-socketio的官方github地址:https://github.com/mrniko/netty-socketio
咋们先来一个入门的websocket程序,带大家先简单了解下netty-socketio的使用
demo相关代码地址:https://gitee.com/xumengqq/work-websocketw.git
项目结构图
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>com.corundumstudio.socketio</groupId>
- <artifactId>netty-socketio</artifactId>
- <version>1.7.19</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.73</version>
- </dependency>
- # 自定义socket服务端配置
- websocket:
- server:
- host: localhost
- port: 9999
netty-socketio框架采用的数据传输格式是["key","data"],这是一个数组,前面的key是信令,可以自定义,后面是传递的数据,理解为对"key"信令发送了一条"data"数据。也可以类比成对某个接口发送了一条数据。后面的代码编写思路也是监听这个key信令上传的数据,或者主动对这个key信令下发数据
- @Data
- public class Message {
- private String userId;
- private String room;
- private String data;
- }
-
- public class EventConstant {
-
- public static final String TEST_EVENT = "test";
-
- public static final String MESSAGE = "message";
-
- public static final String USER_ID = "userId";
-
- public static final String JOIN_ROOM = "joinRoom";
- public static final String LEAVE_ROOM = "leaveRoom";
-
- /**
- * 获取该用户所有加入的房间
- */
- public static final String GET_MY_ROOMS = "getMyRooms";
-
- /**
- * 房间内广播消息
- * */
- public static final String ROOM_BROADCAST = "roomBroadcast";
- /**
- * 全局广播消息
- * */
- public static final String GLOBAL_BROADCAST = "globalBroadcast";
-
- public static final String SEND_USER_DATA = "sendUserData";
- }
- @Configuration
- @Log4j2
- public class WebsocketConfig {
-
- @Value("${websocket.server.host:localhost}")
- private String host;
- @Value("${websocket.server.port:9999}")
- private Integer websocketPort;
-
- /**
- * 设置socket service基本配置
- * @return
- */
- @Bean
- public SocketIOServer getSocketIOServer(){
- com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
- configuration.setPort(websocketPort);
- configuration.setHostname(host);
- configuration.getSocketConfig().setTcpNoDelay(true);
- configuration.getSocketConfig().setReuseAddress(true);
- return new SocketIOServer(configuration);
- }
-
- /**
- * 扫描socket相关的注解,例如@OnConnect、@DisConnect、@OnEvent
- * @return
- */
- @Bean
- public SpringAnnotationScanner springAnnotationScanner(){
- return new SpringAnnotationScanner(getSocketIOServer());
- }
- }
netty-socketio还提供了命名空间的概念,简单来说就是一个netty-socketio应用程序,里面有n个namespace,一个namespace里面又有n个room,用户默认会加入一个空字符""名称 的namespace和一个空字符""名称的room,下面有获取默认的namespace和room的代码和运行结果。绝大多数情况下使用默认的namespace就可以了,这里不做展开。
其它的配置可以参考github上的说明:https://github.com/mrniko/netty-socketio/wiki/Configuration-details
setHostname If not set then bind address will be 0.0.0.0 or ::0
setPort The port the socket.io server will listen to
setJsonTypeFieldName defaults to "@class"
setJsonSupport Allows to setup custom implementation of JSON serialization/deserialization. See JsonSupport
setAuthorizationListener Authorization listener invoked on every handshake. Accepts all clients by default.
setStoreFactory Client store and pubsub factory, used to store session data and implements distributed pubsub. Defaults to MemoryStoreFactory, but RedisStoreFactory and HazelcastStoreFactory also implemented.
setPreferDirectBuffer (added in 1.5 version) Buffer allocation method used during packet encoding. Defaults to true.
setBossThreads (added in 1.5 version) boss-threads amount for netty
setWorkerThreads (added in 1.5 version) worker-threads amount for netty
setHeartbeatInterval Heartbeat interval (in seconds), defaults to 25
setHeartbeatTimeout Heartbeat timeout (in seconds), defaults to 60. Use 0 to disable it
setCloseTimeout Channel close timeout (in seconds) due to inactivity, defaults to 60
setContext Namespace, defaults to "/socket.io"
setAllowCustomRequests Allow to service custom requests that differ from socket.io protocol, defaults to false.
setPollingDuration Polling interval for XHR transport (in seconds), defaults to 20
setKeyStorePassword SSL key store password (for secure connections)
setKeyStore SSL key store stream, maybe appointed to any source
setMaxHttpContentLength Set maximum HTTP content length limit, defaults to 64KB.
setTransports Transports supported by server, defaults to [Transport.WEBSOCKET, Transport.FLASHSOCKET, Transport.XHRPOLLING]. Cannot be empty list
- @SpringBootApplication
- @Log4j2
- public class SingleApplication implements CommandLineRunner, DisposableBean {
-
- @Autowired
- private SocketIOServer socketIOServer;
-
- public static void main(String[] args) {
- SpringApplication.run(SingleApplication.class, args);
- }
-
- /**
- * 启动socket服务器
- * @param args
- * @throws Exception
- */
- @Override
- public void run(String... args) throws Exception {
- log.info("socket start ...");
- socketIOServer.start();
- log.info("socket started");
- }
-
- /**
- * spring容器关闭时将socket服务也停止
- * @throws Exception
- */
- @Override
- public void destroy() throws Exception {
- socketIOServer.stop();
- log.info("socket server stop");
- }
- }
- @Component
- @Log4j2
- public class ConnectHandel {
-
- public static final String USER_ID = "userId";
-
- /**
- * 客户端socket连接后触发
- * @param client
- */
- @OnConnect
- public void connect(SocketIOClient client){
- // 获取登录信息,也可以在这里面进行权限判断
- String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
- log.info("connect userId {}, sessionUuid {}",userId,client.getSessionId());
- }
-
- /**
- * 客户端断开服务器时触发
- * @param client
- */
- @OnDisconnect
- public void disConnect(SocketIOClient client){
- // 获取登录信息
- String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
- log.info("disConnect userId {}, sessionUuid {}",userId,client.getSessionId());
- }
- }
- @Autowired
- private SocketIOServer socketIOServer;
-
- @OnEvent(MESSAGE)
- public void messageEvent(SocketIOClient client,String message) throws InterruptedException {
- String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
- log.info("testEvent userId {}, sessionUuid {}, message {}",userId,client.getSessionId(),message);
-
- // 业务处理,这里写一个模拟demo
- // 向客户端发消息
- client.sendEvent(MESSAGE,"服务处理中...");
-
- Thread.sleep(1000);
- // 业务处理结束后再向客户端发消息
- client.sendEvent(MESSAGE,"服务处理完成!");
- }
- @OnEvent(JOIN_ROOM)
- public void joinRoom(SocketIOClient client,String room){
- client.joinRoom(room);
- }
- @OnEvent(LEAVE_ROOM)
- public void leaveRoom(SocketIOClient client,String room){
- client.leaveRoom(room);
- }
- @OnEvent(ROOM_BROADCAST)
- public void roomBroadcast(SocketIOClient client, String data){
- Message message = JSON.parseObject(data, Message.class);
- roomBroadcastData(MESSAGE,message.getRoom(),message.getData(),null);
- }
- public void roomBroadcastData(String eventKey,String room,Object data,SocketIOClient excludeClient){
- if (Objects.isNull(excludeClient)){
- socketIOServer.getRoomOperations(room).sendEvent(eventKey,data);
- }else {
- socketIOServer.getRoomOperations(room).sendEvent(eventKey,excludeClient,data);
- }
- }
- @OnEvent(GLOBAL_BROADCAST)
- public void globalBroadcast(SocketIOClient client, String data){
- globalBroadcastData(MESSAGE,data,null);
- }
- public void globalBroadcastData(String eventKey,Object data,SocketIOClient excludeClient){
- if(Objects.isNull(excludeClient)){
- socketIOServer.getBroadcastOperations().sendEvent(eventKey,data);
- }else {
- socketIOServer.getBroadcastOperations().sendEvent(eventKey,excludeClient,data);
- }
- }
这个需要存储userId和client的对应关系,知道需要发送的目标userId,就能获取到client,然后直接发消息既可
我们在@OnConnect和@DisConnect中来绑定和解除userId和client的关系
客户端代码我是在网上找的,大家可按自己的方式实现既可
- <!DOCTYPE html>
- <html>
- <head>
- <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
- <title>SocketIO客户端测试环境</title>
- <base>
- <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
- <script src="https://cdn.bootcss.com/socket.io/2.1.1/socket.io.js"></script>
- <style>
- body {
- padding: 20px;
- }
- #console {
- height: 450px;
- overflow: auto;
- }
- .connect-msg {
- color: green;
- }
- .disconnect-msg {
- color: red;
- }
- </style>
- </head>
-
- <body>
- <h1>客户端测试环境</h1>
- <hr style="height:1px;border:none;border-top:1px solid black;" />
-
- <div style="width: 700px; float: left">
- <h3>SocketClient建立连接</h3>
- <div style="border: 1px;">
-
- <label>socketio服务端地址:</label>
- <!--
- http://localhost 服务端ip
- 9999 服务端socket端口(服务端提供)
- test或socketIO 命名空间(可自定义)如果不定义命名空间,默认是/ 比如:http://localhost:9999?userId=12345
- userId 用户id参数(可自定义)
- ps:因为我定义了命名空间/test和/socketIO,所以我这里也可以用
- http://localhost:9999/test?userId=12345
- http://localhost:9999/socketIO?userId=12345
- 这里我用http://localhost:9999?userId=12345建立连接,因为这里还不涉及到请求不同命名空间的方法
- -->
- <input type="text" id="url" value="ws://localhost:9999?userId=12345" style="width: 500px;">
- <br>
- <br>
- <button id="connect" style="width: 100px;">建立连接</button>
- <button id="disconnect" style="width: 100px;">断开连接</button>
-
-
- </div>
-
- <hr style="height:1px;border:none;border-top:1px solid black;" />
-
- <h3>SocketClient发送消息</h3>
- <div style="border: 1px;">
- <label>socketEvent名称:</label><input type="text" id="socketEvent" value="getUserRooms">
- <br><br>
- <textarea id="content" maxlength="1000" cols="40" rows="5" placeholder="请输入内容"></textarea>
- <button id="send" style="width: 100px;">发送消息</button>
- </div>
-
- <hr style="height:1px;border:none;border-top:1px solid black;" />
-
- </div>
- <div style="float: left;margin-left: 50px;">
- <h3>SocketIO互动消息</h3>
- <button id="clean" style="width: 100px;">清理输出</button>
- <div id="console" class="well"></div>
- </div>
-
-
- </body>
-
- <script type="text/javascript">
- var socket ;
- var errorCount = 0;
- var isConnected = false;
- var maxError = 5;
-
- //连接
- function connect(url) {
-
- //var opts = {
- // query: 'userId='+userId
- //};
- //socket = io.connect(url, opts);
-
- socket = io.connect(url);
-
- //socket.nsp = "/socketIO";//定义命名空间
-
- console.log(socket)
-
- //监听本次连接回调函数
- socket.on('connect', function () {
- isConnected =true;
- console.log("连接成功");
- serverOutput('<span class="connect-msg"><font color="blue">'+getNowTime()+' </font>连接成功</span>');
- errorCount=0;
- });
- //监听消息
- socket.on('message', function (data) {
- output('<span class="connect-msg"><font color="blue">'+getNowTime()+' </font>' + data + ' </span>');
- console.log(data);
- });
-
- //监听断开
- socket.on('disconnect', function () {
- isConnected =false;
- console.log("连接断开");
- serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '已下线! </span>');
- });
- //监听断开错误
- socket.on('connect_error', function(data){
- serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>;' + '连接错误-'+data+' </span>');
- errorCount++;
- if(errorCount>=maxError){
- socket.disconnect();
- }
- });
- //监听连接超时
- socket.on('connect_timeout', function(data){
- serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '连接超时-'+data+' </span>');
- errorCount++;
- if(errorCount>=maxError){
- socket.disconnect();
- }
- });
- //监听错误
- socket.on('error', function(data){
- serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '系统错误-'+data+' </span>');
- errorCount++;
- if(errorCount>=maxError){
- socket.disconnect();
- }
- });
-
- /*socket.on('ack', function(data){
- console.log("ack:"+data)
- var str = '消息发送失败';
- if(data==1){
- str = '消息发送成功';
- }
- serverOutput('<span class="connect-msg"><font color="blue">'+getNowTime()+' </font>' + str+' </span>');
- });*/
- }
-
- function output(message) {
- var element = $("<div>" + " " + message + "</div>");
- $('#console').prepend(element);
- }
-
- function serverOutput(message) {
- var element = $("<div>" + message + "</div>");
- $('#console').prepend(element);
- }
-
- //连接
- $("#connect").click(function(){
- if(!isConnected){
- var url = $("#url").val();
- connect(url);
- }else {
- serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '已经成功建立连接,不要重复建立!!! </span>');
- }
- })
-
-
- //断开连接
- $("#disconnect").click(function(){
- if(isConnected){
- socket.disconnect();
- }
- })
-
- //发送消息
- $("#send").click(function(){
- var socketEvent = $("#socketEvent").val();//自定义的事件名称
- var content = $("#content").val();//发送的内容
- socket.emit(socketEvent,content,function(data1,data2){
- console.log("ack1:"+data1);
- console.log("ack2:"+data2);
- });
- })
-
- //清理消息
- $("#clean").click(function(){
- $('#console').html("");
- })
-
- function getNowTime(){
- var date=new Date();
- var year=date.getFullYear(); //获取当前年份
- var mon=date.getMonth()+1; //获取当前月份
- var da=date.getDate(); //获取当前日
- var h=date.getHours(); //获取小时
- var m=date.getMinutes(); //获取分钟
- var s=date.getSeconds(); //获取秒
- var ms=date.getMilliseconds();
- var d=document.getElementById('Date');
- var date =year+'/'+mon+'/'+da+' '+h+':'+m+':'+s+':'+ms;
- return date;
- }
- </script>
- </html>
-
- package com.corundumstudio.socketio.demo.client;
-
- import io.socket.client.IO;
- import io.socket.client.Socket;
- import io.socket.emitter.Emitter;
-
- import java.net.URISyntaxException;
-
-
- public class DemoSocketClient {
-
- public static void main(String[] args) throws URISyntaxException, InterruptedException {
- IO.Options options = new IO.Options();
- options.transports = new String[]{"websocket"};
- options.reconnectionAttempts = 2;
- options.reconnectionDelay = 1000;//失败重连的时间间隔
- options.timeout = 500;//连接超时时间(ms)
-
- final Socket socket = IO.socket("http://localhost:9999/?userId=12345", options);
-
- socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
- @Override
- public void call(Object... args) {
- socket.send("hello");
- }
- });
-
- socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
- @Override
- public void call(Object... args) {
- System.out.println("连接关闭");
- }
- });
-
- socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
- @Override
- public void call(Object... args) {
- System.out.println("sessionId:" + socket.id());
- for (Object obj : args) {
- System.out.println(obj);
- }
- System.out.println("收到服务器应答,将要断开连接...");
- socket.disconnect();
- }
- });
- socket.connect();
- }
- }
上面的demo是单机版本,但是,生产上服务都是集群存在的,至少也要两台,双节点防止单点故障;可是websocket服务搭建集群会有个问题,就是服务间通信的问题;比如,Aclient连接上Aserver,Bclient连接上了Bserver,这时候,Aclient就没法对Bclient进行发送消息和接受 消息。
这里提供几种解决方案
redis是官方推荐的解决方案,这个在Github上可以看到。配置之后,利用redis这个中介,可以通过广播的方式对其他客户端进行通信,底层是用了redis的发布订阅功能。有两种通信方式,可以通过全局广播通知到其他客户端,也可以对加入同一个房间内的客户端进行广播。但是这个有局限性,netty-socketio只提供了广播的redis发布订阅方案,点对点方式并没有提供。
- <!-- 添加redisson依赖-->
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson-spring-boot-starter</artifactId>
- <version>3.15.4</version>
- </dependency>
- # redis
- spring:
- redis:
- host: 121.196.111.111 #地址
- port: 6379 #端口号
- database: 0 #0号库
- password: password #密码
- // redisson的配置类
- @Configuration
- public class RedissonConfig {
- @Value("${spring.redis.host}")
- private String host;
-
- @Value("${spring.redis.password}")
- private String password;
-
- @Value("${spring.redis.port}")
- private Integer port;
-
- @Value("${spring.redis.database:0}")
- private Integer database;
-
- @Value("${spring.redis.timeout:3000}")
- private Integer timeout;
-
- @Bean
- public RedissonClient redissonClient(){
- Config config = new Config();
- // config.useClusterServers().addNodeAddress("redis://").setPassword("");
- config.useSingleServer().setAddress("redis://"+ host +":" + port).setPassword(password).setDatabase(database).setTimeout(timeout);
- RedissonClient redissonClient = Redisson.create(config);
- return redissonClient;
- }
- }
-
-
- // 在SocketIOServer配置上redisson既可使用
- @Autowired
- private RedissonClient redissonClient;
- @Bean
- public SocketIOServer getSocketIOServer(){
- com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
- configuration.setPort(websocketPort);
- configuration.setHostname(host);
- configuration.getSocketConfig().setTcpNoDelay(true);
- configuration.getSocketConfig().setReuseAddress(true);
- // 设置redis存储方式
- configuration.setStoreFactory(new RedissonStoreFactory(redissonClient));
- return new SocketIOServer(configuration);
- }
- @OnEvent(ROOM_BROADCAST)
- public void roomBroadcast(SocketIOClient client, String data){
- Message message = JSON.parseObject(data, Message.class);
- roomBroadcastData(MESSAGE,message.getRoom(),message.getData(),null);
- }
-
- @OnEvent(GLOBAL_BROADCAST)
- public void globalBroadcast(SocketIOClient client, String data){
- globalBroadcastData(MESSAGE,data,null);
- }
-
- public void globalBroadcastData(String eventKey,Object data,SocketIOClient excludeClient){
- if(Objects.isNull(excludeClient)){
- socketIOServer.getBroadcastOperations().sendEvent(eventKey,data);
- }else {
- socketIOServer.getBroadcastOperations().sendEvent(eventKey,excludeClient,data);
- }
- }
-
- public void roomBroadcastData(String eventKey,String room,Object data,SocketIOClient excludeClient){
- if (Objects.isNull(excludeClient)){
- socketIOServer.getRoomOperations(room).sendEvent(eventKey,data);
- }else {
- socketIOServer.getRoomOperations(room).sendEvent(eventKey,excludeClient,data);
- }
- }
nginx重新加载配置会影响原有的websocket连接吗?可不可以通过更新路由配置实现websocket服务的灰度发布?
经过测试,在更改nginx配置信息后,比如将其中一个服务设置成down,输入nginx -s reload命令,原有的websocket连接不会受到影响,但是新的连接不会再进入设置为down的服务,随时间推移,连接数就会越来越少,直至为0。因此,我们可以将需要发布的服务设置为down,然后再编写一个获取此服务连接client的数量的接口,当数量为0的时候,代表没有websocket连接了,此时,既可对这台服务器进行发布了。
获取某台服务器连接客户端数量
- @Autowired
- private SocketIOServer socketIOServer;
-
- @GetMapping("/getClientCount")
- public Object getClientCount(){
- Collection<SocketIOClient> allClients = socketIOServer.getAllClients();
- int size = allClients.size();
- return size;
- }
nginx配置websocket服务路由
这里需要采用hash的路由方式;不知道是框架还是其他的什么原因,客户端会发起三次的普通http请求和一次http升级成websocket的请求,采用其他的路由方式会导致这些请求不到同一个服务器从而导致websocket连接报错。这需要注意一下
- worker_processes 1;
-
- events {
- worker_connections 1024;
- }
-
- http {
- map $http_upgrade $connection_upgrade {
- default upgrade;
- '' close;
- }
-
- upstream websocket {
- hash $remote_addr consistent;
- server 127.0.0.1:9999 down;
- server 127.0.0.1:9998;
- }
-
- server {
- listen 80;
- location / {
- proxy_pass http://websocket;
- proxy_http_version 1.1;
- proxy_set_header Upgrade $http_upgrade;
- proxy_set_header Connection "$connection_upgrade";
- #proxy_set_header Host $host;
- }
- }
- }
之前的redis方案是netty-websocketio官方提供的集群解决方案,但此方案平没有解决点对点信息传输的问题,我们现在在此基础上还是通过redis的发布订阅来完善点对点信息推送。
当然,你也可以通过mq的方式来完成,个人觉得用redis的话,中小型项目就足够了,成本也低
思路:用户连上服务器,将用户和用户所连接的机器信息(这里指ip和端口号)保存在redis,断开连接则将redis上的记录清除掉;每台服务器订阅当前ip和端口号的topic,知道用户连接到那台服务器之后,发布消息则是发布到这个用户对应服务器ip和端口号的topic,这样,经过redis的订阅和发布,既可完成点对点的通信。(多台服务器分配多个topic方案是为了优化。如果采用一个topic,所有的机器都订阅同一个topic,然后每台机器再去赛选要发送的目标客户端是否连接在此机器上,这种方式会让其他非目标机器都进行一次赛选处理,略微有些影响性能,照成服务器不必要的开销)
在之前代码的基础上,编写一个clientHub类,用于存储用户连接之前的信息
- @Component
- @Log4j2
- public class ClientHub implements CommandLineRunner {
-
- @Autowired
- private SocketIOServer socketIOServer;
-
- @Autowired
- private RedissonClient redissonClient;
-
- public static final String REDIS_TOPIC = "redis_topic";
-
- public static final String ONLINE_USER = "online_user_";
-
- /**
- * 本机存储userId与客户端之间的连接关系
- */
- private static Map<String, SocketIOClient> userIdToClient = new HashMap<>();
-
- /**
- * 集群模式下,点对点发送,一律采取这个方法,不能直接通过SocketClient发送
- * @param fromUserId
- * @param targetUserId
- * @param key
- * @param data
- */
- public void sendMessageByUserId(String fromUserId,String targetUserId,String key,String data){
- Message message = new Message();
- message.setFromUserId(fromUserId);
- message.setTargetUserId(targetUserId);
- message.setKey(key);
- message.setData(data);
-
- SocketIOClient socketIOClient = userIdToClient.get(targetUserId);
- // 如果是本机直连的,直接发送,不是则通过redis发布对应的机器订阅
- if(!Objects.isNull(socketIOClient)){
- socketIOClient.sendEvent(message.getKey(),message);
- }else {
- RBucket<String> bucket = redissonClient.getBucket(ONLINE_USER + targetUserId);
- if(bucket.isExists()){
- String topic = bucket.get();
- this.pubMessage(topic,message);
- }
- }
- }
-
- /**
- * 用户连接时调用,存储用户连接之间的信息
- * @param userId
- * @param client
- */
- public void addUser(String userId,SocketIOClient client){
- userIdToClient.put(userId,client);
- // 存储在redis上
- RBucket<String> bucket = redissonClient.getBucket(ONLINE_USER + userId);
- bucket.set(getRedisTopic());
- }
-
- /**
- * 用户断开连接时调用,清楚用户和客户端之前的关系
- * @param userId
- */
- public void deleteUser(String userId){
- userIdToClient.remove(userId);
- // 从redis上删除掉
- RBucket<String> bucket = redissonClient.getBucket(ONLINE_USER + userId);
- bucket.delete();
- }
-
- /**
- * 获取本socket服务的ip和端口号
- * @return
- */
- private String getHostAndPort(){
- String hostAddress = "";
- try {
- hostAddress = InetAddress.getLocalHost().getHostAddress();
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
- int port = socketIOServer.getConfiguration().getPort();
- return hostAddress+"_"+port;
- }
-
- /**
- * 获取这台机器redis订阅的topic
- * @return
- */
- private String getRedisTopic(){
- String hostAndPort = getHostAndPort();
- return REDIS_TOPIC + "_" + hostAndPort;
- }
-
- /**
- * redis发布消息
- * @param topic
- * @param message
- */
- private void pubMessage(String topic, Message message){
- redissonClient.getTopic(topic).publish(message);
- log.info("pubMessage topic {} data {}",topic,JSON.toJSONString(message));
- }
-
- /**
- * redis订阅消息
- * @param topic
- */
- public void subMessage(String topic){
- redissonClient.getTopic(topic).addListener(Message.class,(channel,message)->{
- log.info("topic {} submessage {}",topic, JSON.toJSONString(message));
- String userId = message.getTargetUserId();
- SocketIOClient socketIOClient = userIdToClient.get(userId);
- if(Objects.nonNull(socketIOClient)){
- socketIOClient.sendEvent(message.getKey(),JSON.toJSONString(message));
- }
- });
- }
-
- /**
- * 服务器 对应的组件加载完成后,需要开启redis订阅功能
- * @param args
- * @throws Exception
- */
- @Override
- public void run(String... args) throws Exception {
- String redisTopic = getRedisTopic();
- subMessage(redisTopic);
- log.info("topic {} 已订阅",redisTopic);
- }
- }
建立连接和断开连接分别进行记录和清除工作
- @Autowired
- private ClientHub clientHub;
-
- /**
- * 客户端socket连接后触发
- * @param client
- */
- @OnConnect
- public void connect(SocketIOClient client){
- // 获取登录信息,也可以在这里面进行权限判断
- String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
- log.info("connect userId {}, sessionUuid {}",userId,client.getSessionId());
- String url = client.getHandshakeData().getUrl();
- log.info("url :{}",url);
-
- // 保存userId和client的对应关系
- clientHub.addUser(userId,client);
- }
-
- /**
- * 客户端断开服务器时触发
- * @param client
- */
- @OnDisconnect
- public void disConnect(SocketIOClient client){
- // 获取登录信息
- String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
- log.info("disConnect userId {}, sessionUuid {}",userId,client.getSessionId());
-
- // 解除userId和client的对应关系
- clientHub.deleteUser(userId);
- }
- /**
- * 点对点发送消息
- * @param client
- * @param data
- * 注意:如果接受的参数是一个对象,那么前端页面发送数据的时候也必须是一个对象,不能用json字符串代替
- * example:js代码 socket.emit('sendUserData', {data: "你好啊", targetUserId: "2233"});
- */
- @OnEvent(SEND_USER_DATA)
- public void sendUserData(SocketIOClient client, AckRequest ackRequest,Message data){
- // 获取登录信息
- String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
- log.info("sendUserData {}",JSON.toJSONString(data));
- clientHub.sendMessageByUserId(userId,data.getTargetUserId(),MESSAGE,data.getData());
- }
这里只是简单的编写集群的思想,实际情况还有一些问题需要注意和解决,比如当某台机器挂了,这台机器连接的用户数据还是在redis上,其它机器就以为这些用户还是在线,但其实状态不对。这个就需要进行额外的处理了,比如存redis的时候设置过期时间,然后通过定时任务不断在redis上刷新在线用户信息等。
之前的redis方式,是通用的集群解决方案,基本上大部分的场景都适用,但有些时候业务场景比较特殊,也可以采取特殊的方式解决
用户之前的通信只在同一个房间内,比如视频会议,上课直播,游戏房间之类的,用户之间只和在同一个房间内的人进行通信,不需要对跨房间或者对房间外的用户进行实时消息推送。
将房间看做一个单独的整体,只能分配在某一台服务器上,只要是这个房间的人需要连接服务器,就先获取该房间的服务器地址,客户端直接连上对应服务器,再进入房间,那么,该房间内的所有人都是在同一台服务器上,从而避免了跨服务器之间的通信场景,就相当于单机版。不过这个得手动控制房间的负载均衡,每台websocket服务器的访问路径也要自己控制好,生产的灰度发布也需要自行写代码控制。
spring提供的事件模型,其实也相当于一个发布订阅的工具,只不过是在单个程序内进行的,这里可以借助spring的事件,配合上多线程异步,可以更方便地进行websocket功能编码开发。从而达到,主要业务逻辑消息迅速实时推送,附加业务逻辑并发执行。
spring事件是spring自带的功能,不需要再额外引入依赖
直接继承抽象类ApplicationEvent,然后自定义属性,自定义的属性用于在事件传播过程中记录自己需要的数据
- public class ConnectEvent extends ApplicationEvent {
-
- private SocketIOClient socketIOClient;
-
- public ConnectEvent(Object source,SocketIOClient socketIOClient) {
- super(source);
- this.socketIOClient = socketIOClient;
- }
-
- public SocketIOClient getSocketIOClient() {
- return socketIOClient;
- }
- }
通过ApplicationEventPublisher的实例既可发布自定义的事件
- @Autowired
- private ApplicationEventPublisher applicationEventPublisher;
-
- @OnConnect
- public void connect(SocketIOClient client){
- // 获取登录信息,也可以在这里面进行权限判断
- String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
- log.info("connect userId {}, sessionUuid {}",userId,client.getSessionId());
- String url = client.getHandshakeData().getUrl();
- log.info("url :{}",url);
- // 也可以在这一步进行权限鉴定,鉴定不通过就发给客户端一个权限鉴定失败的消息,然后再断开连接
- // client.sendEvent(MESSAGE,"权限不足");
- // client.disconnect();
-
- // 保存userId和client的对应关系
- clientHub.addUser(userId,client);
-
- // 发布一个用户连接事件
- applicationEventPublisher.publishEvent(new ConnectEvent(this,client));
- }
事件监听有两种书写方式,一种是直接实现ApplicationListener接口,泛型里面放入自定义事件
- @Component
- @Log4j2
- public class UserLoginListenerDemo implements ApplicationListener<ConnectEvent> {
- @Override
- public void onApplicationEvent(ConnectEvent event) {
- SocketIOClient socketIOClient = event.getSocketIOClient();
- String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
- {
- // 业务代码。。。
- log.info("UserLoginListenerDemo userId {}",userId);
- }
- }
- }
第二中有点类似于controller的方式,在方法上用@EventListener注解来标识事件监听,在方法的参数中传入自定义的事件,并且如果有顺序需求的话,可以通过@Order来注明那个方法先调用(值小的先调用)
- @Component
- @Log4j2
- public class UserLoginListener {
-
- @EventListener
- @Order(3)
- public void connectListenerLog(ConnectEvent connectEvent){
- SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
- String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
- {
- log.info("user {} 登录的日志操作",userId);
- }
- }
-
- @EventListener
- @Order(2)
- public void connectListenerStore(ConnectEvent connectEvent){
- SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
- String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
- {
- log.info("user {} 登录的行为存表",userId);
- }
- }
-
- @EventListener
- @Order(1)
- public void connectListenerState(ConnectEvent connectEvent){
- SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
- String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
- {
- log.info("user {} 登录的状态更改",userId);
- }
- }
上面的事件监听的方式是单线程的,如果仅仅是这样,感觉还不如直接定义方法进行调用,事件模型到显得有点多余。我们可以引入线程池和@Async,通过多线程并行处理事件,实现真正的发布订阅
创建一个spring提供的线程池
- @Configuration
- public class ThreadPoolConfig {
-
- @Bean
- public ThreadPoolTaskExecutor autoLoginExecutor() {
- ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
- threadPoolTaskExecutor.setCorePoolSize(20);
- threadPoolTaskExecutor.setMaxPoolSize(100);
- threadPoolTaskExecutor.setQueueCapacity(200);
- threadPoolTaskExecutor.setKeepAliveSeconds(60);
- threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
- threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
- threadPoolTaskExecutor.setThreadNamePrefix("event-task-");
- threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
- threadPoolTaskExecutor.initialize();
- return threadPoolTaskExecutor;
- }
- }
启动类上加上@EnableAsync注解
- @SpringBootApplication
- @Log4j2
- @EnableAsync
- public class ClusterEventApplication implements CommandLineRunner, DisposableBean {
event监听方法上加上@Async表示异步执行
- @Component
- @Log4j2
- public class UserLoginListener {
-
- @EventListener
- // @Order(3)
- @Async
- public void connectListenerLog(ConnectEvent connectEvent){
- SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
- String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
- {
- log.info("user {} 登录的日志操作",userId);
- }
- }
-
- @EventListener
- // @Order(2)
- @Async
- public void connectListenerStore(ConnectEvent connectEvent){
- SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
- String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
- {
- log.info("user {} 登录的行为存表",userId);
- }
- }
-
- @EventListener
- // @Order(1)
- @Async
- public void connectListenerState(ConnectEvent connectEvent){
- SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
- String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
- {
- log.info("user {} 登录的状态更改",userId);
- }
- }
- }
效果截图:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。