赞
踩
这次整合借鉴了以下博主的智慧
springboot学习(四十三) springboot使用netty-socketio实现消息推送
socketio的核心依赖就只有这个
<!-- netty-socketio: 仿`node.js`实现的socket.io服务端 -->
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.19</version>
</dependency>
#自定义socketio配置,你可以直接硬编码,看个人喜好 socketio: # socketio请求地址 host: 127.0.0.1 # socketio端口 port: 9999 # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器 maxFramePayloadLength: 1048576 # 设置http交互最大内容长度 maxHttpContentLength: 1048576 # socket连接数大小(如只监听一个端口boss线程组为1即可) bossCount: 1 # 连接数大小 workCount: 100 # 允许客户请求 allowCustomRequests: true # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间 upgradeTimeout: 1000000 # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件 pingTimeout: 6000000 # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔 pingInterval: 25000 # 命名空间,多个以逗号分隔, namespaces: /test,/socketIO #namespaces: /socketIO
package com.gzgs.socketio.common.config; import com.corundumstudio.socketio.SocketConfig; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.SpringAnnotationScanner; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Arrays; import java.util.Optional; @Configuration public class SocketIOConfig { @Value("${socketio.host}") private String host; @Value("${socketio.port}") private Integer port; @Value("${socketio.bossCount}") private int bossCount; @Value("${socketio.workCount}") private int workCount; @Value("${socketio.allowCustomRequests}") private boolean allowCustomRequests; @Value("${socketio.upgradeTimeout}") private int upgradeTimeout; @Value("${socketio.pingTimeout}") private int pingTimeout; @Value("${socketio.pingInterval}") private int pingInterval; @Value("${socketio.namespaces}") private String[] namespaces; @Bean public SocketIOServer socketIOServer() { SocketConfig socketConfig = new SocketConfig(); socketConfig.setTcpNoDelay(true); socketConfig.setSoLinger(0); com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration(); config.setSocketConfig(socketConfig); config.setHostname(host); config.setPort(port); config.setBossThreads(bossCount); config.setWorkerThreads(workCount); config.setAllowCustomRequests(allowCustomRequests); config.setUpgradeTimeout(upgradeTimeout); config.setPingTimeout(pingTimeout); config.setPingInterval(pingInterval); //服务端 final SocketIOServer server = new SocketIOServer(config); //添加命名空间(如果你不需要命名空间,下面的代码可以去掉) Optional.ofNullable(namespaces).ifPresent(nss -> Arrays.stream(nss).forEach(server::addNamespace)); return server; } //这个对象是用来扫描socketio的注解,比如 @OnConnect、@OnEvent @Bean public SpringAnnotationScanner springAnnotationScanner() { return new SpringAnnotationScanner(socketIOServer()); } }
我在启动类里面定义了启动或者关闭SocketIOServer
package com.gzgs.socketio; import com.corundumstudio.socketio.SocketIOServer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.stereotype.Component; @SpringBootApplication public class SocketioServerApplication { public static void main(String[] args) { SpringApplication.run(SocketioServerApplication.class, args); } } @Component @Slf4j class SocketIOServerRunner implements CommandLineRunner, DisposableBean { @Autowired private SocketIOServer socketIOServer; @Override public void run(String... args) throws Exception { socketIOServer.start(); log.info("SocketIOServer==============================启动成功"); } @Override public void destroy() throws Exception { //如果用kill -9 这个监听是没用的,有可能会导致你服务kill掉了,但是socket服务没有kill掉 socketIOServer.stop(); log.info("SocketIOServer==============================关闭成功"); } }
springboot整合socketIO的工作已经完成了
参考下即可,核心是如何配置以及如何启动/关闭SocketIO
package com.gzgs.socketio.common.cache; import com.corundumstudio.socketio.SocketIOClient; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * 这是存储用户的缓存信息 */ @Component public class ClientCache { //用于存储用户的socket缓存信息 private static ConcurrentHashMap<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = new ConcurrentHashMap<>(); //保存用户信息 public void saveClient(String userId,UUID sessionId,SocketIOClient socketIOClient){ HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId); if(sessionIdClientCache == null){ sessionIdClientCache = new HashMap<>(); } sessionIdClientCache.put(sessionId,socketIOClient); concurrentHashMap.put(userId,sessionIdClientCache); } //获取用户信息 public HashMap<UUID,SocketIOClient> getUserClient(String userId){ return concurrentHashMap.get(userId); } //根据用户id和session删除用户某个session信息 public void deleteSessionClientByUserId(String userId,UUID sessionId){ concurrentHashMap.get(userId).remove(sessionId); } //删除用户缓存信息 public void deleteUserCacheByUserId(String userId){ concurrentHashMap.remove(userId); } }
用于监听客户端的建立连接请求和关闭连接请求
package com.gzgs.socketio.common.handler; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.OnConnect; import com.corundumstudio.socketio.annotation.OnDisconnect; import com.gzgs.socketio.common.cache.ClientCache; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; @Slf4j @Component public class SocketIOServerHandler { @Autowired private ClientCache clientCache; /** * 建立连接 * @param client 客户端的SocketIO */ @OnConnect public void onConnect(SocketIOClient client) { //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999?userId=12345 //下面两种是加了命名空间的,他会请求对应命名空间的方法(就类似你进了不同的房间玩游戏) //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999/test?userId=12345 //因为我定义用户的参数为userId,你也可以定义其他名称 客户端请求 http://localhost:9999/SocketIO?userId=12345 String userId = client.getHandshakeData().getSingleUrlParam("userId"); //同一个页面sessionid一样的 UUID sessionId = client.getSessionId(); //保存用户的信息在缓存里面 clientCache.saveClient(userId,sessionId,client); log.info("SocketIOServerHandler-用户id:{},sessionId:{},建立连接成功",userId,sessionId); } /** * 关闭连接 * @param client 客户端的SocketIO */ @OnDisconnect public void onDisconnect(SocketIOClient client){ //因为我定义用户的参数为userId,你也可以定义其他名称 String userId = client.getHandshakeData().getSingleUrlParam("userId"); //sessionId,页面唯一标识 UUID sessionId = client.getSessionId(); //clientCache.deleteUserCacheByUserId(userId); //只会删除用户某个页面会话的缓存,不会删除该用户不同会话的缓存,比如:用户同时打开了谷歌和QQ浏览器,当你关闭谷歌时候,只会删除该用户谷歌的缓存会话 clientCache.deleteSessionClientByUserId(userId,sessionId); log.info("SocketIOServerHandler-用户id:{},sessionId:{},关闭连接成功",userId,sessionId); } }
直接复制建立html文件,在浏览器打开就可以使用了
<!DOCTYPE html> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> <title>SocketIO客户端测试环境</title> <base> <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script> <script src="https://cdn.bootcss.com/socket.io/2.1.1/socket.io.js"></script> <style> body { padding: 20px; } #console { height: 450px; overflow: auto; } .connect-msg { color: green; } .disconnect-msg { color: red; } </style> </head> <body> <h1>客户端测试环境</h1> <hr style="height:1px;border:none;border-top:1px solid black;" /> <div style="width: 700px; float: left"> <h3>SocketClient建立连接</h3> <div style="border: 1px;"> <label>socketio服务端地址:</label> <!-- http://localhost 服务端ip 9999 服务端socket端口(服务端提供) test或socketIO 命名空间(可自定义)如果不定义命名空间,默认是/ 比如:http://localhost:9999?userId=12345 userId 用户id参数(可自定义) ps:因为我定义了命名空间/test和/socketIO,所以我这里也可以用 http://localhost:9999/test?userId=12345 http://localhost:9999/socketIO?userId=12345 这里我用http://localhost:9999?userId=12345建立连接,因为这里还不涉及到请求不同命名空间的方法 --> <input type="text" id="url" value="http://localhost:9999?userId=12345" style="width: 500px;"> <br> <br> <button id="connect" style="width: 100px;">建立连接</button> <button id="disconnect" style="width: 100px;">断开连接</button> </div> <hr style="height:1px;border:none;border-top:1px solid black;" /> <h3>SocketClient发送消息</h3> <div style="border: 1px;"> <label>socketEvent名称:</label><input type="text" id="socketEvent" value="getUserRooms"> <br><br> <textarea id="content" maxlength="1000" cols="40" rows="5" placeholder="请输入内容"></textarea> <button id="send" style="width: 100px;">发送消息</button> </div> <hr style="height:1px;border:none;border-top:1px solid black;" /> </div> <div style="float: left;margin-left: 50px;"> <h3>SocketIO互动消息</h3> <button id="clean" style="width: 100px;">清理输出</button> <div id="console" class="well"></div> </div> </body> <script type="text/javascript"> var socket ; var errorCount = 0; var isConnected = false; var maxError = 5; //连接 function connect(url) { //var opts = { // query: 'userId='+userId //}; //socket = io.connect(url, opts); socket = io.connect(url); //socket.nsp = "/socketIO";//定义命名空间 console.log(socket) //监听本次连接回调函数 socket.on('connect', function () { isConnected =true; console.log("连接成功"); serverOutput('<span class="connect-msg"><font color="blue">'+getNowTime()+' </font>连接成功</span>'); errorCount=0; }); //监听消息 socket.on('message', function (data) { output('<span class="connect-msg"><font color="blue">'+getNowTime()+' </font>' + data + ' </span>'); console.log(data); }); //监听断开 socket.on('disconnect', function () { isConnected =false; console.log("连接断开"); serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '已下线! </span>'); }); //监听断开错误 socket.on('connect_error', function(data){ serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>;' + '连接错误-'+data+' </span>'); errorCount++; if(errorCount>=maxError){ socket.disconnect(); } }); //监听连接超时 socket.on('connect_timeout', function(data){ serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '连接超时-'+data+' </span>'); errorCount++; if(errorCount>=maxError){ socket.disconnect(); } }); //监听错误 socket.on('error', function(data){ serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '系统错误-'+data+' </span>'); errorCount++; if(errorCount>=maxError){ socket.disconnect(); } }); /*socket.on('ack', function(data){ console.log("ack:"+data) var str = '消息发送失败'; if(data==1){ str = '消息发送成功'; } serverOutput('<span class="connect-msg"><font color="blue">'+getNowTime()+' </font>' + str+' </span>'); });*/ } function output(message) { var element = $("<div>" + " " + message + "</div>"); $('#console').prepend(element); } function serverOutput(message) { var element = $("<div>" + message + "</div>"); $('#console').prepend(element); } //连接 $("#connect").click(function(){ if(!isConnected){ var url = $("#url").val(); connect(url); }else { serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '已经成功建立连接,不要重复建立!!! </span>'); } }) //断开连接 $("#disconnect").click(function(){ if(isConnected){ socket.disconnect(); } }) //发送消息 $("#send").click(function(){ var socketEvent = $("#socketEvent").val();//自定义的事件名称 var content = $("#content").val();//发送的内容 socket.emit(socketEvent,content,function(data1,data2){ console.log("ack1:"+data1); console.log("ack2:"+data2); }); }) //清理消息 $("#clean").click(function(){ $('#console').html(""); }) function getNowTime(){ var date=new Date(); var year=date.getFullYear(); //获取当前年份 var mon=date.getMonth()+1; //获取当前月份 var da=date.getDate(); //获取当前日 var h=date.getHours(); //获取小时 var m=date.getMinutes(); //获取分钟 var s=date.getSeconds(); //获取秒 var ms=date.getMilliseconds(); var d=document.getElementById('Date'); var date =year+'/'+mon+'/'+da+' '+h+':'+m+':'+s+':'+ms; return date; } </script> </html>
html效果如下:
自己点击建立连接和断开连接按钮测试玩下
ps:http://localhost:9999?userId=12345是没有命名空间的请求
SocketIO、namespace(命名空间)、room(房间)的关系如下:
SocketIO广播是以namespace或者room为维度的,具体如下:
如果不定义namespace,默认是/
如果定义了namespace,没有定义room,房间默认的名字和namespace一样。
你也可以这样定义
server.addNamespace(“/test”);
server.addNamespace(“/socketIO”);
package com.gzgs.socketio.common.handler; import com.corundumstudio.socketio.AckRequest; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.annotation.OnEvent; import com.fasterxml.jackson.core.JsonProcessingException; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component public class TestHandler { //测试使用 @OnEvent("testHandler") public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { log.info("MyTestHandler:{}",data); if(ackRequest.isAckRequested()){ //返回给客户端,说我接收到了 ackRequest.sendAckData("MyTestHandler",data); } } }
package com.gzgs.socketio.common.handler; import com.corundumstudio.socketio.AckRequest; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.annotation.OnEvent; import com.fasterxml.jackson.core.JsonProcessingException; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component public class SocketIOHandler { //测试使用 @OnEvent("socketIOHandler") public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { log.info("SocketIOHandler:{}",data); if(ackRequest.isAckRequested()){ //返回给客户端,说我接收到了 ackRequest.sendAckData("SocketIOHandler",data); } } }
在启动类的SocketIO监听里面加入监听
package com.gzgs.socketio; import com.corundumstudio.socketio.SocketIOServer; import com.gzgs.socketio.common.handler.SocketIOHandler; import com.gzgs.socketio.common.handler.TestHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.stereotype.Component; @SpringBootApplication public class SocketioServerApplication { public static void main(String[] args) { SpringApplication.run(SocketioServerApplication.class, args); } } @Component @Slf4j class SocketIOServerRunner implements CommandLineRunner, DisposableBean { @Autowired private SocketIOServer socketIOServer; @Autowired private TestHandler testHandler; @Autowired private SocketIOHandler socketIOHandler; @Override public void run(String... args) throws Exception { //namespace分别交给各自的Handler监听,这样就可以隔离,只有客户端指定namespace,才能访问对应Handler。 //比如:http://localhost:9999/test?userId=12345 socketIOServer.getNamespace("/test").addListeners(testHandler); socketIOServer.getNamespace("/socketIO").addListeners(socketIOHandler); socketIOServer.start(); log.info("SocketIOServer==============================启动成功"); } @Override public void destroy() throws Exception { socketIOServer.stop(); log.info("SocketIOServer==============================关闭成功"); } }
其他的一些测试我写在下面的代码上,自己去测试才能更好的理解
//加入房间
@OnEvent("joinRoom")
public void joinRooms(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
client.joinRoom(data);
if(ackRequest.isAckRequested()){
//返回给客户端,说我接收到了
ackRequest.sendAckData("加入房间","成功");
}
}
//离开房间
@OnEvent("leaveRoom")
public void leaveRoom(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
client.leaveRoom(data);
if(ackRequest.isAckRequested()){
//返回给客户端,说我接收到了
ackRequest.sendAckData("离开房间","成功");
}
}
//获取该用户所有房间 @OnEvent("getUserRooms") public void getUserRooms(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { String userId = client.getHandshakeData().getSingleUrlParam("userId"); Set<String> allRooms = client.getAllRooms(); for (String room:allRooms){ System.out.println("房间名称:"+room); } log.info("服务器收到消息,客户端用户id:{} | 客户发送的消息:{} | 是否需要返回给客户端内容:{} ",userId,data,ackRequest.isAckRequested()); if(ackRequest.isAckRequested()){ //返回给客户端,说我接收到了 ackRequest.sendAckData("你好","哈哈哈"); } }
@OnEvent("sendRoomMessage") public void sendRoomMessage(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException { String userId = client.getHandshakeData().getSingleUrlParam("userId"); Set<String> allRooms = client.getAllRooms(); for (String room:allRooms){ log.info("房间:{}",room); //发送给指定空间名称以及房间的人,并且排除不发给自己 socketIoServer.getNamespace("/socketIO").getRoomOperations(room).sendEvent("message",client, data); //发送给指定空间名称以及房间的人,包括自己 //socketIoServer.getNamespace("/socketIO").getRoomOperations(room).sendEvent("message", data);; } if(ackRequest.isAckRequested()){ //返回给客户端,说我接收到了 ackRequest.sendAckData("发送消息到指定的房间","成功"); } }
//广播消息给指定的Namespace下所有客户端
@OnEvent("sendNamespaceMessage")
public void sendNamespaceMessage(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {
socketIoServer.getNamespace("/socketIO").getBroadcastOperations().sendEvent("message",client, data);;
if(ackRequest.isAckRequested()){
//返回给客户端,说我接收到了
ackRequest.sendAckData("发送消息到指定的房间","成功");
}
}
//点对点
public void sendMessageOne(String userId) throws JsonProcessingException {
HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(userId);
for (UUID sessionId : userClient.keySet()) {
socketIoServer.getNamespace("/socketIO").getClient(sessionId).sendEvent("message", "这是点对点发送");
}
}
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。