当前位置:   article > 正文

Netty大战之Netty模型及核心组件_netty网络编程模型

netty网络编程模型

 工作原理示意图1-简单版

Netty 主要基于主从 Reactors 多线程模型(如图)做了一定的改进,其中主从 Reactor 多 线程模型有多个 Reactor

 说明

 工作原理示意图2-进阶版

1、Boos Groop和Worker Group内部都包含多个NioEventLoop(又叫循环事件)。

2、每个NIOEventLoop对应一个Selector,

工作原理示意图-详细版

 说明:

1、netty抽象出两个线程池BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写。

2、BossGroup和WorkerGroup类型都是NioEventLoopGroup(事件循环组)。

3、NioEventLoopGroup相当于一个事件循环组,这个组中有多个事件循环,每一事件循环是NioEventLoop.

4、NioEventLoop表示的是一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个Selector,用于监听绑定其上的socket的网络通讯。

5、NioEventLoopGroup可以有多个线程,并且可以设定个数,多少个线程对应的就是一个NioEventLoop

6、每个BossNioEventLoop循环的步骤有3步

1)轮训accept事件

2)处理accept事件,与client建立连接,生成SocketChannel在分装成NioSocketChannel,将其注册到某一个worker的NioEventLoop上的Selector上

3)处理任务队列的任务,及runAllTasks

7、每个worker NioEventLoop循环执行的步骤

1、轮训read,write事件

2、处理i/o事件,既read,write事件,在对应的NIoSockertChannel处理

3、处理任务队列的任务,既runAllTasks

8、每个worker NioEventLoop,处理业务时,会使用Pipeline(管道),pipeline中包含了Channel,即通过pipeline可以获取到对应的通道,管道中维护了很多的处理器。

友情提示:从源码中可以知道从Channel中可以获取Pipeline或者从Pipeline中获取到Channel,Channel主要作用是进行读写,Pipeline主要的作用是进行数据的业务处理,pipeline本质上是一个双向链表,包含一个出栈,入栈的问题

上下文对象包含

 channel

 pipeline

Netty 核心模块组

友情提示:感觉先介绍核心组件,在学习代码比较容易理解,Netty的代码都是这些组件结合实现代码的编写。

1、EventLoopGroup 和其实现类 NioEventLoopGroup

1.1、EventLoop

2、Bootstrap、ServerBootstrap

3、Channel

4、ChannelOption

5、Selector

6、ChannelHandler 及其实现类

7、ChannelHandlerContext

8、Pipeline 和 ChannelPipeline

9、Future、ChannelFuture、Promise

10、Unpooled

一开始需要树立正确的观念

  • 把 channel 理解为数据的通道,可以想象成走水路呢,空陆呢,还是路陆呢
  • 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,最后输出又变成 ByteBuf,可以想象成交通工具,飞机,轮船,火车
  • 把 handler 理解为数据的处理工序,可以想象成,到达目的的每一小步,到每一个站点使用什么方式去一个站点,比如我想去北京,我做火车到南京,到了南京做轮船,去上海,到了上海我做飞机去北京。这每一个站点就是一个Handler.
    • 工序有多道,合在一起就是 pipeline,可以想象成,这完整的到达目的的线程流程就是Pipeline。pipeline 负责发布事件(读、读取完成...)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
    • handler 分 Inbound 和 Outbound 两类,可以想象成去北京,然后从北京回来。
  • 把 eventLoop 理解为处理数据的工人(真正干活的,可以想象成,就是自己
    • 工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
    • 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
    • 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人

EventLoopGroup 和其实现类 NioEventLoopGroup

友情提示:

NioEventLoopGroup可以执行 io事件,普通任务,定时任务

DefaultEventLoopGroup只能执行普通任务,定时任务

友情提示:

    EventLoopGroup 内部包含多个EventLoop,每个EventLoop关联一个Selector,多个Channel注册到Selector,BoosGroup的Selector主要监听Channel的连接,WorkerGroup的Selector主要监听Channel的读写操作。当BoosGroup的Selector监听到Channel的连接后,就会将其SockertChannel分装成功NioSocketChannel,WorkerGroup会调用next接口,调用其中的一个EventLoop将其注册到该EventLoop的Selector进行监听。

事件循环组

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

 常用方法

 代码:

  1. package cn.itcast.netty.c3;
  2. import io.netty.channel.DefaultEventLoopGroup;
  3. import io.netty.channel.EventLoopGroup;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.util.NettyRuntime;
  6. import lombok.extern.slf4j.Slf4j;
  7. import java.util.concurrent.TimeUnit;
  8. @Slf4j
  9. public class TestEventLoop {
  10. public static void main(String[] args) {
  11. // 1. 创建事件循环组
  12. EventLoopGroup group = new NioEventLoopGroup(2); // io事件,普通任务,定时任务
  13. // EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务,定时任务
  14. // 2. 获取下一个事件循环对象
  15. System.out.println(group.next());
  16. System.out.println(group.next());
  17. System.out.println(group.next());
  18. System.out.println(group.next());
  19. // 3. 执行普通任务
  20. // execute也可以换成submit,group.next().submit
  21. /*group.next().execute(() -> {
  22. try {
  23. Thread.sleep(1000);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. log.debug("ok");
  28. });*/
  29. // 4. 执行定时任务,以一定的频率执行
  30. group.next().scheduleAtFixedRate(() -> {
  31. log.debug("ok");
  32. // 第一个参数初始的延迟时间,如果为0就是立刻执行
  33. // 第二个参数,就是每隔多长时间执行。
  34. }, 0, 1, TimeUnit.SECONDS);
  35. log.debug("main");
  36. }
  37. }

演示 NioEventLoop 处理 io 事件

服务器端两个 nio worker 工人

  1. package cn.itcast.netty.c3;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.channel.*;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import io.netty.channel.socket.nio.NioSocketChannel;
  8. import lombok.extern.slf4j.Slf4j;
  9. import java.nio.charset.Charset;
  10. @Slf4j
  11. public class EventLoopServer {
  12. public static void main(String[] args) {
  13. // 细分2:创建一个独立的 EventLoopGroup
  14. EventLoopGroup group = new DefaultEventLoopGroup();
  15. new ServerBootstrap()
  16. // boss 和 worker
  17. // 细分1:boss 只负责 ServerSocketChannel 上 accept 事件 worker 只负责 socketChannel 上的读写
  18. .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
  19. .channel(NioServerSocketChannel.class)
  20. .childHandler(new ChannelInitializer<NioSocketChannel>() {
  21. @Override
  22. protected void initChannel(NioSocketChannel ch) throws Exception {
  23. ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
  24. @Override // ByteBuf
  25. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  26. ByteBuf buf = (ByteBuf) msg;
  27. log.debug(buf.toString(Charset.defaultCharset()));
  28. ctx.fireChannelRead(msg); // 让消息传递给下一个handler
  29. }
  30. });
  31. /*.addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
  32. @Override // ByteBuf
  33. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  34. ByteBuf buf = (ByteBuf) msg;
  35. log.debug(buf.toString(Charset.defaultCharset()));
  36. }
  37. });*/
  38. }
  39. })
  40. .bind(8080);
  41. }
  42. }

