赞
踩
1.搭建一个Springboot项目
1.项目结构目录
2.导入jar包
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>5.0.0.Alpha2</version>
- </dependency>
3.yml 配置
- tcp:
- port: 8555
- boss:
- thread:
- count: 2
- worker:
- thread:
- count: 2
- so:
- keepalive: true
- backlog: 100
-
- server:
- port: 8888
4.创建TCP服务
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
- import java.net.InetSocketAddress;
-
- /**
- * @Author:hmz
- * @date:2019/07/10 15:34
- * @Explanation:
- */
- @Component
- public class TCPServer {
- @Autowired
- @Qualifier("serverBootstrap")
- private ServerBootstrap b;
-
- @Autowired
- @Qualifier("tcpSocketAddress")
- private InetSocketAddress tcpPort;
-
- private ChannelFuture serverChannelFuture;
-
- @PostConstruct
- public void start() throws Exception {
- System.out.println("Starting server at " + tcpPort);
- serverChannelFuture = b.bind(tcpPort).sync();
- }
-
- @PreDestroy
- public void stop() throws Exception {
- serverChannelFuture.channel().closeFuture().sync();
- }
-
- public ServerBootstrap getB() {
- return b;
- }
-
- public void setB(ServerBootstrap b) {
- this.b = b;
- }
-
- public InetSocketAddress getTcpPort() {
- return tcpPort;
- }
-
- public void setTcpPort(InetSocketAddress tcpPort) {
- this.tcpPort = tcpPort;
- }
- }

5.初始化通道
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.stereotype.Component;
- import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
- /**
- * @Author:hmz
- * @date:2019/07/10 15:30
- * @Explanation:
- */
- @Component
- @Qualifier("springProtocolInitializer")
- public class StringProtocolInitalizer extends ChannelInitializer<SocketChannel> {
-
- @Autowired
- StringDecoder stringDecoder;
-
- @Autowired
- StringEncoder stringEncoder;
-
- @Autowired
- NettyHandle nettyHandle;
-
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast("decoder", stringDecoder); // 解码器
- pipeline.addLast("handler", nettyHandle); // 处理器
- pipeline.addLast("encoder", stringEncoder); // 编码器
- // 如果后期处理拆包粘包可以在这里处理
- /** LineBasedFrameDecoder:以行为单位进行数据包的解码,使用换行符\n或者\r\n作为依据遇
- 到\n或者\r\n都认为是一条完整的消息。
- DelimiterBasedFrameDecoder:以特殊的符号作为分隔来进行数据包的解码。
- FixedLengthFrameDecoder:以固定长度进行数据包的解码。
- LenghtFieldBasedFrameDecode:适用于消息头包含消息长度的协议(最常用)。
- */
- pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) ;
- }
-
- public StringDecoder getStringDecoder() {
- return stringDecoder;
- }
-
- public void setStringDecoder(StringDecoder stringDecoder) {
- this.stringDecoder = stringDecoder;
- }
-
- public StringEncoder getStringEncoder() {
- return stringEncoder;
- }
-
- public void setStringEncoder(StringEncoder stringEncoder) {
- this.stringEncoder = stringEncoder;
- }
-
- public NettyHandle getNettyHandle() {
- return nettyHandle;
- }
-
- public void setNettyHandle(NettyHandle nettyHandle) {
- this.nettyHandle = nettyHandle;
- }
- }

