当前位置:   article > 正文

reactor讲解_reactor语法

reactor语法

Doug Lea 

chrome-extension://ibllepbpahcoppkjjllbabhnigcbffpi/http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

本文目录 (本文仅为记录用)

1. 为什么是Reactor模式
2. Reactor模式简介
3. 多线程IO的致命缺陷
4. 单线程Reactor模型
4.1. 什么是单线程Reactor呢?
4.2. 单线程Reactor的参考代码
4.3. 单线程模式的缺点:
5. 多线程的Reactor
5.1. 基于线程池的改进
5.2. 改进后的完整示意图
5.3. 多线程Reactor的参考代码
6. Reactor持续改进
7. Reactor编程的优点和缺点
7.1. 优点
7.2. 缺点

 

1. 为什么是Reactor模式

写多了代码的兄弟们都知道,JAVA代码由于到处面向接口及高度抽象,用到继承多态和设计模式,程序的组织不是按照正常的理解顺序来的,对代码跟踪很是个问题。所以,在阅读别人的源码时,如果不了解代码的组织方式,往往是晕头转向,不知在何处。尤其是阅读经典代码的时候,更是如此。

反过来,如果先了解代码的设计模式,再来去代码,就会阅读的很轻松,不会那么难懂。

像netty这样的精品中的极品,肯定也是需要先从设计模式入手的。netty的整体架构,基于了一个著名的模式——Reactor模式。Reactor模式,是高性能网络编程的必知必会模式。

首先熟悉Reactor模式,一定是磨刀不误砍柴工。

2. Reactor模式简介

Netty是典型的Reactor模型结构,关于Reactor的详尽阐释,本文站在巨人的肩膀上,借助 Doug Lea(就是那位让人无限景仰的大爷)的“Scalable IO in Java”中讲述的Reactor模式。

“Scalable IO in Java”的地址是:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

Reactor模式也叫反应器模式,大多数IO相关组件如Netty、Redis在使用的IO模式,为什么需要这种模式,它是如何设计来解决高性能并发的呢?

3. 多线程IO的致命缺陷

最最原始的网络编程思路就是服务器用一个while循环,不断监听端口是否有新的套接字连接,如果有,那么就调用一个处理函数处理,类似:

复制代码

  1. while(true){
  2. socket = accept();
  3. handle(socket)
  4. }

复制代码

这种方法的最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。

之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理,类似:

复制代码

  1. package com.crazymakercircle.iodemo.base;
  2. import com.crazymakercircle.config.SystemConfig;
  3. import java.io.IOException;
  4. import java.net.ServerSocket;
  5. import java.net.Socket;
  6. class BasicModel implements Runnable {
  7. public void run() {
  8. try {
  9. ServerSocket ss =
  10. new ServerSocket(SystemConfig.SOCKET_SERVER_PORT);
  11. while (!Thread.interrupted())
  12. new Thread(new Handler(ss.accept())).start();
  13. //创建新线程来handle
  14. // or, single-threaded, or a thread pool
  15. } catch (IOException ex) { /* ... */ }
  16. }
  17. static class Handler implements Runnable {
  18. final Socket socket;
  19. Handler(Socket s) { socket = s; }
  20. public void run() {
  21. try {
  22. byte[] input = new byte[SystemConfig.INPUT_SIZE];
  23. socket.getInputStream().read(input);
  24. byte[] output = process(input);
  25. socket.getOutputStream().write(output);
  26. } catch (IOException ex) { /* ... */ }
  27. }
  28. private byte[] process(byte[] input) {
  29. byte[] output=null;
  30. /* ... */
  31. return output;
  32. }
  33. }
  34. }

复制代码

对于每一个请求都分发给一个线程,每个线程中都独自处理上面的流程。

tomcat服务器的早期版本确实是这样实现的。

多线程并发模式,一个连接一个线程的优点是:

一定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。这也是为什么通常会讲“一个线程只能对应一个socket”的原因。另外有个问题,如果一个线程中对应多个socket连接不行吗?语法上确实可以,但是实际上没有用,每一个socket都是阻塞的,所以在一个线程里只能处理一个socket,就算accept了多个也没用,前一个socket被阻塞了,后面的是无法被执行到的。

多线程并发模式,一个连接一个线程的缺点是:

缺点在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且,线程的反复创建-销毁也需要代价。

改进方法是:

采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。使用Reactor模式,对线程的数量进行控制,一个线程处理大量的事件。

4. 单线程Reactor模型

 

Reactor模型的朴素原型