结论:

可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定

再增加两个非 nio 工人

  1. package cn.itcast.netty.c3;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.channel.*;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import io.netty.channel.socket.nio.NioSocketChannel;
  8. import lombok.extern.slf4j.Slf4j;
  9. import java.nio.charset.Charset;
  10. @Slf4j
  11. public class EventLoopServer {
  12. public static void main(String[] args) {
  13. // 细分2:创建一个独立的 EventLoopGroup
  14. EventLoopGroup group = new DefaultEventLoopGroup(2);
  15. new ServerBootstrap()
  16. // boss 和 worker
  17. // 细分1:boss 只负责 ServerSocketChannel 上 accept 事件 worker 只负责 socketChannel 上的读写
  18. .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
  19. .channel(NioServerSocketChannel.class)
  20. .childHandler(new ChannelInitializer<NioSocketChannel>() {
  21. @Override
  22. protected void initChannel(NioSocketChannel ch) throws Exception {
  23. ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
  24. @Override // ByteBuf
  25. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  26. ByteBuf buf = (ByteBuf) msg;
  27. log.debug(buf.toString(Charset.defaultCharset()));
  28. ctx.fireChannelRead(msg); // 让消息传递给下一个handler
  29. }
  30. })
  31. .addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
  32. @Override // ByteBuf
  33. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  34. ByteBuf buf = (ByteBuf) msg;
  35. log.debug(buf.toString(Charset.defaultCharset()));
  36. }
  37. });
  38. }
  39. })
  40. .bind(8080);
  41. }
  42. }

 结论:可以看到,nio 工人和 非 nio 工人也分别绑定了 channel(LoggingHandler 由 nio 工人执行,而我们自己的 handler 由非 nio 工人执行)

 handler 执行中如何换人?

友情提示;针对上面的添加的非Nio工人,是怎么实现handler换人的。

从源码: io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

源码:

  1. static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
  2. final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
  3. EventExecutor executor = next.executor();
  4. if (executor.inEventLoop()) {
  5. next.invokeChannelRead(m);
  6. } else {
  7. executor.execute(new Runnable() {
  8. public void run() {
  9. next.invokeChannelRead(m);
  10. }
  11. });
  12. }
  13. }

 说明:

  • 如果两个 handler 绑定的是同一个线程,那么就直接调用
  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用

EventLoop

可以理解为线程池+Selector,可以关联多个Channel.

事件循环对象

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

演示 NioEventLoop 处理普通任务和处理定时任务

友情提示:NioEventLoop 除了可以处理 io 事件,同样可以向它提交普通任务

  1. package cn.itcast.netty.c3;
  2. import io.netty.channel.DefaultEventLoopGroup;
  3. import io.netty.channel.EventLoopGroup;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.util.NettyRuntime;
  6. import lombok.extern.slf4j.Slf4j;
  7. import java.util.concurrent.TimeUnit;
  8. @Slf4j
  9. public class TestEventLoop {
  10. public static void main(String[] args) {
  11. // 1. 创建事件循环组
  12. EventLoopGroup group = new NioEventLoopGroup(2); // io事件,普通任务,定时任务
  13. // EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务,定时任务
  14. // 2. 获取下一个事件循环对象
  15. System.out.println(group.next());
  16. System.out.println(group.next());
  17. System.out.println(group.next());
  18. System.out.println(group.next());
  19. // 3. 执行普通任务
  20. // execute也可以换成submit,group.next().submit
  21. /*group.next().execute(() -> {
  22. try {
  23. Thread.sleep(1000);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. log.debug("ok");
  28. });*/
  29. // 4. 执行定时任务,以一定的频率执行
  30. group.next().scheduleAtFixedRate(() -> {
  31. log.debug("ok");
  32. // 第一个参数初始的延迟时间,如果为0就是立刻执行
  33. // 第二个参数,就是每隔多长时间执行。
  34. }, 0, 1, TimeUnit.SECONDS);
  35. log.debug("main");
  36. }
  37. }

Bootstrap、ServerBootstrap

友情提示:ServerBootstrap用于服务端,Bootstrap用于客户端。这两个主要用来配置Netty的衔接各个组件。也叫启动引导类。

常用方法:group,channel,option,childOption,childHandler,bind,connect,这里有客户端的方法,也有服务端的方法

 Channel

友情提示:

1、通过Channel,选择不同的Channel可以选择不同的协议

 NioSocketChannel,异步的客户端 TCP Socket 连接。

• NioServerSocketChannel,异步的服务器端 TCP Socket 连接。

• NioDatagramChannel,异步的 UDP 连接。

• NioSctpChannel,异步的客户端 Sctp 连接。

• NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO

2、通过Channel可以获取网络连接的配置参数,可以获取连接的通道状态,可以获取pipeline,可以获取RomteAddress,和LocalAddress。

3、Channel提供网络IO操作(事件操作),但是不能保证结果的是否成功。通过ChannelFuture实例,通过监听器,来监测事件操作是否成功

channel 的主要作用

  • close() 可以用来关闭 channel
  • closeFuture() 用来处理 channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入
  • writeAndFlush() 方法将数据写入并刷出

 ChannelOption

友情提示:记住