6.Netty配置
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
-
- import java.net.InetSocketAddress;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Set;
-
- /**
- * @Author:hmz
- * @date:2019/07/10 15:26
- * @Explanation:
- */
- @Configuration
- public class NettyConfigTest {
-
- //链接线程数
- // 一般设置为1,因为它只负责接收客户端的连接,不需要太多的线程来处理;
- @Value("${boss.thread.count}")
- private int bossCount;
-
- //工作线程数
- // worker线程数:根据服务器的性能来设置,一般设置为CPU核数的2-4倍;
- @Value("${worker.thread.count}")
- private int workerCount;
-
- @Value("${tcp.port}")
- private int tcpPort;
-
- @Value("${so.keepalive}")
- private boolean keepAlive;
-
- @Value("${so.backlog}")
- private int backlog;
-
- @Autowired
- private NettyHandle nettyHandle;
-
- //bootstrap配置
- @SuppressWarnings("unchecked")
- @Bean(name = "serverBootstrap")
- public ServerBootstrap bootstrap() {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup(), workerGroup())
- .channel(NioServerSocketChannel.class)
- .childHandler(nettyHandle);
- Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
- Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
- for (@SuppressWarnings("rawtypes")
- ChannelOption option : keySet) {
- b.option(option, tcpChannelOptions.get(option));
- }
- return b;
- }
-
- @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
- public NioEventLoopGroup bossGroup() {
- return new NioEventLoopGroup(bossCount);
- }
-
- @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
- public NioEventLoopGroup workerGroup() {
- return new NioEventLoopGroup(workerCount);
- }
-
- @Bean(name = "tcpSocketAddress")
- public InetSocketAddress tcpPort() {
- return new InetSocketAddress(tcpPort);
- }
-
- @Bean(name = "tcpChannelOptions")
- public Map<ChannelOption<?>, Object> tcpChannelOptions() {
- Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
- options.put(ChannelOption.SO_KEEPALIVE, keepAlive); // 是否使用TCP的心跳机制
- // 1. 服务器TCP内核 内维护了两个队列,称为A(未连接队列)和B(已连接队列)
- // 2. 如果A+B的长度大于Backlog时,新的连接就会被TCP内核拒绝掉。
- // 3. 默认是50
- options.put(ChannelOption.SO_BACKLOG, backlog);
- // 还有其他的配置
- // SO_SNDBUF 、SO_RCVBUF 、SO_BROADCAST 、 SO_REUSEADDR、SO_LINGER 等
- // 见后面
- return options;
- }
-
- @Bean(name = "stringEncoder")
- public StringEncoder stringEncoder() {
- return new StringEncoder();
- }
-
- @Bean(name = "stringDecoder")
- public StringDecoder stringDecoder() {
- return new StringDecoder();
- }
-
- /**
- * Necessary to make the Value annotations work.
- *
- * @return
- */
- @Bean
- public static PropertySourcesPlaceholderConfigurer propertyPlaceholderConfigurer() {
- return new PropertySourcesPlaceholderConfigurer();
- }
- }

7.NettyHandle事件处理
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.stereotype.Component;
-
- import java.io.UnsupportedEncodingException;
-
- /**
- * @Author:hmz
- * @date:2019/07/10 15:26
- * @Explanation:
- */
- @Component
- @Qualifier("serverHandler")
- @ChannelHandler.Sharable
- public class NettyHandle extends SimpleChannelInboundHandler<String> {
- private static final Logger log = LoggerFactory.getLogger(NettyHandle.class);
-
- @Override
- protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
- log.info("client msg:"+msg);
- String clientIdToLong= ctx.channel().id().asLongText();
- log.info("client long id:"+clientIdToLong);
- String clientIdToShort= ctx.channel().id().asShortText();
- log.info("client short id:"+clientIdToShort);
- if(msg.indexOf("bye")!=-1){
- //close
- ctx.channel().close();
- }else{
- //send to client
- ctx.channel().writeAndFlush("Yoru msg is:"+msg);
-
- }
-
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
- log.info("server 读取数据……");
- //读取数据
-
- byte[] req = readClientData((ByteBuf) msg);
- String body = new String(req, "GBK"); //获取到的值
- log.info("客户端的数据------>"+body);
-
- sendInfo(ctx , "收到");
-
-
- }
-
- private void sendInfo(ChannelHandlerContext ctx , String info) {
- ByteBuf bufff = Unpooled.buffer();
- bufff.writeBytes(info.getBytes());
- ctx.writeAndFlush(bufff);
- ctx.flush();
- }
-
- private byte[] readClientData(ByteBuf msg) {
- // logger.info("读客户端的数据.");
- ByteBuf buf = msg;
- byte[] req = new byte[buf.readableBytes()];
- buf.readBytes(req);
- buf.release();
- return req;
- }
-
-
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- log.info("开始连接");
- sendInfo(ctx , "连接成功");
-
- super.channelActive(ctx);
- }
-
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- log.error("异常关闭");
- sendInfo(ctx , "异常");
- ctx.close();
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- log.info("离线");
- sendInfo(ctx , "离线");
- super.channelInactive(ctx);
- }
-
-
-
-
-
- }

