赞
踩
Netty 主要基于主从 Reactors 多线程模型(如图)做了一定的改进,其中主从 Reactor 多 线程模型有多个 Reactor
说明
1、Boos Groop和Worker Group内部都包含多个NioEventLoop(又叫循环事件)。
2、每个NIOEventLoop对应一个Selector,
说明:
1、netty抽象出两个线程池BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写。
2、BossGroup和WorkerGroup类型都是NioEventLoopGroup(事件循环组)。
3、NioEventLoopGroup相当于一个事件循环组,这个组中有多个事件循环,每一事件循环是NioEventLoop.
4、NioEventLoop表示的是一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个Selector,用于监听绑定其上的socket的网络通讯。
5、NioEventLoopGroup可以有多个线程,并且可以设定个数,多少个线程对应的就是一个NioEventLoop
6、每个BossNioEventLoop循环的步骤有3步
1)轮训accept事件
2)处理accept事件,与client建立连接,生成SocketChannel在分装成NioSocketChannel,将其注册到某一个worker的NioEventLoop上的Selector上
3)处理任务队列的任务,及runAllTasks
7、每个worker NioEventLoop循环执行的步骤
1、轮训read,write事件
2、处理i/o事件,既read,write事件,在对应的NIoSockertChannel处理
3、处理任务队列的任务,既runAllTasks
8、每个worker NioEventLoop,处理业务时,会使用Pipeline(管道),pipeline中包含了Channel,即通过pipeline可以获取到对应的通道,管道中维护了很多的处理器。
友情提示:从源码中可以知道从Channel中可以获取Pipeline或者从Pipeline中获取到Channel,Channel主要作用是进行读写,Pipeline主要的作用是进行数据的业务处理,pipeline本质上是一个双向链表,包含一个出栈,入栈的问题
上下文对象包含
友情提示:感觉先介绍核心组件,在学习代码比较容易理解,Netty的代码都是这些组件结合实现代码的编写。
1、EventLoopGroup 和其实现类 NioEventLoopGroup
1.1、EventLoop
2、Bootstrap、ServerBootstrap
3、Channel
4、ChannelOption
5、Selector
6、ChannelHandler 及其实现类
7、ChannelHandlerContext
8、Pipeline 和 ChannelPipeline
9、Future、ChannelFuture、Promise
10、Unpooled
一开始需要树立正确的观念
友情提示:
NioEventLoopGroup可以执行 io事件,普通任务,定时任务
DefaultEventLoopGroup只能执行普通任务,定时任务
友情提示:
EventLoopGroup 内部包含多个EventLoop,每个EventLoop关联一个Selector,多个Channel注册到Selector,BoosGroup的Selector主要监听Channel的连接,WorkerGroup的Selector主要监听Channel的读写操作。当BoosGroup的Selector监听到Channel的连接后,就会将其SockertChannel分装成功NioSocketChannel,WorkerGroup会调用next接口,调用其中的一个EventLoop将其注册到该EventLoop的Selector进行监听。
事件循环组
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
常用方法
代码:
- package cn.itcast.netty.c3;
-
- import io.netty.channel.DefaultEventLoopGroup;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.util.NettyRuntime;
- import lombok.extern.slf4j.Slf4j;
-
- import java.util.concurrent.TimeUnit;
-
- @Slf4j
- public class TestEventLoop {
- public static void main(String[] args) {
- // 1. 创建事件循环组
- EventLoopGroup group = new NioEventLoopGroup(2); // io事件,普通任务,定时任务
- // EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务,定时任务
- // 2. 获取下一个事件循环对象
- System.out.println(group.next());
- System.out.println(group.next());
- System.out.println(group.next());
- System.out.println(group.next());
-
- // 3. 执行普通任务
- // execute也可以换成submit,group.next().submit
-
- /*group.next().execute(() -> {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- log.debug("ok");
- });*/
-
- // 4. 执行定时任务,以一定的频率执行
- group.next().scheduleAtFixedRate(() -> {
- log.debug("ok");
- // 第一个参数初始的延迟时间,如果为0就是立刻执行
- // 第二个参数,就是每隔多长时间执行。
- }, 0, 1, TimeUnit.SECONDS);
-
- log.debug("main");
- }
- }
服务器端两个 nio worker 工人
- package cn.itcast.netty.c3;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import lombok.extern.slf4j.Slf4j;
-
- import java.nio.charset.Charset;
-
- @Slf4j
- public class EventLoopServer {
- public static void main(String[] args) {
- // 细分2:创建一个独立的 EventLoopGroup
- EventLoopGroup group = new DefaultEventLoopGroup();
- new ServerBootstrap()
- // boss 和 worker
- // 细分1:boss 只负责 ServerSocketChannel 上 accept 事件 worker 只负责 socketChannel 上的读写
- .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<NioSocketChannel>() {
- @Override
- protected void initChannel(NioSocketChannel ch) throws Exception {
- ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
- @Override // ByteBuf
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf buf = (ByteBuf) msg;
- log.debug(buf.toString(Charset.defaultCharset()));
- ctx.fireChannelRead(msg); // 让消息传递给下一个handler
- }
- });
- /*.addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
- @Override // ByteBuf
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf buf = (ByteBuf) msg;
- log.debug(buf.toString(Charset.defaultCharset()));
- }
- });*/
- }
- })
- .bind(8080);
- }
- }
结论:
可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定
再增加两个非 nio 工人
- package cn.itcast.netty.c3;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import lombok.extern.slf4j.Slf4j;
-
- import java.nio.charset.Charset;
-
- @Slf4j
- public class EventLoopServer {
- public static void main(String[] args) {
- // 细分2:创建一个独立的 EventLoopGroup
- EventLoopGroup group = new DefaultEventLoopGroup(2);
- new ServerBootstrap()
- // boss 和 worker
- // 细分1:boss 只负责 ServerSocketChannel 上 accept 事件 worker 只负责 socketChannel 上的读写
- .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<NioSocketChannel>() {
- @Override
- protected void initChannel(NioSocketChannel ch) throws Exception {
- ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
- @Override // ByteBuf
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf buf = (ByteBuf) msg;
- log.debug(buf.toString(Charset.defaultCharset()));
- ctx.fireChannelRead(msg); // 让消息传递给下一个handler
- }
- })
- .addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
- @Override // ByteBuf
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf buf = (ByteBuf) msg;
- log.debug(buf.toString(Charset.defaultCharset()));
- }
- });
- }
- })
- .bind(8080);
- }
- }
结论:可以看到,nio 工人和 非 nio 工人也分别绑定了 channel(LoggingHandler 由 nio 工人执行,而我们自己的 handler 由非 nio 工人执行)
友情提示;针对上面的添加的非Nio工人,是怎么实现handler换人的。
从源码: io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
源码:
- static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
- final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
- EventExecutor executor = next.executor();
- if (executor.inEventLoop()) {
- next.invokeChannelRead(m);
- } else {
- executor.execute(new Runnable() {
- public void run() {
- next.invokeChannelRead(m);
- }
- });
- }
-
- }
可以理解为线程池+Selector,可以关联多个Channel.
事件循环对象
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
友情提示:NioEventLoop 除了可以处理 io 事件,同样可以向它提交普通任务
- package cn.itcast.netty.c3;
-
- import io.netty.channel.DefaultEventLoopGroup;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.util.NettyRuntime;
- import lombok.extern.slf4j.Slf4j;
-
- import java.util.concurrent.TimeUnit;
-
- @Slf4j
- public class TestEventLoop {
- public static void main(String[] args) {
- // 1. 创建事件循环组
- EventLoopGroup group = new NioEventLoopGroup(2); // io事件,普通任务,定时任务
- // EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务,定时任务
- // 2. 获取下一个事件循环对象
- System.out.println(group.next());
- System.out.println(group.next());
- System.out.println(group.next());
- System.out.println(group.next());
-
- // 3. 执行普通任务
- // execute也可以换成submit,group.next().submit
-
- /*group.next().execute(() -> {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- log.debug("ok");
- });*/
-
- // 4. 执行定时任务,以一定的频率执行
- group.next().scheduleAtFixedRate(() -> {
- log.debug("ok");
- // 第一个参数初始的延迟时间,如果为0就是立刻执行
- // 第二个参数,就是每隔多长时间执行。
- }, 0, 1, TimeUnit.SECONDS);
-
- log.debug("main");
- }
- }
友情提示:ServerBootstrap用于服务端,Bootstrap用于客户端。这两个主要用来配置Netty的衔接各个组件。也叫启动引导类。
常用方法:group,channel,option,childOption,childHandler,bind,connect,这里有客户端的方法,也有服务端的方法
友情提示:
1、通过Channel,选择不同的Channel可以选择不同的协议
NioSocketChannel,异步的客户端 TCP Socket 连接。
• NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
• NioDatagramChannel,异步的 UDP 连接。
• NioSctpChannel,异步的客户端 Sctp 连接。
• NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO
2、通过Channel可以获取网络连接的配置参数,可以获取连接的通道状态,可以获取pipeline,可以获取RomteAddress,和LocalAddress。
3、Channel提供网络IO操作(事件操作),但是不能保证结果的是否成功。通过ChannelFuture实例,通过监听器,来监测事件操作是否成功
channel 的主要作用
友情提示:记住
1、ChannelOption.SO_BACKLOG:主要用于设置连接队列的大小的,如果处理不了,就可以将其存入到队列中,可以通过他决定队列的大小。
2、ChannelOption.SO_KEEPALIVE:设置保持连接获得状态
这个就不介绍了,前面的NIO已经很详细了
友情提示:又名叫上下文对象,可以获取到Channel和Pipeline,同时Channel和Pipeline之间是相互获取,debug中看到他们之间是不断的套娃。
1、从下面图可以看出,ChannelHandlerContext内部对应关联着一个ChannelHandler,实际干事的是ChannelHandlerContext,Pipeline是一个双向链表,每一个节点就是ChannelHandlerContext,包括头和尾。
1、ChannelHandler是一个接口,有Netty自己实现的Handler,主要的还是程序员自己编写的Handler,当Handler处理完操作后,就会将其转发到Pipeline中的下一个Handler进行处理,有点像SpringSecurity的filterChain一样。
2、一般继承SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter,下面的ChannelInboundHandlerAdapter,表示的是入栈,ChannelOutboundHandlerAdapter表示的是出栈
3、继承Handler之后,就要实现对应的内部的方法,主要有连接操作,读取操作,异常操作等
友情提示:主要保存ChannelHandler的结合,他是一个双向链表。
常用方法
友情提示:Channel在事件操作的时候,不能很好的获取事件的操作的成功与否,可以通过ChannelFuture,利用Future和监听器,实现事件操作的成功与否的反馈。不然不知道有没有成功。
友情提示:connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象,这也就导致服务端接收不到数据。解决办法使用同步的方式或者使用回调函数。
Future & Promise
在异步处理时,经常用到这两个接口
首先要说明 netty 中的 Future 与 jdk 中的 Future 同名(netty的Future继承jdk中的Future),但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
jdkFuture代码:
- package cn.itcast.netty.c3;
-
- import lombok.extern.slf4j.Slf4j;
-
- import java.util.concurrent.*;
-
- @Slf4j
- public class TestJdkFuture {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- // 1. 线程池
- ExecutorService service = Executors.newFixedThreadPool(2);
- // 2. 提交任务
- Future<Integer> future = service.submit(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- log.debug("执行计算");
- Thread.sleep(1000);
- return 50;
- }
- });
- // 3. 主线程通过 future 来获取结果
- log.debug("等待结果");
- log.debug("结果是 {}", future.get());
- }
- }
NettyFuture代码:
- package cn.itcast.netty.c3;
-
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelFutureListener;
- import io.netty.channel.EventLoop;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.util.concurrent.Future;
- import io.netty.util.concurrent.GenericFutureListener;
- import lombok.extern.slf4j.Slf4j;
-
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
-
- @Slf4j
- public class TestNettyFuture {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- NioEventLoopGroup group = new NioEventLoopGroup();
- EventLoop eventLoop = group.next();
- Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- log.debug("执行计算");
- Thread.sleep(1000);
- return 70;
- }
- });
- // log.debug("等待结果");
- // log.debug("结果是 {}", future.get());
- future.addListener(new GenericFutureListener<Future<? super Integer>>(){
- @Override
- public void operationComplete(Future<? super Integer> future) throws Exception {
- log.debug("接收结果:{}", future.getNow());
- }
- });
- }
- }
Promise代码:
- package cn.itcast.netty.c3;
-
- import io.netty.channel.EventLoop;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.util.concurrent.DefaultPromise;
- import lombok.extern.slf4j.Slf4j;
-
- import java.util.concurrent.ExecutionException;
-
- @Slf4j
- public class TestNettyPromise {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- // 1. 准备 EventLoop 对象
- EventLoop eventLoop = new NioEventLoopGroup().next();
- // 2. 可以主动创建 promise, 结果容器
- DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
- new Thread(() -> {
- // 3. 任意一个线程执行计算,计算完毕后向 promise 填充结果
- log.debug("开始计算...");
- try {
- int i = 1 / 0;
- Thread.sleep(1000);
- promise.setSuccess(80);
- } catch (Exception e) {
- e.printStackTrace();
- promise.setFailure(e);
- }
-
- }).start();
- // 4. 接收结果的线程
- log.debug("等待结果...");
- log.debug("结果是: {}", promise.get());
- }
-
- }
友情提示:Netty的数据容器,用于数据读取和写入。
ByteBuf buffer = Unpooled.buffer(10);
常用方法:
1、writeByte():写入数据
2、capacity():获取定义的总长度
3、readByte():读取字节,这个可以让其readerindex属性值变化。
4、getByte(0),获取指定位置的值,这个不能让其readerindex属性值变化。
5、hasArray():判断是否还有元素
6、array():变成字符数组
7、arrayOffset():获取偏移量
8、readerIndex():获取读取的位置
9、writerIndex():获取写的位置
10、getCharSequence(0, 4, Charset.forName("utf-8")):截取指定的长度,有点类似String的截取字符串一样.
-
- //2. 在netty 的buffer中,不需要使用flip 进行反转
- // 底层维护了 readerindex 和 writerIndex
- //3. 通过 readerindex 和 writerIndex 和 capacity, 将buffer分成三个区域
- // 0---readerindex 已经读取的区域
- // readerindex---writerIndex , 可读的区域
- // writerIndex -- capacity, 可写的区域
代码1:
- package com.atguigu.netty.buf;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
-
- public class NettyByteBuf01 {
- public static void main(String[] args) {
-
-
- //创建一个ByteBuf
- //说明
- //1. 创建 对象,该对象包含一个数组arr , 是一个byte[10]
- //2. 在netty 的buffer中,不需要使用flip 进行反转
- // 底层维护了 readerindex 和 writerIndex
- //3. 通过 readerindex 和 writerIndex 和 capacity, 将buffer分成三个区域
- // 0---readerindex 已经读取的区域
- // readerindex---writerIndex , 可读的区域
- // writerIndex -- capacity, 可写的区域
- ByteBuf buffer = Unpooled.buffer(10);
-
- for(int i = 0; i < 10; i++) {
- buffer.writeByte(i);
- }
-
- System.out.println("capacity=" + buffer.capacity());//10
- //输出
- // for(int i = 0; i<buffer.capacity(); i++) {
- // System.out.println(buffer.getByte(i));
- // }
- for(int i = 0; i < buffer.capacity(); i++) {
- System.out.println(buffer.readByte());
- }
- System.out.println("执行完毕");
- }
- }
代码2:
- package com.atguigu.netty.buf;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
-
- import java.nio.charset.Charset;
-
- public class NettyByteBuf02 {
- public static void main(String[] args) {
-
- //创建ByteBuf
- ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));
-
- //使用相关的方法
- if(byteBuf.hasArray()) { // true
-
- byte[] content = byteBuf.array();
-
- //将 content 转成字符串
- System.out.println(new String(content, Charset.forName("utf-8")));
-
- System.out.println("byteBuf=" + byteBuf);
-
- System.out.println(byteBuf.arrayOffset()); // 0 数组的偏移量
- System.out.println(byteBuf.readerIndex()); // 0
- System.out.println(byteBuf.writerIndex()); // 12
- System.out.println(byteBuf.capacity()); // 36
-
- //System.out.println(byteBuf.readByte()); //读取一个元素,readerIndex移动一个位置,导致,byteBuf.readableBytes()长度减少一个
- System.out.println(byteBuf.getByte(0)); // 104
-
- int len = byteBuf.readableBytes(); //可读的字节数 12
- System.out.println("len=" + len);
-
- //使用for取出各个字节
- for(int i = 0; i < len; i++) {
- System.out.println((char) byteBuf.getByte(i));
- }
-
- //按照某个范围读取
- System.out.println(byteBuf.getCharSequence(0, 4, Charset.forName("utf-8")));
- System.out.println(byteBuf.getCharSequence(4, 6, Charset.forName("utf-8")));
-
-
- }
-
-
- }
- }
客户端:
- package cn.itcast.netty.c2;
-
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.string.StringEncoder;
-
- import java.net.InetSocketAddress;
-
- public class HelloClient {
- public static void main(String[] args) throws InterruptedException {
- // 1. 启动类
- new Bootstrap()
- // 2. 添加 EventLoop
- .group(new NioEventLoopGroup())
- // 3. 选择客户端 channel 实现
- .channel(NioSocketChannel.class)
- // 4. 添加处理器
- .handler(new ChannelInitializer<NioSocketChannel>() {
- @Override // 在连接建立后被调用
- protected void initChannel(NioSocketChannel ch) throws Exception {
- ch.pipeline().addLast(new StringEncoder());
- }
- })
- // 5. 连接到服务器
- .connect(new InetSocketAddress("localhost", 8080))
- .sync()
- .channel()
- // 6. 向服务器发送数据
- .writeAndFlush("hello, world");
- }
- }
服务端:
- package cn.itcast.netty.c2;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.logging.LoggingHandler;
-
- public class HelloServer {
- public static void main(String[] args) {
- // 1. 启动器,负责组装 netty 组件,启动服务器
- new ServerBootstrap()
- // 2. BossEventLoop, WorkerEventLoop(selector,thread), group 组
- .group(new NioEventLoopGroup())
- // 3. 选择 服务器的 ServerSocketChannel 实现
- .channel(NioServerSocketChannel.class) // OIO BIO
- // 4. boss 负责处理连接 worker(child) 负责处理读写,决定了 worker(child) 能执行哪些操作(handler)
- .childHandler(
- // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
- new ChannelInitializer<NioSocketChannel>() {
- @Override
- protected void initChannel(NioSocketChannel ch) throws Exception {
- // 6. 添加具体 handler
- ch.pipeline().addLast(new LoggingHandler());
- ch.pipeline().addLast(new StringDecoder()); // 将 ByteBuf 转换为字符串
- ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义 handler
- @Override // 读事件
- public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
- System.out.println(msg); // 打印上一步转换好的字符串
- }
- });
- }
- })
- // 7. 绑定监听端口
- .bind(8080);
- }
- }
服务器端:
1 处,创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector
后面会详细展开
2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现还有
3 处,为啥方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
4 处,ServerSocketChannel 绑定的监听端口
5 处,SocketChannel 的处理器,解码 ByteBuf => String
6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果
客户端
1 处,创建 NioEventLoopGroup,同 Server
2 处,选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现,其它实现还有
代码执行流程
实例要求:
1) Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 "hello, 服务器~"
2) 服务器可以回复消息给客户端 "hello, 客户端~"
3) 目的:对Netty 线程模型 有一个初步认识, 便于理解Netty 模型理
服务端:
- package yu.learn.simpleNetty;
-
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
-
- public class ServerNetty {
- public static void main(String[] args) {
- //
- EventLoopGroup BossGroup=new NioEventLoopGroup();
- EventLoopGroup WorkerGroup=new NioEventLoopGroup();
- ServerBootstrap serverBootstrap=new ServerBootstrap();
- try{
-
- serverBootstrap.group(BossGroup,WorkerGroup)
- .channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
- .option(ChannelOption.SO_BACKLOG,124)// 设置线程队列得到连接个数
- .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
- // 该 handler对应 bossGroup , childHandler 对应 workerGroup
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new ServerHandler());
- }
- });// 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
- System.out.println("服务端连接成功");
- //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
- //启动服务器(并绑定端口)
- ChannelFuture future= serverBootstrap.bind(6666).sync();
- //对关闭通道进行监听,这个并不是马上关闭
- future.channel().closeFuture().sync();
- }catch (Exception e){
-
- e.printStackTrace();
- }finally{
- BossGroup.shutdownGracefully();
- WorkerGroup.shutdownGracefully();
- }
-
-
-
-
- }
- }
服务端自定义处理器:
- package yu.learn.simpleNetty;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.util.CharsetUtil;
-
-
- /*
- 说明
- 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
- 2. 这时我们自定义一个Handler , 才能称为一个handler
- 这是一个出栈
- */
- public class ServerHandler extends ChannelInboundHandlerAdapter {
-
- //读取数据实际(这里我们可以读取客户端发送的消息)
- /*
- 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
- 2. Object msg: 就是客户端发送的数据 默认Object
- */
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- System.out.println("从客户端接收来的消息");
- //将 msg 转成一个 ByteBuf
- //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
- ByteBuf byteBuf = (ByteBuf) msg;
- System.out.println("接收的消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
-
- ctx.channel()
- .eventLoop()
- .execute(
- new Runnable() {
- @Override
- public void run() {
- try{
- Thread.sleep(1000);
- }catch (Exception e){
- e.printStackTrace();
- }
-
- System.out.println("服务器任务队列");
- }
- });
- }
-
- //数据读取完毕
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- //writeAndFlush 是 write + flush
- //将数据写入到缓存,并刷新
- //一般讲,我们对这个发送的数据进行编码
- System.out.println("服务端发送的消息");
- ctx.writeAndFlush(Unpooled.copiedBuffer("你好客户端,我是服务端",CharsetUtil.UTF_8));
-
- }
-
- // 专门处理异常的
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- ctx.fireExceptionCaught(cause);
- }
- }
客户端:
- package yu.learn.simpleNetty;
-
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
-
- public class ClientNetty {
-
- public static void main(String[] args) {
- //
- EventLoopGroup group=new NioEventLoopGroup();
- try{
- Bootstrap bootstrap=new Bootstrap();
- bootstrap.group(group)
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new ClientHandle());
- }
- });
- System.out.println("客户端连接成功");
- ChannelFuture future= bootstrap.connect("127.0.0.1",6666).sync();
- future.channel().closeFuture().sync();
- }catch (Exception e){
- e.printStackTrace();
- }finally{
- group.shutdownGracefully();
- }
-
-
-
-
- }
- }
客户端自定义处理器:
- package yu.learn.simpleNetty;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.util.CharsetUtil;
-
- public class ClientHandle extends ChannelInboundHandlerAdapter {
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("client " + ctx);
- ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-
- System.out.println("从服务端获取的消息");
- ByteBuf byteBuf = (ByteBuf) msg;
- System.out.println("接收的消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
- }
-
-
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- ctx.fireExceptionCaught(cause);
- }
- }
友情提示:每个NioEventLoop里都有一个selector与TaskQueue,当我们在进行一些耗时的操作的时候,会产生阻塞,这时候我们就可以用到TaskQueue,这个是内部的操作,要注意和下面的异步模型区分开
1) 用户程序自定义的普通任务 [举例说明]
2) 用户自定义定时任务
3) 非当前 Reactor 线程调用 Channel 的各种方法 例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后 调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到 任务队列中后被异步消费
任务放在taskQueue队列中
主要修改了ServerHandler的代码
- package yu.learn.simpleNetty;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.util.CharsetUtil;
-
-
- /*
- 说明
- 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
- 2. 这时我们自定义一个Handler , 才能称为一个handler
- 这是一个出栈
- */
- public class ServerHandler extends ChannelInboundHandlerAdapter {
-
- //读取数据实际(这里我们可以读取客户端发送的消息)
- /*
- 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
- 2. Object msg: 就是客户端发送的数据 默认Object
- */
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- System.out.println("从客户端接收来的消息");
- //将 msg 转成一个 ByteBuf
- //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
- ByteBuf byteBuf = (ByteBuf) msg;
- System.out.println("接收的消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
-
- ctx.channel()
- .eventLoop()
- .execute(
- new Runnable() {
- @Override
- public void run() {
- try{
- Thread.sleep(1000);
- }catch (Exception e){
- e.printStackTrace();
- }
-
- System.out.println("服务器任务队列1");
- }
- });
- ctx.channel()
- .eventLoop()
- .execute(
- new Runnable() {
- @Override
- public void run() {
- try{
- Thread.sleep(3000);
- }catch (Exception e){
- e.printStackTrace();
- }
-
- System.out.println("服务器任务队列2");
- }
- });
- }
-
- //数据读取完毕
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- //writeAndFlush 是 write + flush
- //将数据写入到缓存,并刷新
- //一般讲,我们对这个发送的数据进行编码
- System.out.println("服务端发送的消息");
- ctx.writeAndFlush(Unpooled.copiedBuffer("你好客户端,我是服务端",CharsetUtil.UTF_8));
-
- }
-
- // 专门处理异常的
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- ctx.fireExceptionCaught(cause);
- }
- }
通过Channel对象获取到eventLoop循环事件,通过循环事件的执行器,去执行内部的线程。
需要注意,他们的执行顺序,如果当前执行出现延迟,那么会先执行其他的任务,然后执行该任务,需要注意,下面的执行时间,第二个任务队列先让第一个队列任务先执行等待10秒后在执行自己又等待20秒,队列原理先让一个队列元素先执行,任何在自己执行,自然就是30秒
看下面调试,taskQueue有两个任务
该任务是提交到 scheduleTaskQueue中
主要修改了ServerHandler的代码
- package yu.learn.simpleNetty;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.util.CharsetUtil;
-
- import java.util.concurrent.TimeUnit;
-
-
- /*
- 说明
- 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
- 2. 这时我们自定义一个Handler , 才能称为一个handler
- 这是一个出栈
- */
- public class ServerHandler extends ChannelInboundHandlerAdapter {
-
- //读取数据实际(这里我们可以读取客户端发送的消息)
- /*
- 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
- 2. Object msg: 就是客户端发送的数据 默认Object
- */
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- System.out.println("从客户端接收来的消息");
- //将 msg 转成一个 ByteBuf
- //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
- ByteBuf byteBuf = (ByteBuf) msg;
- System.out.println("接收的消息是:"+byteBuf.toString(CharsetUtil.UTF_8));
-
- ctx.channel()
- .eventLoop()
- .execute(
- new Runnable() {
- @Override
- public void run() {
- try{
- Thread.sleep(1000);
- }catch (Exception e){
- e.printStackTrace();
- }
-
- System.out.println("服务器任务队列1");
- }
- });
- ctx.channel()
- .eventLoop()
- .execute(
- new Runnable() {
- @Override
- public void run() {
- try{
- Thread.sleep(3000);
- }catch (Exception e){
- e.printStackTrace();
- }
-
- System.out.println("服务器任务队列2");
- }
- });
-
-
- ctx.channel().eventLoop().schedule(new Runnable() {
- @Override
- public void run() {
- try{
- Thread.sleep(1000);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- },5, TimeUnit.SECONDS);
- System.out.println("==============");
- }
-
- //数据读取完毕
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- //writeAndFlush 是 write + flush
- //将数据写入到缓存,并刷新
- //一般讲,我们对这个发送的数据进行编码
- System.out.println("服务端发送的消息");
- ctx.writeAndFlush(Unpooled.copiedBuffer("你好客户端,我是服务端",CharsetUtil.UTF_8));
-
- }
-
- // 专门处理异常的
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- ctx.fireExceptionCaught(cause);
- }
- }
友情提示:说的直白点,就是拿别人的Channel做事情(下面的案列就是拿客户端的channle做事情,用来给他们发送消息)。
例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后 调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到 任务队列中后被异步消费
实现思路:可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
- package com.atguigu.netty.simple;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
-
- public class NettyServer {
- public static void main(String[] args) throws Exception {
-
-
- //创建BossGroup 和 WorkerGroup
- //说明
- //1. 创建两个线程组 bossGroup 和 workerGroup
- //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
- //3. 两个都是无限循环
- //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
- // 默认实际 cpu核数 * 2
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
-
-
-
- try {
- //创建服务器端的启动对象,配置参数
- ServerBootstrap bootstrap = new ServerBootstrap();
-
- //使用链式编程来进行设置
- bootstrap.group(bossGroup, workerGroup) //设置两个线程组
- .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
- .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
- .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
- // .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
- .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
- //给pipeline 设置处理器
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
-
- //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
- System.out.println("客户socketchannel hashcode=" + ch.hashCode());
- ch.pipeline().addLast(new NettyServerHandler());
- }
- }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
-
- System.out.println(".....服务器 is ready...");
-
- //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
- //启动服务器(并绑定端口)
- ChannelFuture cf = bootstrap.bind(6668).sync();
-
- //给cf 注册监听器,监控我们关心的事件
-
- cf.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (cf.isSuccess()) {
- System.out.println("监听端口 6668 成功");
- } else {
- System.out.println("监听端口 6668 失败");
- }
- }
- });
-
-
- //对关闭通道进行监听,这个并不是马上关闭
- cf.channel().closeFuture().sync();
- }finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
-
- }
-
- }
说明
说明:
1、在使用netty进行编程时,拦截操作和转换出入栈数据只需你提供Callback或者利用future即可,这使得链式操作简单,高效,并有利于编写可重用的、通用代码。
2、netty主要的目标就是让我们专心于做业务逻辑。
相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住, 直到操作完成;异步处理的好 处是不会造成线程阻塞,线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳 定和更高的吞吐量
友情提示:通过浏览器请求的时候,发现请求不了用postmain却可以,这是因为浏览器,认为有些端口是不安全。所以设置正常点的端口如8081,这样的端口,还有,可能存在返回给浏览器是乱问问题,修改编码格式UTF-16。
服务端代码:
- package yu.learn.http;
-
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
-
- public class NettyServer {
- public static void main(String[] args) {
- //
- EventLoopGroup BossGroup=new NioEventLoopGroup();
- EventLoopGroup WorkerGroup=new NioEventLoopGroup();
- ServerBootstrap serverBootstrap=new ServerBootstrap();
- try{
-
- serverBootstrap.group(BossGroup,WorkerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG,124)
- .childOption(ChannelOption.SO_KEEPALIVE,true)
- .childHandler(new ServerInit());
-
- System.out.println("服务端,正在准备中");
- ChannelFuture channelFuture= serverBootstrap.bind(6888).sync();
-
- channelFuture.addListener(
- future -> {
- if (future.isSuccess()) {
- System.out.println("连接成功");
- } else {
- System.out.println("连接失败");
- }
- });
-
- channelFuture.channel().closeFuture().sync();
-
- }catch (Exception e){
- e.printStackTrace();
- }finally{
- BossGroup.shutdownGracefully();
- WorkerGroup.shutdownGracefully();
- }
-
-
- }
- }
Pipeline添加事件代码:
- package yu.learn.http;
-
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.handler.codec.http.HttpServerCodec;
-
- public class ServerInit extends ChannelInitializer<SocketChannel> {
-
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline= ch.pipeline();
- // 编写一个编码器
- pipeline.addLast("HttpServerCodec",new HttpServerCodec());
- pipeline .addLast("HttpServerHandler",new HttpServerHandler());
- }
- }
实现自定义Handler
- package yu.learn.http;
-
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.codec.http.*;
- import io.netty.util.CharsetUtil;
-
- public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
-
- System.out.println("msg的类型是"+msg.getClass());
- System.out.println("获取客户端的地址"+ctx.channel().remoteAddress());
- if(msg instanceof HttpRequest){
-
- ByteBuf conent= Unpooled.copiedBuffer("你好我是服务器", CharsetUtil.UTF_8);
-
- FullHttpResponse fullHttpResponse=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,conent);
- fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
- fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,conent.readableBytes());
- //将构建好 response返回
- ctx.writeAndFlush(fullHttpResponse);
- }
-
-
- }
- }
运行发现有两次响应,这是因为浏览器的图标也会响应
修改自定义处理handler代码
过滤掉请求地址是浏览器图标的。本人发现用火狐浏览器,存在一些问题,建议使用谷歌浏览器。
- package yu.learn.http;
-
-
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.codec.http.*;
- import io.netty.util.CharsetUtil;
-
- import java.net.URI;
-
- public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
-
- System.out.println("msg的类型是"+msg.getClass());
- System.out.println("获取客户端的地址"+ctx.channel().remoteAddress());
- if(msg instanceof HttpRequest){
-
- HttpRequest request=(HttpRequest)msg;
- // String url= request.uri();
- URI uri = new URI(request.uri());
- // System.out.println("获取的uri"+url);
- System.out.println("获取的uri1"+uri.getPath());
- if("/favicon.ico".equals(uri.getPath())){
- System.out.println("过滤掉/favicon.ico");
- return;
- }
-
- ByteBuf conent= Unpooled.copiedBuffer("你好我是服务器", CharsetUtil.UTF_16);
-
- FullHttpResponse fullHttpResponse=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,conent);
- fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
- fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,conent.readableBytes());
- //将构建好 response返回
- ctx.writeAndFlush(fullHttpResponse);
- }
-
-
- }
- }
看下面的结果,发现浏览器图片没有返回结果
服务端代码:
- package com.atguigu.netty.groupchat;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
-
- public class GroupChatServer {
-
- private int port; //监听端口
-
-
- public GroupChatServer(int port) {
- this.port = port;
- }
-
- //编写run方法,处理客户端的请求
- public void run() throws Exception{
-
- //创建两个线程组
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
-
- try {
- ServerBootstrap b = new ServerBootstrap();
-
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 128)
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- .childHandler(new ChannelInitializer<SocketChannel>() {
-
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
-
- //获取到pipeline
- ChannelPipeline pipeline = ch.pipeline();
- // 下面的编解码器都是Netty自己自带的。
- //向pipeline加入解码器
- pipeline.addLast("decoder", new StringDecoder());
- //向pipeline加入编码器
- pipeline.addLast("encoder", new StringEncoder());
- //加入自己的业务处理handler
- pipeline.addLast(new GroupChatServerHandler());
-
- }
- });
-
- System.out.println("netty 服务器启动");
- ChannelFuture channelFuture = b.bind(port).sync();
-
- //监听关闭
- channelFuture.channel().closeFuture().sync();
- }finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
-
- }
-
- public static void main(String[] args) throws Exception {
-
- new GroupChatServer(7000).run();
- }
- }
服务端Handler:
- package com.atguigu.netty.groupchat;
-
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.channel.group.ChannelGroup;
- import io.netty.channel.group.DefaultChannelGroup;
- import io.netty.util.concurrent.GlobalEventExecutor;
-
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-
- public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
-
- //public static List<Channel> channels = new ArrayList<Channel>();
-
- //使用一个hashmap 管理
- //public static Map<String, Channel> channels = new HashMap<String,Channel>();
-
- //定义一个channle 组,管理所有的channel
- //GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例
- private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
-
- //handlerAdded 表示连接建立,一旦连接,第一个被执行
- //将当前channel 加入到 channelGroup
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- Channel channel = ctx.channel();
- //将该客户加入聊天的信息推送给其它在线的客户端
- /*
- 这个是Netty自带的,writeAndFlush方法内部做了直接遍历给各个客户端发送消息
- 该方法会将 channelGroup 中所有的channel 遍历,并发送 消息,
- 我们不需要自己遍历
- */
- channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
- channelGroup.add(channel);
-
-
-
-
- }
-
- //断开连接, 将xx客户离开信息推送给当前在线的客户
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
-
- Channel channel = ctx.channel();
- channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
- System.out.println("channelGroup size" + channelGroup.size());
-
- }
-
- //表示channel 处于活动状态, 提示 xx上线
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
-
- System.out.println(ctx.channel().remoteAddress() + " 上线了~");
- }
-
- //表示channel 处于不活动状态, 提示 xx离线了
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-
- System.out.println(ctx.channel().remoteAddress() + " 离线了~");
- }
-
- //读取数据
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
-
- //获取到当前channel
- Channel channel = ctx.channel();
- //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
-
- channelGroup.forEach(ch -> {
- if(channel != ch) { //不是当前的channel,转发消息
-
- // channel.remoteAddress()发送给ch
- ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");
- }else {//回显自己发送的消息给自己
- ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
- }
- });
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- //关闭通道
- ctx.close();
- }
- }
客户端代码:
- package com.atguigu.netty.groupchat;
-
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
-
- import java.util.Scanner;
-
-
- public class GroupChatClient {
-
- //属性
- private final String host;
- private final int port;
-
- public GroupChatClient(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
- public void run() throws Exception{
- EventLoopGroup group = new NioEventLoopGroup();
-
- try {
-
-
- Bootstrap bootstrap = new Bootstrap()
- .group(group)
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
-
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
-
- //得到pipeline
- ChannelPipeline pipeline = ch.pipeline();
- //加入相关handler
- pipeline.addLast("decoder", new StringDecoder());
- pipeline.addLast("encoder", new StringEncoder());
- //加入自定义的handler
- pipeline.addLast(new GroupChatClientHandler());
- }
- });
-
- ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
- //得到channel
- Channel channel = channelFuture.channel();
- System.out.println("-------" + channel.localAddress()+ "--------");
- //客户端需要输入信息,创建一个扫描器
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNextLine()) {
- String msg = scanner.nextLine();
- //通过channel 发送到服务器端
- channel.writeAndFlush(msg + "\r\n");
- }
- }finally {
- group.shutdownGracefully();
- }
- }
-
- public static void main(String[] args) throws Exception {
- new GroupChatClient("127.0.0.1", 7000).run();
- }
- }
客户端Handler
- package com.atguigu.netty.groupchat;
-
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
-
- public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
- System.out.println(msg.trim());
- }
- }
友情提示:用户信息先注册到数据库,在用户进行私聊,服务端只需要获取对应用户的编号。判断是不是对应用户编号,如果是就进行转发。当然私聊存在维护好友列表,或者是群聊对应的好友列表。总之就是服务端获取到匹配的编号,通过服务端进行间接的转发。建立连接就将其对应的编号存储起来。为后面匹配用户编号发送消息做准备
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。