1、ChannelOption.SO_BACKLOG:主要用于设置连接队列的大小的,如果处理不了,就可以将其存入到队列中,可以通过他决定队列的大小。

2、ChannelOption.SO_KEEPALIVE:设置保持连接获得状态

 Selector

这个就不介绍了,前面的NIO已经很详细了

  ChannelHandlerContext

友情提示:又名叫上下文对象,可以获取到Channel和Pipeline,同时Channel和Pipeline之间是相互获取,debug中看到他们之间是不断的套娃。

1、从下面图可以看出,ChannelHandlerContext内部对应关联着一个ChannelHandler,实际干事的是ChannelHandlerContext,Pipeline是一个双向链表,每一个节点就是ChannelHandlerContext,包括头和尾。

 ChannelHandler 及其实现类

1、ChannelHandler是一个接口,有Netty自己实现的Handler,主要的还是程序员自己编写的Handler,当Handler处理完操作后,就会将其转发到Pipeline中的下一个Handler进行处理,有点像SpringSecurity的filterChain一样。

2、一般继承SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter,下面的ChannelInboundHandlerAdapter,表示的是入栈,ChannelOutboundHandlerAdapter表示的是出栈

3、继承Handler之后,就要实现对应的内部的方法,主要有连接操作,读取操作,异常操作等

ChannelPipeline

友情提示:主要保存ChannelHandler的结合,他是一个双向链表。

 常用方法

Future、ChannelFuture、Promise

友情提示:Channel在事件操作的时候,不能很好的获取事件的操作的成功与否,可以通过ChannelFuture,利用Future和监听器,实现事件操作的成功与否的反馈。不然不知道有没有成功。

友情提示:connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象,这也就导致服务端接收不到数据。解决办法使用同步的方式或者使用回调函数。

  Future & Promise

在异步处理时,经常用到这两个接口

首先要说明 netty 中的 Future 与 jdk 中的 Future 同名(netty的Future继承jdk中的Future),但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果(异步体现在回调函数channelFuture.addListener),但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器

 jdkFuture代码:

  1. package cn.itcast.netty.c3;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.concurrent.*;
  4. @Slf4j
  5. public class TestJdkFuture {
  6. public static void main(String[] args) throws ExecutionException, InterruptedException {
  7. // 1. 线程池
  8. ExecutorService service = Executors.newFixedThreadPool(2);
  9. // 2. 提交任务
  10. Future<Integer> future = service.submit(new Callable<Integer>() {
  11. @Override
  12. public Integer call() throws Exception {
  13. log.debug("执行计算");
  14. Thread.sleep(1000);
  15. return 50;
  16. }
  17. });
  18. // 3. 主线程通过 future 来获取结果
  19. log.debug("等待结果");
  20. log.debug("结果是 {}", future.get());
  21. }
  22. }

 NettyFuture代码:

  1. package cn.itcast.netty.c3;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.ChannelFutureListener;
  4. import io.netty.channel.EventLoop;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.util.concurrent.Future;
  7. import io.netty.util.concurrent.GenericFutureListener;
  8. import lombok.extern.slf4j.Slf4j;
  9. import java.util.concurrent.Callable;
  10. import java.util.concurrent.ExecutionException;
  11. @Slf4j
  12. public class TestNettyFuture {
  13. public static void main(String[] args) throws ExecutionException, InterruptedException {
  14. NioEventLoopGroup group = new NioEventLoopGroup();
  15. EventLoop eventLoop = group.next();
  16. Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
  17. @Override
  18. public Integer call() throws Exception {
  19. log.debug("执行计算");
  20. Thread.sleep(1000);
  21. return 70;
  22. }
  23. });
  24. // log.debug("等待结果");
  25. // log.debug("结果是 {}", future.get());
  26. future.addListener(new GenericFutureListener<Future<? super Integer>>(){
  27. @Override
  28. public void operationComplete(Future<? super Integer> future) throws Exception {
  29. log.debug("接收结果:{}", future.getNow());
  30. }
  31. });
  32. }
  33. }

 Promise代码:

  1. package cn.itcast.netty.c3;
  2. import io.netty.channel.EventLoop;
  3. import io.netty.channel.EventLoopGroup;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.util.concurrent.DefaultPromise;
  6. import lombok.extern.slf4j.Slf4j;
  7. import java.util.concurrent.ExecutionException;
  8. @Slf4j
  9. public class TestNettyPromise {
  10. public static void main(String[] args) throws ExecutionException, InterruptedException {
  11. // 1. 准备 EventLoop 对象
  12. EventLoop eventLoop = new NioEventLoopGroup().next();
  13. // 2. 可以主动创建 promise, 结果容器
  14. DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
  15. new Thread(() -> {
  16. // 3. 任意一个线程执行计算,计算完毕后向 promise 填充结果
  17. log.debug("开始计算...");
  18. try {
  19. int i = 1 / 0;
  20. Thread.sleep(1000);
  21. promise.setSuccess(80);
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. promise.setFailure(e);
  25. }
  26. }).start();
  27. // 4. 接收结果的线程
  28. log.debug("等待结果...");
  29. log.debug("结果是: {}", promise.get());
  30. }
  31. }

 Unpooled

友情提示:Netty的数据容器,用于数据读取和写入。

ByteBuf buffer = Unpooled.buffer(10);

常用方法:

1、writeByte():写入数据

2、capacity():获取定义的总长度

3、readByte():读取字节,这个可以让其readerindex属性值变化。

4、getByte(0),获取指定位置的值,这个不能让其readerindex属性值变化。

5、hasArray():判断是否还有元素

6、array():变成字符数组

7、arrayOffset():获取偏移量

8、readerIndex():获取读取的位置

9、writerIndex():获取写的位置