1.NClient
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
-
- import java.net.InetSocketAddress;
-
- /**
- * @Author:hmz
- * @date:2019/07/08 14:01
- * @Explanation:
- */
- public class NClient {
-
- private String host;
- private int port;
-
- public NClient(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
- public void start() throws Exception {
- EventLoopGroup nioEventLoopGroup = null;
- try {
- //创建Bootstrap对象用来引导启动客户端
- Bootstrap bootstrap = new Bootstrap();
- //创建EventLoopGroup对象并设置到Bootstrap中,EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据
- nioEventLoopGroup = new NioEventLoopGroup();
- //创建InetSocketAddress并设置到Bootstrap中,InetSocketAddress是指定连接的服务器地址
- bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port))
- .handler(new ChannelInitializer<SocketChannel>() {
- //添加一个ChannelHandler,客户端成功连接服务器后就会被执行
- @Override
- protected void initChannel(SocketChannel ch)
- throws Exception {
- ch.pipeline().addLast(new NClientHandler());
- }
- });
- // • 调用Bootstrap.connect()来连接服务器
- ChannelFuture f = bootstrap.connect().sync();
- // • 最后关闭EventLoopGroup来释放资源
- f.channel().closeFuture().sync();
- } finally {
- nioEventLoopGroup.shutdownGracefully().sync();
- }
- }
-
-
- }

2.NClientHandler
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.util.CharsetUtil;
- import java.util.ArrayList;
- import java.util.List;
-
- /**
- * @Author:hmz
- * @date:2019/07/08 14:07
- * @Explanation:
- */
- public class NClientHandler extends ChannelInboundHandlerAdapter {
-
- public static List<ChannelHandlerContext> cts = new ArrayList<ChannelHandlerContext>();
-
-
- /**
- * 向服务端发送数据
- */
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
-
- System.out.println("客户端与服务端通道-开启:" + ctx.channel().localAddress() + "channelActive");
- cts.add(ctx);
- String sendInfo = "你好服务端";
- System.out.println("客户端准备发送的数据包:" + sendInfo);
- write(ctx,sendInfo);
- }
-
- public void write(ChannelHandlerContext ctx , String mess) throws Exception {
- String sendInfo = mess;
- ctx.writeAndFlush(Unpooled.copiedBuffer(sendInfo, CharsetUtil.UTF_8)); // 必须有flush
- ctx.flush();
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-
- //读取数据
-
- //读取数据
- ByteBuf buf1 = (ByteBuf) msg;
- byte[] req = readClientData((ByteBuf) msg);
- String body = new String(req, "UTF-8"); //获取到的值
- System.out.println("客户端的数据------>"+body);
- //写数据
- write(ctx,"wits写的数据");
-
- }
-
- //将netty的数据装换为字节数组
- private byte[] readClientData(ByteBuf msg) {
- ByteBuf buf = msg;
- byte[] req = new byte[buf.readableBytes()];
- buf.readBytes(req);
- buf.release();
- return req;
- }
-
- /**
- * channelInactive
- *
- * channel 通道 Inactive 不活跃的
- *
- * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
- *
- */
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("客户端与服务端通道-关闭:" + ctx.channel().localAddress() + "channelInactive");
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cts.remove(ctx);
- ctx.close();
- System.out.println("异常退出:" + cause.getMessage());
- }
-
- }

3.mainTest
- import com.herbert.client.NClient;
- import com.herbert.finalPool.ConstantPool;
-
- /**
- * @Author:hmz
- * @date:2019/07/08 14:01
- * @Explanation:
- */
- public class TestMain {
-
- public static void main(String[] args) throws Exception {
- new NClient(ConstantPool.HOST, ConstantPool.PORT).start(); // 连接127.0.0.1/65535,并启动
-
-
- }
-
- }

