赞
踩
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高;Netty的传输快其实也是依赖了NIO的一个特性——零拷贝。我们知道,Java的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象存放的地方,一般我们的数据如果需要从IO读取到堆内存,中间需要经过Socket缓冲区,也就是说一个数据会被拷贝两次才能到达他的的终点,如果数据量大,就会造成不必要的资源浪费。
Netty针对这种情况,使用了NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度
这是个开源的框架。通过它,我们可以像spring-boot-starter-websocket一样使用注解进行开发,只需关注需要的事件(如OnMessage)。并且底层是使用Netty,netty-websocket-spring-boot-starter其他配置和spring-boot-starter-websocket完全一样,当需要调参的时候只需要修改配置参数即可,无需过多的关心handler的设置
要求 JDK 1.8
<dependency>
<groupId>org.yeauty</groupId>
<artifactId>netty-websocket-spring-boot-starter</artifactId>
<version>0.8.0</version>
</dependency>
此处为官方文档给的实例
@ServerEndpoint @Component public class MyWebSocket { @OnOpen public void onOpen(Session session, HttpHeaders headers, ParameterMap parameterMap) throws IOException { System.out.println("new connection"); String paramValue = parameterMap.getParameter("paramKey"); System.out.println(paramValue); } @OnClose public void onClose(Session session) throws IOException { System.out.println("one connection closed"); } @OnError public void onError(Session session, Throwable throwable) { throwable.printStackTrace(); } @OnMessage public void onMessage(Session session, String message) { System.out.println(message); session.sendText("Hello Netty!"); } @OnBinary public void onBinary(Session session, byte[] bytes) { for (byte b : bytes) { System.out.println(b); } session.sendBinary(bytes); } @OnEvent public void onEvent(Session session, Object evt) { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; switch (idleStateEvent.state()) { case READER_IDLE: System.out.println("read idle"); break; case WRITER_IDLE: System.out.println("write idle"); break; case ALL_IDLE: System.out.println("all idle"); break; default: break; } } } }
此处为本人项目部分源码
package org.springblade.websocket; import java.io.IOException; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.yeauty.annotation.OnBinary; import org.yeauty.annotation.OnClose; import org.yeauty.annotation.OnError; import org.yeauty.annotation.OnEvent; import org.yeauty.annotation.OnMessage; import org.yeauty.annotation.OnOpen; import org.yeauty.annotation.ServerEndpoint; import org.yeauty.pojo.ParameterMap; import org.yeauty.pojo.Session; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.timeout.IdleStateEvent; @ServerEndpoint(prefix = "netty-websocket") @Component public class MyWebSocket { private static final Logger logger = LoggerFactory.getLogger(MyWebSocket.class); //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 private static int onlineCount = 0; //定时器 private Timer timer; //心跳检验 private volatile boolean isPong; //线程 private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public MyWebSocket() { this.timer = new Timer(); this.isPong = true; } /** * 当有新的WebSocket连接进入时,对该方法进行回调 注入参数的类型:Session、HttpHeaders、ParameterMap * @param session * @param headers * @param parameterMap * @throws IOException */ @OnOpen public void onOpen(Session session, HttpHeaders headers, ParameterMap parameterMap) throws IOException { String paramValue = parameterMap.getParameter("paramKey"); //在线数加1 addOnlineCount(); //心跳连接 timer.schedule(new TimerTask() { @Override public void run() { try { if (isPong) { //服务没有断开 String message = "{\"type\":1,\"data\":\"yes\"}"; session.sendText(message); isPong = false; } else { onClose(session); this.cancel(); } } catch (IOException e) { e.printStackTrace(); this.cancel(); } } }, 0, 10 * 1000); logger.info("有新连接加入!当前在线人数为" + getOnlineCount()); } /** * 当有WebSocket连接关闭时,对该方法进行回调 注入参数的类型:Session * @param session * @throws IOException */ @OnClose public void onClose(Session session) throws IOException { System.out.println("one connection closed"); //在线数减1 if(onlineCount>0) { subOnlineCount(); } session.close(); logger.info("有一连接关闭!当前在线人数为" + getOnlineCount()); } /** * 当有WebSocket抛出异常时,对该方法进行回调 注入参数的类型:Session、Throwable * @param session * @param throwable */ @OnError public void onError(Session session, Throwable throwable) { throwable.printStackTrace(); session.close(); } /** * 当接收到字符串消息时,对该方法进行回调 注入参数的类型:Session、String * @param session * @param message {"type":13, "parameter":{"appId":3}} */ @OnMessage public void onMessage(Session session, String message) { logger.info("来自客户端" + session.channel().id() + "的消息:" + message +" 时间:"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); //接收消息 JSONObject jsonObject = JSON.parseObject(message); int type = jsonObject.getInteger("type"); //服务端主动向服务端推送信息 if(getOnlineCount()==0) { return; } //类型为1 设为心跳检测 if(type == 1) { this.isPong=true; }else { if(session.isOpen()) { executor.submit(new Runnable() { @Override public void run() { JSONObject parameter=jsonObject.getJSONObject("parameter"); session.sendText("Hello Netty!" +parameter.getString("appId")); } }); logger.info("---》websocket发送消息成功!!!ID--》" + session.channel().id()); } } } /** * 当接收到二进制消息时,对该方法进行回调 注入参数的类型:Session、byte[] * @param session * @param bytes */ @OnBinary public void onBinary(Session session, byte[] bytes) { for (byte b : bytes) { System.out.println(b); } session.sendBinary(bytes); } /** * 当接收到Netty的事件时,对该方法进行回调 注入参数的类型:Session、Object * @param session * @param evt */ @OnEvent public void onEvent(Session session, Object evt) { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; switch (idleStateEvent.state()) { case READER_IDLE: System.out.println("read idle"); break; case WRITER_IDLE: System.out.println("write idle"); break; case ALL_IDLE: System.out.println("all idle"); break; default: break; } } } private static synchronized int getOnlineCount() { return onlineCount; } private static synchronized void addOnlineCount() { MyWebSocket.onlineCount++; } private static synchronized void subOnlineCount() { MyWebSocket.onlineCount--; } }
打开WebSocket客户端,连接到ws://127.0.0.1:80
此处IP地址为默认的,在实际的开发业务中需要改变,所以可以根据官方文档给的实例进行配置
@ServerEndpoint(prefix = "netty-websocket")
@Component
public class MyWebSocket {
---------
}
配置文件配置application.properties/application.yml
netty-websocket.host=0.0.0.0
netty-websocket.path=/
netty-websocket.port=80
其他的还有很多配置项 ,可以按照实际业务需求自己加上就行
效果如图
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。