10、getCharSequence(0, 4, Charset.forName("utf-8")):截取指定的长度,有点类似String的截取字符串一样.

  1. //2. 在netty 的buffer中,不需要使用flip 进行反转
  2. // 底层维护了 readerindex 和 writerIndex
  3. //3. 通过 readerindex 和 writerIndex 和 capacity, 将buffer分成三个区域
  4. // 0---readerindex 已经读取的区域
  5. // readerindex---writerIndex , 可读的区域
  6. // writerIndex -- capacity, 可写的区域

 代码1:

  1. package com.atguigu.netty.buf;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. public class NettyByteBuf01 {
  5. public static void main(String[] args) {
  6. //创建一个ByteBuf
  7. //说明
  8. //1. 创建 对象,该对象包含一个数组arr , 是一个byte[10]
  9. //2. 在netty 的buffer中,不需要使用flip 进行反转
  10. // 底层维护了 readerindex 和 writerIndex
  11. //3. 通过 readerindex 和 writerIndex 和 capacity, 将buffer分成三个区域
  12. // 0---readerindex 已经读取的区域
  13. // readerindex---writerIndex , 可读的区域
  14. // writerIndex -- capacity, 可写的区域
  15. ByteBuf buffer = Unpooled.buffer(10);
  16. for(int i = 0; i < 10; i++) {
  17. buffer.writeByte(i);
  18. }
  19. System.out.println("capacity=" + buffer.capacity());//10
  20. //输出
  21. // for(int i = 0; i<buffer.capacity(); i++) {
  22. // System.out.println(buffer.getByte(i));
  23. // }
  24. for(int i = 0; i < buffer.capacity(); i++) {
  25. System.out.println(buffer.readByte());
  26. }
  27. System.out.println("执行完毕");
  28. }
  29. }

代码2:

  1. package com.atguigu.netty.buf;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import java.nio.charset.Charset;
  5. public class NettyByteBuf02 {
  6. public static void main(String[] args) {
  7. //创建ByteBuf
  8. ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));
  9. //使用相关的方法
  10. if(byteBuf.hasArray()) { // true
  11. byte[] content = byteBuf.array();
  12. //将 content 转成字符串
  13. System.out.println(new String(content, Charset.forName("utf-8")));
  14. System.out.println("byteBuf=" + byteBuf);
  15. System.out.println(byteBuf.arrayOffset()); // 0 数组的偏移量
  16. System.out.println(byteBuf.readerIndex()); // 0
  17. System.out.println(byteBuf.writerIndex()); // 12
  18. System.out.println(byteBuf.capacity()); // 36
  19. //System.out.println(byteBuf.readByte()); //读取一个元素,readerIndex移动一个位置,导致,byteBuf.readableBytes()长度减少一个
  20. System.out.println(byteBuf.getByte(0)); // 104
  21. int len = byteBuf.readableBytes(); //可读的字节数 12
  22. System.out.println("len=" + len);
  23. //使用for取出各个字节
  24. for(int i = 0; i < len; i++) {
  25. System.out.println((char) byteBuf.getByte(i));
  26. }
  27. //按照某个范围读取
  28. System.out.println(byteBuf.getCharSequence(0, 4, Charset.forName("utf-8")));
  29. System.out.println(byteBuf.getCharSequence(4, 6, Charset.forName("utf-8")));
  30. }
  31. }
  32. }

Hello World

客户端:

  1. package cn.itcast.netty.c2;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.nio.NioSocketChannel;
  6. import io.netty.handler.codec.string.StringEncoder;
  7. import java.net.InetSocketAddress;
  8. public class HelloClient {
  9. public static void main(String[] args) throws InterruptedException {
  10. // 1. 启动类
  11. new Bootstrap()
  12. // 2. 添加 EventLoop
  13. .group(new NioEventLoopGroup())
  14. // 3. 选择客户端 channel 实现
  15. .channel(NioSocketChannel.class)
  16. // 4. 添加处理器
  17. .handler(new ChannelInitializer<NioSocketChannel>() {
  18. @Override // 在连接建立后被调用
  19. protected void initChannel(NioSocketChannel ch) throws Exception {
  20. ch.pipeline().addLast(new StringEncoder());
  21. }
  22. })
  23. // 5. 连接到服务器
  24. .connect(new InetSocketAddress("localhost", 8080))
  25. .sync()
  26. .channel()
  27. // 6. 向服务器发送数据
  28. .writeAndFlush("hello, world");
  29. }
  30. }

服务端:

  1. package cn.itcast.netty.c2;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.channel.ChannelInitializer;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.handler.codec.string.StringDecoder;
  10. import io.netty.handler.logging.LoggingHandler;
  11. public class HelloServer {
  12. public static void main(String[] args) {
  13. // 1. 启动器,负责组装 netty 组件,启动服务器
  14. new ServerBootstrap()
  15. // 2. BossEventLoop, WorkerEventLoop(selector,thread), group 组
  16. .group(new NioEventLoopGroup())
  17. // 3. 选择 服务器的 ServerSocketChannel 实现
  18. .channel(NioServerSocketChannel.class) // OIO BIO
  19. // 4. boss 负责处理连接 worker(child) 负责处理读写,决定了 worker(child) 能执行哪些操作(handler)
  20. .childHandler(
  21. // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
  22. new ChannelInitializer<NioSocketChannel>() {
  23. @Override
  24. protected void initChannel(NioSocketChannel ch) throws Exception {
  25. // 6. 添加具体 handler
  26. ch.pipeline().addLast(new LoggingHandler());
  27. ch.pipeline().addLast(new StringDecoder()); // 将 ByteBuf 转换为字符串
  28. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义 handler
  29. @Override // 读事件
  30. public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
  31. System.out.println(msg); // 打印上一步转换好的字符串
  32. }
  33. });
  34. }
  35. })
  36. // 7. 绑定监听端口
  37. .bind(8080);
  38. }
  39. }

 代码解读

服务器端:

  • 1 处,创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector 后面会详细展开

  • 2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现还有

  • 3 处,为啥方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次)它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器

  • 4 处,ServerSocketChannel 绑定的监听端口

  • 5 处,SocketChannel 的处理器,解码 ByteBuf => String

  • 6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果

客户端

  • 1 处,创建 NioEventLoopGroup,同 Server

  • 2 处,选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现,其它实现还有

  • 3 处,添加 SocketChannel 的处理器,ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
  • 4 处,指定要连接的服务器和端口
  • 5 处,Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 方法等待 connect 建立连接完毕,sync 方法的作用就是让其变成同步的。
  • 6 处,获取 channel 对象,它即为通道抽象,可以进行数据读写操作
  • 7 处,写入消息并清空缓冲区
  • 8 处,消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出
  • 数据经过网络传输,到达服务器端,服务器端 5 和 6 处的 handler 先后被触发,走完一个流程

 代码执行流程