Java的NIO模式的Selector网络通讯,其实就是一个简单的Reactor模型。可以说是Reactor模型的朴素原型。

复制代码

  1. static class Server
  2. {
  3. public static void testServer() throws IOException
  4. {
  5. // 1、获取Selector选择器
  6. Selector selector = Selector.open();
  7. // 2、获取通道
  8. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  9. // 3.设置为非阻塞
  10. serverSocketChannel.configureBlocking(false);
  11. // 4、绑定连接
  12. serverSocketChannel.bind(new InetSocketAddress(SystemConfig.SOCKET_SERVER_PORT));
  13. // 5、将通道注册到选择器上,并注册的操作为:“接收”操作
  14. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  15. // 6、采用轮询的方式,查询获取“准备就绪”的注册过的操作
  16. while (selector.select() > 0)
  17. {
  18. // 7、获取当前选择器中所有注册的选择键(“已经准备就绪的操作”)
  19. Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
  20. while (selectedKeys.hasNext())
  21. {
  22. // 8、获取“准备就绪”的时间
  23. SelectionKey selectedKey = selectedKeys.next();
  24. // 9、判断key是具体的什么事件
  25. if (selectedKey.isAcceptable())
  26. {
  27. // 10、若接受的事件是“接收就绪” 操作,就获取客户端连接
  28. SocketChannel socketChannel = serverSocketChannel.accept();
  29. // 11、切换为非阻塞模式
  30. socketChannel.configureBlocking(false);
  31. // 12、将该通道注册到selector选择器上
  32. socketChannel.register(selector, SelectionKey.OP_READ);
  33. }
  34. else if (selectedKey.isReadable())
  35. {
  36. // 13、获取该选择器上的“读就绪”状态的通道
  37. SocketChannel socketChannel = (SocketChannel) selectedKey.channel();
  38. // 14、读取数据
  39. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  40. int length = 0;
  41. while ((length = socketChannel.read(byteBuffer)) != -1)
  42. {
  43. byteBuffer.flip();
  44. System.out.println(new String(byteBuffer.array(), 0, length));
  45. byteBuffer.clear();
  46. }
  47. socketChannel.close();
  48. }
  49. // 15、移除选择键
  50. selectedKeys.remove();
  51. }
  52. }
  53. // 7、关闭连接
  54. serverSocketChannel.close();
  55. }
  56. public static void main(String[] args) throws IOException
  57. {
  58. testServer();
  59. }
  60. }

复制代码

 

实际上的Reactor模式,是基于Java NIO的,在他的基础上,抽象出来两个组件——Reactor和Handler两个组件:

(1)Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含连接建立就绪、读就绪、写就绪等。

(2)Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。

 

4.1. 什么是单线程Reactor呢?

 

如下图所示:

wpsC334.tmp

这是最简单的单Reactor单线程模型。Reactor线程是个多面手,负责多路分离套接字,Accept新连接,并分派请求到Handler处理器中。

下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多。Reactor和Hander 处于一条线程执行。

wpsC345.tmp

顺便说一下,可以将上图的accepter,看做是一种特殊的handler。

 

4.2. 单线程Reactor的参考代码

“Scalable IO in Java”,实现了一个单线程Reactor的参考代码,Reactor的代码如下:

 

复制代码

  1. package com.crazymakercircle.ReactorModel;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.util.Iterator;
  10. import java.util.Set;
  11. class Reactor implements Runnable
  12. {
  13. final Selector selector;
  14. final ServerSocketChannel serverSocket;
  15. Reactor(int port) throws IOException
  16. { //Reactor初始化
  17. selector = Selector.open();
  18. serverSocket = ServerSocketChannel.open();
  19. serverSocket.socket().bind(new InetSocketAddress(port));
  20. //非阻塞
  21. serverSocket.configureBlocking(false);
  22. //分步处理,第一步,接收accept事件
  23. SelectionKey sk =
  24. serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  25. //attach callback object, Acceptor
  26. sk.attach(new Acceptor());
  27. }
  28. public void run()
  29. {
  30. try
  31. {
  32. while (!Thread.interrupted())
  33. {
  34. selector.select();
  35. Set selected = selector.selectedKeys();
  36. Iterator it = selected.iterator();
  37. while (it.hasNext())
  38. {
  39. //Reactor负责dispatch收到的事件
  40. dispatch((SelectionKey) (it.next()));
  41. }
  42. selected.clear();
  43. }
  44. } catch (IOException ex)
  45. { /* ... */ }
  46. }
  47. void dispatch(SelectionKey k)
  48. {
  49. Runnable r = (Runnable) (k.attachment());
  50. //调用之前注册的callback对象
  51. if (r != null)
  52. {
  53. r.run();
  54. }
  55. }
  56. // inner class
  57. class Acceptor implements Runnable
  58. {
  59. public void run()
  60. {
  61. try
  62. {
  63. SocketChannel channel = serverSocket.accept();
  64. if (channel != null)
  65. new Handler(selector, channel);
  66. } catch (IOException ex)
  67. { /* ... */ }
  68. }
  69. }
  70. }