参数 | 解释 |
---|---|
SO_BROADCAST | 对应套接字层的套接字:SO_BROADCAST,将消息发送到广播地址。 如果目标中指定的接口支持广播数据包,则启用此选项可让应用程序发送广播消息。 |
SO_KEEPALIVE | 对应套接字层的套接字:SO_KEEPALIVE,保持连接。 在空闲套接字上发送探测,以验证套接字是否仍处于活动状态。 |
SO_SNDBUF | 对应套接字层的套接字:SO_SNDBUF,设置发送缓冲区的大小。 |
SO_RCVBUF | 对应套接字层的套接字:SO_RCVBUF,获取接收缓冲区的大小。 |
SO_REUSEADDR | 对应套接字层的套接字:SO_REUSEADDR,本地地址复用。 启用此选项允许绑定已使用的本地地址。 |
SO_LINGER | 对应套接字层的套接字:SO_LINGER,延迟关闭连接。 启用此选项,在调用close时如果存在未发送的数据时,在close期间将阻止调用应用程序,直到数据被传输或连接超时。 |
SO_BACKLOG | 对应TCP/IP协议中<font color=red>backlog</font>参数,<font color=red>backlog</font>即连接队列,设置TCP中的连接队列大小。如果队列满了,会发送一个ECONNREFUSED错误信息给C端,即“ Connection refused”。 |
SO_TIMEOUT | 等待客户连接的超时时间。 |
IP_TOS | 对应套接字层的套接字:IP_TOS,在IP标头中设置服务类型(TOS)和优先级。 |
IP_MULTICAST_ADDR | 对应IP层的套接字选项:IP_MULTICAST_IF,设置应发送多播数据报的传出接口。 |
IP_MULTICAST_IF | 对应IP层的套接字选项:IP_MULTICAST_IF2,设置应发送多播数据报的IPV6传出接口。 |
IP_MULTICAST_TTL | 对应IP层的套接字选项:IP_MULTICAST_TTL,在传出的 多播数据报的IP头中设置生存时间(TTL)。 |
IP_MULTICAST_LOOP_DISABLED | 取消 指定应将 传出的多播数据报的副本 回传到发送主机,只要它是多播组的成员即可。 |
TCP_NODELAY | 对应TCP层的套接字选项:TCP_NODELAY,指定TCP是否遵循<font color=#35b998>Nagle算法</font> 决定何时发送数据。Nagle算法代表通过减少必须发送包的个数来增加网络软件系统的效率。即尽可能发送大块数据避免网络中充斥着大量的小数据块。如果要追求高实时性,需要设置关闭Nagle算法;如果需要追求减少网络交互次数,则设置开启Nagle算法。 |
参数 | 解释 |
---|---|
ALLOCATOR | ByteBuf的分配器,默认值为ByteBufAllocator.DEFAULT。 |
RCVBUF_ALLOCATOR | 用于Channel分配接受Buffer的分配器,默认值为AdaptiveRecvByteBufAllocator.DEFAULT,是一个自适应的接受缓冲区分配器,能根据接受到的数据自动调节大小。可选值为FixedRecvByteBufAllocator,固定大小的接受缓冲区分配器。 |
MESSAGE_SIZE_ESTIMATOR | 消息大小估算器,默认为DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,其中ByteBuf和ByteBufHolder为实际大小,FileRegion估算值为0。该值估算的字节数在计算水位时使用,FileRegion为0可知FileRegion不影响高低水位。 |
CONNECT_TIMEOUT_MILLIS | 连接超时毫秒数,默认值30000毫秒即30秒。 |
WRITE_SPIN_COUNT | 一个Loop写操作执行的最大次数,默认值为16。也就是说,对于大数据量的写操作至多进行16次,如果16次仍没有全部写完数据,此时会提交一个新的写任务给EventLoop,任务将在下次调度继续执行。这样,其他的写请求才能被响应不会因为单个大数据量写请求而耽误。 |
WRITE_BUFFER_WATER_MARK | |
ALLOW_HALF_CLOSURE | 一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,连接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,事件为ChannelInputShutdownEvent。 |
AUTO_READ | 自动读取,默认值为True。Netty只在必要的时候才设置关心相应的I/O事件。对于读操作,需要调用channel.read()设置关心的I/O事件为OP_READ,这样若有数据到达才能读取以供用户处理。该值为True时,每次读操作完毕后会自动调用channel.read(),从而有数据到达便能读取;否则,需要用户手动调用channel.read()。需要注意的是:当调用config.setAutoRead(boolean)方法时,如果状态由false变为true,将会调用channel.read()方法读取数据;由true变为false,将调用config.autoReadCleared()方法终止数据读取。 |
AUTO_CLOSE |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。