Netty快速入门实例-TCP服务

实例要求:

1) Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 "hello, 服务器~"

2) 服务器可以回复消息给客户端 "hello, 客户端~"

3) 目的:对Netty 线程模型 有一个初步认识, 便于理解Netty 模型理

服务端:

  1. package yu.learn.simpleNetty;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelOption;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. public class ServerNetty {
  11. public static void main(String[] args) {
  12. //
  13. EventLoopGroup BossGroup=new NioEventLoopGroup();
  14. EventLoopGroup WorkerGroup=new NioEventLoopGroup();
  15. ServerBootstrap serverBootstrap=new ServerBootstrap();
  16. try{
  17. serverBootstrap.group(BossGroup,WorkerGroup)
  18. .channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
  19. .option(ChannelOption.SO_BACKLOG,124)// 设置线程队列得到连接个数
  20. .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
  21. // 该 handler对应 bossGroup , childHandler 对应 workerGroup
  22. .childHandler(new ChannelInitializer<SocketChannel>() {
  23. @Override
  24. protected void initChannel(SocketChannel ch) throws Exception {
  25. ch.pipeline().addLast(new ServerHandler());
  26. }
  27. });// 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
  28. System.out.println("服务端连接成功");
  29. //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
  30. //启动服务器(并绑定端口)
  31. ChannelFuture future= serverBootstrap.bind(6666).sync();
  32. //对关闭通道进行监听,这个并不是马上关闭
  33. future.channel().closeFuture().sync();
  34. }catch (Exception e){
  35. e.printStackTrace();
  36. }finally{
  37. BossGroup.shutdownGracefully();
  38. WorkerGroup.shutdownGracefully();
  39. }
  40. }
  41. }

服务端自定义处理器:

  1. package yu.learn.simpleNetty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.util.CharsetUtil;
  7. /*
  8. 说明
  9. 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
  10. 2. 这时我们自定义一个Handler , 才能称为一个handler
  11. 这是一个出栈
  12. */
  13. public class ServerHandler extends ChannelInboundHandlerAdapter {
  14. //读取数据实际(这里我们可以读取客户端发送的消息)
  15. /*
  16. 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
  17. 2. Object msg: 就是客户端发送的数据 默认Object
  18. */
  19. @Override
  20. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  21. System.out.println("从客户端接收来的消息");
  22. //将 msg 转成一个 ByteBuf
  23. //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
  24. ByteBuf byteBuf = (ByteBuf) msg;
  25. System.out.println("接收的消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
  26. ctx.channel()
  27. .eventLoop()
  28. .execute(
  29. new Runnable() {
  30. @Override
  31. public void run() {
  32. try{
  33. Thread.sleep(1000);
  34. }catch (Exception e){
  35. e.printStackTrace();
  36. }
  37. System.out.println("服务器任务队列");
  38. }
  39. });
  40. }
  41. //数据读取完毕
  42. @Override
  43. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  44. //writeAndFlush 是 write + flush
  45. //将数据写入到缓存,并刷新
  46. //一般讲,我们对这个发送的数据进行编码
  47. System.out.println("服务端发送的消息");
  48. ctx.writeAndFlush(Unpooled.copiedBuffer("你好客户端,我是服务端",CharsetUtil.UTF_8));
  49. }
  50. // 专门处理异常的
  51. @Override
  52. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  53. ctx.fireExceptionCaught(cause);
  54. }
  55. }

客户端:

  1. package yu.learn.simpleNetty;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. public class ClientNetty {
  10. public static void main(String[] args) {
  11. //
  12. EventLoopGroup group=new NioEventLoopGroup();
  13. try{
  14. Bootstrap bootstrap=new Bootstrap();
  15. bootstrap.group(group)
  16. .channel(NioSocketChannel.class)
  17. .handler(new ChannelInitializer<SocketChannel>() {
  18. @Override
  19. protected void initChannel(SocketChannel ch) throws Exception {
  20. ch.pipeline().addLast(new ClientHandle());
  21. }
  22. });
  23. System.out.println("客户端连接成功");
  24. ChannelFuture future= bootstrap.connect("127.0.0.1",6666).sync();
  25. future.channel().closeFuture().sync();
  26. }catch (Exception e){
  27. e.printStackTrace();
  28. }finally{
  29. group.shutdownGracefully();
  30. }
  31. }
  32. }

客户端自定义处理器:

  1. package yu.learn.simpleNetty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.util.CharsetUtil;
  7. public class ClientHandle extends ChannelInboundHandlerAdapter {
  8. @Override
  9. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  10. System.out.println("client " + ctx);
  11. ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
  12. }
  13. @Override
  14. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  15. System.out.println("从服务端获取的消息");
  16. ByteBuf byteBuf = (ByteBuf) msg;
  17. System.out.println("接收的消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
  18. }
  19. @Override
  20. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
  21. throws Exception {
  22. ctx.fireExceptionCaught(cause);
  23. }
  24. }

任务队列中的 Task的3种使用场景

友情提示:每个NioEventLoop里都有一个selector与TaskQueue,当我们在进行一些耗时的操作的时候,会产生阻塞,这时候我们就可以用到TaskQueue,这个是内部的操作,要注意和下面的异步模型区分开

1) 用户程序自定义的普通任务 [举例说明]

2) 用户自定义定时任务

3) 非当前 Reactor 线程调用 Channel 的各种方法 例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后 调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到 任务队列中后被异步消费

1、 用户程序自定义的普通任务

任务放在taskQueue队列中