复制代码

 

Handler的代码如下:

 

复制代码

  1. package com.crazymakercircle.ReactorModel;
  2. import com.crazymakercircle.config.SystemConfig;
  3. import java.io.IOException;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.SocketChannel;
  8. class Handler implements Runnable
  9. {
  10. final SocketChannel channel;
  11. final SelectionKey sk;
  12. ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
  13. ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
  14. static final int READING = 0, SENDING = 1;
  15. int state = READING;
  16. Handler(Selector selector, SocketChannel c) throws IOException
  17. {
  18. channel = c;
  19. c.configureBlocking(false);
  20. // Optionally try first read now
  21. sk = channel.register(selector, 0);
  22. //将Handler作为callback对象
  23. sk.attach(this);
  24. //第二步,注册Read就绪事件
  25. sk.interestOps(SelectionKey.OP_READ);
  26. selector.wakeup();
  27. }
  28. boolean inputIsComplete()
  29. {
  30. /* ... */
  31. return false;
  32. }
  33. boolean outputIsComplete()
  34. {
  35. /* ... */
  36. return false;
  37. }
  38. void process()
  39. {
  40. /* ... */
  41. return;
  42. }
  43. public void run()
  44. {
  45. try
  46. {
  47. if (state == READING)
  48. {
  49. read();
  50. }
  51. else if (state == SENDING)
  52. {
  53. send();
  54. }
  55. } catch (IOException ex)
  56. { /* ... */ }
  57. }
  58. void read() throws IOException
  59. {
  60. channel.read(input);
  61. if (inputIsComplete())
  62. {
  63. process();
  64. state = SENDING;
  65. // Normally also do first write now
  66. //第三步,接收write就绪事件
  67. sk.interestOps(SelectionKey.OP_WRITE);
  68. }
  69. }
  70. void send() throws IOException
  71. {
  72. channel.write(output);
  73. //write完就结束了, 关闭select key
  74. if (outputIsComplete())
  75. {
  76. sk.cancel();
  77. }
  78. }
  79. }

复制代码

 

这两段代码,是建立在JAVA NIO的基础上的,这两段代码建议一定要看懂。可以在IDE中去看源码,这样直观感觉更佳。

如果对NIO的Seletor不完全了解,影响到上面的代码阅读,请阅读疯狂创客圈的Java NIO死磕 文章。

 

4.3. 单线程模式的缺点:

1、 当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。

2、因此,单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。

 

5. 多线程的Reactor

5.1. 基于线程池的改进

在线程Reactor模式基础上,做如下改进:

(1)将Handler处理器的执行放入线程池,多线程进行业务处理。

(2)而对于Reactor而言,可以仍为单个线程。如果服务器为多核的CPU,为充分利用系统资源,可以将Reactor拆分为两个线程。

一个简单的图如下:

image

5.2. 改进后的完整示意图

下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多,只是更加详细。Reactor是一条独立的线程,Hander 处于线程池中执行。

wpsC376.tmp

 

5.3. 多线程Reactor的参考代码

“Scalable IO in Java”,的多线程Reactor的参考代码,是基于单线程做一个线程池的改进,改进的Handler的代码如下:

 