主要修改了ServerHandler的代码

  1. package yu.learn.simpleNetty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.util.CharsetUtil;
  7. /*
  8. 说明
  9. 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
  10. 2. 这时我们自定义一个Handler , 才能称为一个handler
  11. 这是一个出栈
  12. */
  13. public class ServerHandler extends ChannelInboundHandlerAdapter {
  14. //读取数据实际(这里我们可以读取客户端发送的消息)
  15. /*
  16. 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
  17. 2. Object msg: 就是客户端发送的数据 默认Object
  18. */
  19. @Override
  20. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  21. System.out.println("从客户端接收来的消息");
  22. //将 msg 转成一个 ByteBuf
  23. //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
  24. ByteBuf byteBuf = (ByteBuf) msg;
  25. System.out.println("接收的消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
  26. ctx.channel()
  27. .eventLoop()
  28. .execute(
  29. new Runnable() {
  30. @Override
  31. public void run() {
  32. try{
  33. Thread.sleep(1000);
  34. }catch (Exception e){
  35. e.printStackTrace();
  36. }
  37. System.out.println("服务器任务队列1");
  38. }
  39. });
  40. ctx.channel()
  41. .eventLoop()
  42. .execute(
  43. new Runnable() {
  44. @Override
  45. public void run() {
  46. try{
  47. Thread.sleep(3000);
  48. }catch (Exception e){
  49. e.printStackTrace();
  50. }
  51. System.out.println("服务器任务队列2");
  52. }
  53. });
  54. }
  55. //数据读取完毕
  56. @Override
  57. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  58. //writeAndFlush 是 write + flush
  59. //将数据写入到缓存,并刷新
  60. //一般讲,我们对这个发送的数据进行编码
  61. System.out.println("服务端发送的消息");
  62. ctx.writeAndFlush(Unpooled.copiedBuffer("你好客户端,我是服务端",CharsetUtil.UTF_8));
  63. }
  64. // 专门处理异常的
  65. @Override
  66. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  67. ctx.fireExceptionCaught(cause);
  68. }
  69. }

 通过Channel对象获取到eventLoop循环事件,通过循环事件的执行器,去执行内部的线程。

需要注意,他们的执行顺序,如果当前执行出现延迟,那么会先执行其他的任务,然后执行该任务,需要注意,下面的执行时间,第二个任务队列先让第一个队列任务先执行等待10秒后在执行自己又等待20秒,队列原理先让一个队列元素先执行,任何在自己执行,自然就是30秒

 看下面调试,taskQueue有两个任务

2、 用户自定义定时任务

 该任务是提交到 scheduleTaskQueue中

主要修改了ServerHandler的代码

  1. package yu.learn.simpleNetty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.util.CharsetUtil;
  7. import java.util.concurrent.TimeUnit;
  8. /*
  9. 说明
  10. 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
  11. 2. 这时我们自定义一个Handler , 才能称为一个handler
  12. 这是一个出栈
  13. */
  14. public class ServerHandler extends ChannelInboundHandlerAdapter {
  15. //读取数据实际(这里我们可以读取客户端发送的消息)
  16. /*
  17. 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
  18. 2. Object msg: 就是客户端发送的数据 默认Object
  19. */
  20. @Override
  21. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  22. System.out.println("从客户端接收来的消息");
  23. //将 msg 转成一个 ByteBuf
  24. //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
  25. ByteBuf byteBuf = (ByteBuf) msg;
  26. System.out.println("接收的消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
  27. ctx.channel()
  28. .eventLoop()
  29. .execute(
  30. new Runnable() {
  31. @Override
  32. public void run() {
  33. try{
  34. Thread.sleep(1000);
  35. }catch (Exception e){
  36. e.printStackTrace();
  37. }
  38. System.out.println("服务器任务队列1");
  39. }
  40. });
  41. ctx.channel()
  42. .eventLoop()
  43. .execute(
  44. new Runnable() {
  45. @Override
  46. public void run() {
  47. try{
  48. Thread.sleep(3000);
  49. }catch (Exception e){
  50. e.printStackTrace();
  51. }
  52. System.out.println("服务器任务队列2");
  53. }
  54. });
  55. ctx.channel().eventLoop().schedule(new Runnable() {
  56. @Override
  57. public void run() {
  58. try{
  59. Thread.sleep(1000);
  60. }catch (Exception e){
  61. e.printStackTrace();
  62. }
  63. }
  64. },5, TimeUnit.SECONDS);
  65. System.out.println("==============");
  66. }
  67. //数据读取完毕
  68. @Override
  69. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  70. //writeAndFlush 是 write + flush
  71. //将数据写入到缓存,并刷新
  72. //一般讲,我们对这个发送的数据进行编码
  73. System.out.println("服务端发送的消息");
  74. ctx.writeAndFlush(Unpooled.copiedBuffer("你好客户端,我是服务端",CharsetUtil.UTF_8));
  75. }
  76. // 专门处理异常的
  77. @Override
  78. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  79. ctx.fireExceptionCaught(cause);
  80. }
  81. }

 

 3、非当前 Reactor 线程调用 Channel 的各种方法

友情提示:说的直白点,就是拿别人的Channel做事情(下面的案列就是拿客户端的channle做事情,用来给他们发送消息)。

例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后 调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到 任务队列中后被异步消费

实现思路:可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue

  1. package com.atguigu.netty.simple;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. public class NettyServer {
  8. public static void main(String[] args) throws Exception {
  9. //创建BossGroup 和 WorkerGroup
  10. //说明
  11. //1. 创建两个线程组 bossGroup 和 workerGroup
  12. //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
  13. //3. 两个都是无限循环
  14. //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
  15. // 默认实际 cpu核数 * 2
  16. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  17. EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
  18. try {
  19. //创建服务器端的启动对象,配置参数
  20. ServerBootstrap bootstrap = new ServerBootstrap();
  21. //使用链式编程来进行设置
  22. bootstrap.group(bossGroup, workerGroup) //设置两个线程组
  23. .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
  24. .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
  25. .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
  26. // .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
  27. .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
  28. //给pipeline 设置处理器
  29. @Override
  30. protected void initChannel(SocketChannel ch) throws Exception {
  31. //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
  32. System.out.println("客户socketchannel hashcode=" + ch.hashCode());
  33. ch.pipeline().addLast(new NettyServerHandler());
  34. }
  35. }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
  36. System.out.println(".....服务器 is ready...");
  37. //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
  38. //启动服务器(并绑定端口)
  39. ChannelFuture cf = bootstrap.bind(6668).sync();
  40. //给cf 注册监听器,监控我们关心的事件
  41. cf.addListener(new ChannelFutureListener() {
  42. @Override
  43. public void operationComplete(ChannelFuture future) throws Exception {
  44. if (cf.isSuccess()) {
  45. System.out.println("监听端口 6668 成功");
  46. } else {
  47. System.out.println("监听端口 6668 失败");
  48. }
  49. }
  50. });
  51. //对关闭通道进行监听,这个并不是马上关闭
  52. cf.channel().closeFuture().sync();
  53. }finally {
  54. bossGroup.shutdownGracefully();
  55. workerGroup.shutdownGracefully();
  56. }
  57. }
  58. }

说明

异步模型

Future 说明 

 

 说明:

1、在使用netty进行编程时,拦截操作和转换出入栈数据只需你提供Callback或者利用future即可,这使得链式操作简单,高效,并有利于编写可重用的、通用代码。

2、netty主要的目标就是让我们专心于做业务逻辑。

Future-Listener 机制

 举例说明

小结:

相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住, 直到操作完成;异步处理的好 处是不会造成线程阻塞,线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳 定和更高的吞吐量

快速入门实例-HTTP服务

友情提示:通过浏览器请求的时候,发现请求不了用postmain却可以,这是因为浏览器,认为有些端口是不安全。所以设置正常点的端口如8081,这样的端口,还有,可能存在返回给浏览器是乱问问题,修改编码格式UTF-16。

服务端代码:

  1. package yu.learn.http;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. public class NettyServer {
  9. public static void main(String[] args) {
  10. //
  11. EventLoopGroup BossGroup=new NioEventLoopGroup();
  12. EventLoopGroup WorkerGroup=new NioEventLoopGroup();
  13. ServerBootstrap serverBootstrap=new ServerBootstrap();
  14. try{
  15. serverBootstrap.group(BossGroup,WorkerGroup)
  16. .channel(NioServerSocketChannel.class)
  17. .option(ChannelOption.SO_BACKLOG,124)
  18. .childOption(ChannelOption.SO_KEEPALIVE,true)
  19. .childHandler(new ServerInit());
  20. System.out.println("服务端,正在准备中");
  21. ChannelFuture channelFuture= serverBootstrap.bind(6888).sync();
  22. channelFuture.addListener(
  23. future -> {
  24. if (future.isSuccess()) {
  25. System.out.println("连接成功");
  26. } else {
  27. System.out.println("连接失败");
  28. }
  29. });
  30. channelFuture.channel().closeFuture().sync();
  31. }catch (Exception e){
  32. e.printStackTrace();
  33. }finally{
  34. BossGroup.shutdownGracefully();
  35. WorkerGroup.shutdownGracefully();
  36. }
  37. }
  38. }

Pipeline添加事件代码:

  1. package yu.learn.http;
  2. import io.netty.channel.ChannelInitializer;
  3. import io.netty.channel.ChannelPipeline;
  4. import io.netty.channel.socket.SocketChannel;
  5. import io.netty.handler.codec.http.HttpServerCodec;
  6. public class ServerInit extends ChannelInitializer<SocketChannel> {
  7. @Override
  8. protected void initChannel(SocketChannel ch) throws Exception {
  9. ChannelPipeline pipeline= ch.pipeline();
  10. // 编写一个编码器
  11. pipeline.addLast("HttpServerCodec",new HttpServerCodec());
  12. pipeline .addLast("HttpServerHandler",new HttpServerHandler());
  13. }
  14. }

实现自定义Handler

  1. package yu.learn.http;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. import io.netty.handler.codec.http.*;
  7. import io.netty.util.CharsetUtil;
  8. public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
  9. @Override
  10. protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
  11. System.out.println("msg的类型是"+msg.getClass());
  12. System.out.println("获取客户端的地址"+ctx.channel().remoteAddress());
  13. if(msg instanceof HttpRequest){
  14. ByteBuf conent= Unpooled.copiedBuffer("你好我是服务器", CharsetUtil.UTF_8);
  15. FullHttpResponse fullHttpResponse=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,conent);
  16. fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
  17. fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,conent.readableBytes());
  18. //将构建好 response返回
  19. ctx.writeAndFlush(fullHttpResponse);
  20. }
  21. }
  22. }

存在问题:

运行发现有两次响应,这是因为浏览器的图标也会响应

 修改自定义处理handler代码

过滤掉请求地址是浏览器图标的。本人发现用火狐浏览器,存在一些问题,建议使用谷歌浏览器。

  1. package yu.learn.http;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. import io.netty.handler.codec.http.*;
  7. import io.netty.util.CharsetUtil;
  8. import java.net.URI;
  9. public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
  10. @Override
  11. protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
  12. System.out.println("msg的类型是"+msg.getClass());
  13. System.out.println("获取客户端的地址"+ctx.channel().remoteAddress());
  14. if(msg instanceof HttpRequest){
  15. HttpRequest request=(HttpRequest)msg;
  16. // String url= request.uri();
  17. URI uri = new URI(request.uri());
  18. // System.out.println("获取的uri"+url);
  19. System.out.println("获取的uri1"+uri.getPath());
  20. if("/favicon.ico".equals(uri.getPath())){
  21. System.out.println("过滤掉/favicon.ico");
  22. return;
  23. }
  24. ByteBuf conent= Unpooled.copiedBuffer("你好我是服务器", CharsetUtil.UTF_16);
  25. FullHttpResponse fullHttpResponse=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,conent);
  26. fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
  27. fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,conent.readableBytes());
  28. //将构建好 response返回
  29. ctx.writeAndFlush(fullHttpResponse);
  30. }
  31. }
  32. }

 看下面的结果,发现浏览器图片没有返回结果

 Netty 网络编程应用实例-群聊系统

 服务端代码:

  1. package com.atguigu.netty.groupchat;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import io.netty.handler.codec.string.StringDecoder;
  8. import io.netty.handler.codec.string.StringEncoder;
  9. public class GroupChatServer {
  10. private int port; //监听端口
  11. public GroupChatServer(int port) {
  12. this.port = port;
  13. }
  14. //编写run方法,处理客户端的请求
  15. public void run() throws Exception{
  16. //创建两个线程组
  17. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  18. EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
  19. try {
  20. ServerBootstrap b = new ServerBootstrap();
  21. b.group(bossGroup, workerGroup)
  22. .channel(NioServerSocketChannel.class)
  23. .option(ChannelOption.SO_BACKLOG, 128)
  24. .childOption(ChannelOption.SO_KEEPALIVE, true)
  25. .childHandler(new ChannelInitializer<SocketChannel>() {
  26. @Override
  27. protected void initChannel(SocketChannel ch) throws Exception {
  28. //获取到pipeline
  29. ChannelPipeline pipeline = ch.pipeline();
  30. // 下面的编解码器都是Netty自己自带的。
  31. //向pipeline加入解码器
  32. pipeline.addLast("decoder", new StringDecoder());
  33. //向pipeline加入编码器
  34. pipeline.addLast("encoder", new StringEncoder());
  35. //加入自己的业务处理handler
  36. pipeline.addLast(new GroupChatServerHandler());
  37. }
  38. });
  39. System.out.println("netty 服务器启动");
  40. ChannelFuture channelFuture = b.bind(port).sync();
  41. //监听关闭
  42. channelFuture.channel().closeFuture().sync();
  43. }finally {
  44. bossGroup.shutdownGracefully();
  45. workerGroup.shutdownGracefully();
  46. }
  47. }
  48. public static void main(String[] args) throws Exception {
  49. new GroupChatServer(7000).run();
  50. }
  51. }