复制代码

  1. package com.crazymakercircle.ReactorModel;
  2. import com.crazymakercircle.config.SystemConfig;
  3. import java.io.IOException;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.SocketChannel;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. class MthreadHandler implements Runnable
  11. {
  12. final SocketChannel channel;
  13. final SelectionKey selectionKey;
  14. ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
  15. ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
  16. static final int READING = 0, SENDING = 1;
  17. int state = READING;
  18. ExecutorService pool = Executors.newFixedThreadPool(2);
  19. static final int PROCESSING = 3;
  20. MthreadHandler(Selector selector, SocketChannel c) throws IOException
  21. {
  22. channel = c;
  23. c.configureBlocking(false);
  24. // Optionally try first read now
  25. selectionKey = channel.register(selector, 0);
  26. //将Handler作为callback对象
  27. selectionKey.attach(this);
  28. //第二步,注册Read就绪事件
  29. selectionKey.interestOps(SelectionKey.OP_READ);
  30. selector.wakeup();
  31. }
  32. boolean inputIsComplete()
  33. {
  34. /* ... */
  35. return false;
  36. }
  37. boolean outputIsComplete()
  38. {
  39. /* ... */
  40. return false;
  41. }
  42. void process()
  43. {
  44. /* ... */
  45. return;
  46. }
  47. public void run()
  48. {
  49. try
  50. {
  51. if (state == READING)
  52. {
  53. read();
  54. }
  55. else if (state == SENDING)
  56. {
  57. send();
  58. }
  59. } catch (IOException ex)
  60. { /* ... */ }
  61. }
  62. synchronized void read() throws IOException
  63. {
  64. // ...
  65. channel.read(input);
  66. if (inputIsComplete())
  67. {
  68. state = PROCESSING;
  69. //使用线程pool异步执行
  70. pool.execute(new Processer());
  71. }
  72. }
  73. void send() throws IOException
  74. {
  75. channel.write(output);
  76. //write完就结束了, 关闭select key
  77. if (outputIsComplete())
  78. {
  79. selectionKey.cancel();
  80. }
  81. }
  82. synchronized void processAndHandOff()
  83. {
  84. process();
  85. state = SENDING;
  86. // or rebind attachment
  87. //process完,开始等待write就绪
  88. selectionKey.interestOps(SelectionKey.OP_WRITE);
  89. }
  90. class Processer implements Runnable
  91. {
  92. public void run()
  93. {
  94. processAndHandOff();
  95. }
  96. }
  97. }

复制代码

 

Reactor 类没有大的变化,参考前面的代码。

6. Reactor持续改进

对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。代码如下:

 

复制代码

  1. package com.crazymakercircle.ReactorModel;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.net.Socket;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.util.Iterator;
  10. import java.util.Set;
  11. class MthreadReactor implements Runnable
  12. {
  13. //subReactors集合, 一个selector代表一个subReactor
  14. Selector[] selectors=new Selector[2];
  15. int next = 0;
  16. final ServerSocketChannel serverSocket;
  17. MthreadReactor(int port) throws IOException
  18. { //Reactor初始化
  19. selectors[0]=Selector.open();
  20. selectors[1]= Selector.open();
  21. serverSocket = ServerSocketChannel.open();
  22. serverSocket.socket().bind(new InetSocketAddress(port));
  23. //非阻塞
  24. serverSocket.configureBlocking(false);
  25. //分步处理,第一步,接收accept事件
  26. SelectionKey sk =
  27. serverSocket.register( selectors[0], SelectionKey.OP_ACCEPT);
  28. //attach callback object, Acceptor
  29. sk.attach(new Acceptor());
  30. }
  31. public void run()
  32. {
  33. try
  34. {
  35. while (!Thread.interrupted())
  36. {
  37. for (int i = 0; i <2 ; i++)
  38. {
  39. selectors[i].select();
  40. Set selected = selectors[i].selectedKeys();
  41. Iterator it = selected.iterator();
  42. while (it.hasNext())
  43. {
  44. //Reactor负责dispatch收到的事件
  45. dispatch((SelectionKey) (it.next()));
  46. }
  47. selected.clear();
  48. }
  49. }
  50. } catch (IOException ex)
  51. { /* ... */ }
  52. }
  53. void dispatch(SelectionKey k)
  54. {
  55. Runnable r = (Runnable) (k.attachment());
  56. //调用之前注册的callback对象
  57. if (r != null)
  58. {
  59. r.run();
  60. }
  61. }
  62. class Acceptor { // ...
  63. public synchronized void run() throws IOException
  64. {
  65. SocketChannel connection =
  66. serverSocket.accept(); //主selector负责accept
  67. if (connection != null)
  68. {
  69. new Handler(selectors[next], connection); //选个subReactor去负责接收到的connection
  70. }
  71. if (++next == selectors.length) next = 0;
  72. }
  73. }
  74. }

复制代码

 

7. Reactor编程的优点和缺点

6.1. 优点

1)响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;

2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;

3)可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;

4)可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;

6.2. 缺点

1)相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。

2)Reactor模式需要底层的Synchronous Event Demultiplexer支持,比如Java中的Selector支持,操作系统的select系统调用支持,如果要自己实现Synchronous Event Demultiplexer可能不会有那么高效。

3) Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用改进版的Reactor模式如Proactor模式。

 

在开启Netty源码前,上面的经典代码,一定要看懂哦!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/194163
推荐阅读
相关标签
  

闽ICP备14008679号