服务端Handler:

  1. package com.atguigu.netty.groupchat;
  2. import io.netty.channel.Channel;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. import io.netty.channel.group.ChannelGroup;
  6. import io.netty.channel.group.DefaultChannelGroup;
  7. import io.netty.util.concurrent.GlobalEventExecutor;
  8. import java.text.SimpleDateFormat;
  9. import java.util.ArrayList;
  10. import java.util.HashMap;
  11. import java.util.List;
  12. import java.util.Map;
  13. public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
  14. //public static List<Channel> channels = new ArrayList<Channel>();
  15. //使用一个hashmap 管理
  16. //public static Map<String, Channel> channels = new HashMap<String,Channel>();
  17. //定义一个channle 组,管理所有的channel
  18. //GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例
  19. private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  20. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  21. //handlerAdded 表示连接建立,一旦连接,第一个被执行
  22. //将当前channel 加入到 channelGroup
  23. @Override
  24. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  25. Channel channel = ctx.channel();
  26. //将该客户加入聊天的信息推送给其它在线的客户端
  27. /*
  28. 这个是Netty自带的,writeAndFlush方法内部做了直接遍历给各个客户端发送消息
  29. 该方法会将 channelGroup 中所有的channel 遍历,并发送 消息,
  30. 我们不需要自己遍历
  31. */
  32. channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
  33. channelGroup.add(channel);
  34. }
  35. //断开连接, 将xx客户离开信息推送给当前在线的客户
  36. @Override
  37. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  38. Channel channel = ctx.channel();
  39. channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
  40. System.out.println("channelGroup size" + channelGroup.size());
  41. }
  42. //表示channel 处于活动状态, 提示 xx上线
  43. @Override
  44. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  45. System.out.println(ctx.channel().remoteAddress() + " 上线了~");
  46. }
  47. //表示channel 处于不活动状态, 提示 xx离线了
  48. @Override
  49. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  50. System.out.println(ctx.channel().remoteAddress() + " 离线了~");
  51. }
  52. //读取数据
  53. @Override
  54. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  55. //获取到当前channel
  56. Channel channel = ctx.channel();
  57. //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
  58. channelGroup.forEach(ch -> {
  59. if(channel != ch) { //不是当前的channel,转发消息
  60. // channel.remoteAddress()发送给ch
  61. ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");
  62. }else {//回显自己发送的消息给自己
  63. ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
  64. }
  65. });
  66. }
  67. @Override
  68. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  69. //关闭通道
  70. ctx.close();
  71. }
  72. }

客户端代码:

  1. package com.atguigu.netty.groupchat;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioSocketChannel;
  7. import io.netty.handler.codec.string.StringDecoder;
  8. import io.netty.handler.codec.string.StringEncoder;
  9. import java.util.Scanner;
  10. public class GroupChatClient {
  11. //属性
  12. private final String host;
  13. private final int port;
  14. public GroupChatClient(String host, int port) {
  15. this.host = host;
  16. this.port = port;
  17. }
  18. public void run() throws Exception{
  19. EventLoopGroup group = new NioEventLoopGroup();
  20. try {
  21. Bootstrap bootstrap = new Bootstrap()
  22. .group(group)
  23. .channel(NioSocketChannel.class)
  24. .handler(new ChannelInitializer<SocketChannel>() {
  25. @Override
  26. protected void initChannel(SocketChannel ch) throws Exception {
  27. //得到pipeline
  28. ChannelPipeline pipeline = ch.pipeline();
  29. //加入相关handler
  30. pipeline.addLast("decoder", new StringDecoder());
  31. pipeline.addLast("encoder", new StringEncoder());
  32. //加入自定义的handler
  33. pipeline.addLast(new GroupChatClientHandler());
  34. }
  35. });
  36. ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
  37. //得到channel
  38. Channel channel = channelFuture.channel();
  39. System.out.println("-------" + channel.localAddress()+ "--------");
  40. //客户端需要输入信息,创建一个扫描器
  41. Scanner scanner = new Scanner(System.in);
  42. while (scanner.hasNextLine()) {
  43. String msg = scanner.nextLine();
  44. //通过channel 发送到服务器端
  45. channel.writeAndFlush(msg + "\r\n");
  46. }
  47. }finally {
  48. group.shutdownGracefully();
  49. }
  50. }
  51. public static void main(String[] args) throws Exception {
  52. new GroupChatClient("127.0.0.1", 7000).run();
  53. }
  54. }

客户端Handler

  1. package com.atguigu.netty.groupchat;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
  5. @Override
  6. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  7. System.out.println(msg.trim());
  8. }
  9. }

私聊实现思路

友情提示:用户信息先注册到数据库,在用户进行私聊,服务端只需要获取对应用户的编号。判断是不是对应用户编号,如果是就进行转发。当然私聊存在维护好友列表,或者是群聊对应的好友列表。总之就是服务端获取到匹配的编号,通过服务端进行间接的转发。建立连接就将其对应的编号存储起来。为后面匹配用户编号发送消息做准备

 

实现方式一:

 

实现方式二:

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/408187
推荐阅读
相关标签
  

闽ICP备14008679号