当前位置:   article > 正文

Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三)

Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三)

在服务端使用多线程对同个客户端进行读写,会带来意想不到的问题。

前面的文章中,服务端都是在一个单线程main中,处理所有接收到的IO事件,为了提高效率,会自然的想到,为OP_READ和OP_WRITE事件分配多线程处理。

需求:服务端把接收到的数据,原样返回给客户端

服务端代码如下:

直接在单线程的代码上,把单线程的read和write逻辑,放入一个单独的线程

服务代码如下:

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.SelectionKey;
  5. import java.nio.channels.Selector;
  6. import java.nio.channels.ServerSocketChannel;
  7. import java.nio.channels.SocketChannel;
  8. import java.util.Iterator;
  9. import java.util.Set;
  10. public class SocketMultiplexingSingleThreadv2_2 {
  11. private ServerSocketChannel server = null;
  12. private Selector selector = null; //linux 多路复用器(select poll epoll) nginx event{}
  13. int port = 9090;
  14. public void initServer() {
  15. try {
  16. server = ServerSocketChannel.open();
  17. server.configureBlocking(false);
  18. server.bind(new InetSocketAddress(port));
  19. selector = Selector.open(); // select poll *epoll
  20. server.register(selector, SelectionKey.OP_ACCEPT);
  21. } catch (IOException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. public void start() {
  26. initServer();
  27. System.out.println("服务器启动了。。。。。");
  28. try {
  29. while (true) {
  30. // Set<SelectionKey> keys = selector.keys();
  31. // System.out.println(keys.size()+" size");
  32. while (selector.select(50) > 0) {
  33. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  34. Iterator<SelectionKey> iter = selectionKeys.iterator();
  35. while (iter.hasNext()) {
  36. SelectionKey key = iter.next();
  37. iter.remove();
  38. if (key.isAcceptable()) {
  39. acceptHandler(key);
  40. } else if (key.isReadable()) {
  41. // key.cancel(); //现在多路复用器里把key cancel了
  42. System.out.println("in.....");
  43. readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发
  44. } else if(key.isWritable()){ //我之前没讲过写的事件!!!!!
  45. //写事件<-- send-queue 只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法
  46. //你真的要明白:什么时候写?不是依赖send-queue是不是有空间
  47. //1,你准备好要写什么了,这是第一步
  48. //2,第二步你才关心send-queue是否有空间
  49. //3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册
  50. //4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
  51. // key.cancel();
  52. key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
  53. writeHandler(key);
  54. }
  55. }
  56. }
  57. }
  58. } catch (IOException e) {
  59. e.printStackTrace();
  60. }
  61. }
  62. private void writeHandler(SelectionKey key) {
  63. new Thread(()->{
  64. System.out.println("write handler...");
  65. SocketChannel client = (SocketChannel) key.channel();
  66. ByteBuffer buffer = (ByteBuffer) key.attachment();
  67. buffer.flip();
  68. while (buffer.hasRemaining()) {
  69. try {
  70. int write = client.write(buffer);
  71. System.out.println("write " + Thread.currentThread().getName()+ " " + write);
  72. } catch (IOException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. try {
  77. Thread.sleep(2000);
  78. } catch (InterruptedException e) {
  79. e.printStackTrace();
  80. }
  81. buffer.clear();
  82. // key.cancel();
  83. // try {
  84. client.shutdownOutput();
  85. //
  86. client.close();
  87. //
  88. // } catch (IOException e) {
  89. // e.printStackTrace();
  90. // }
  91. }).start();
  92. }
  93. public void acceptHandler(SelectionKey key) {
  94. try {
  95. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  96. SocketChannel client = ssc.accept();
  97. client.configureBlocking(false);
  98. ByteBuffer buffer = ByteBuffer.allocate(8192);
  99. client.register(selector, SelectionKey.OP_READ, buffer);
  100. System.out.println("-------------------------------------------");
  101. System.out.println("新客户端:" + client.getRemoteAddress());
  102. System.out.println("-------------------------------------------");
  103. } catch (IOException e) {
  104. e.printStackTrace();
  105. }
  106. }
  107. public void readHandler(SelectionKey key) {
  108. new Thread(()->{
  109. System.out.println("read handler.....");
  110. SocketChannel client = (SocketChannel) key.channel();
  111. ByteBuffer buffer = (ByteBuffer) key.attachment();
  112. buffer.clear();
  113. int read = 0;
  114. try {
  115. while (true) {
  116. read = client.read(buffer);
  117. System.out.println("read " + Thread.currentThread().getName()+ " " + read);
  118. if (read > 0) {
  119. client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);
  120. } else if (read == 0) {
  121. break;
  122. } else {
  123. client.close();
  124. break;
  125. }
  126. }
  127. } catch (IOException e) {
  128. try {
  129. System.out.println("client " + client.getRemoteAddress() + " disconnected");
  130. client.close();
  131. } catch (IOException ex) {
  132. throw new RuntimeException(ex);
  133. }
  134. e.printStackTrace();
  135. }
  136. }).start();
  137. }
  138. public static void main(String[] args) {
  139. SocketMultiplexingSingleThreadv2_2 service = new SocketMultiplexingSingleThreadv2_2();
  140. service.start();
  141. }
  142. }

测试:

先启动一个服务端,再启动一个客户端,客户端发送数据

服务端日志:

  1. 服务器启动了。。。。。
  2. -------------------------------------------
  3. 新客户端:/127.0.0.1:21598
  4. -------------------------------------------
  5. in.....
  6. in.....
  7. read handler.....
  8. in.....
  9. read handler.....
  10. read Thread-0 5
  11. read Thread-1 0
  12. read handler.....
  13. read Thread-2 0
  14. read Thread-0 0
  15. write handler...

客户端日志:

  1. client connected to server
  2. 1234
  3. client receive data from consolejava.io.BufferedInputStream@6acfcaf3 : 1234

可以看到,客户端发送数据,没有接收到服务端返回的数据;

服务端接收到数据后,在写数据的时候,buffer中没有数据可写;

再仔细看下服务端的日志,可以同个客户端只发送一条数据的时候,有3个线程来处理,其他两个线程读到的数据都是0;


一个客户端的读事件,分配一个线程处理,但是线程还没处理完,下个读事件就来了,就又分配一个线程处理。。。而同一个客户端共享一个buffer,在register OP_READ的时候attach的。
这样使得buffer中的数据还没来得及写出去,就被其他读线程给冲掉了(read == 0);

tip:read事件来的时候,如果不读取数据,read事件会一直有的

解决方法:不可以并发读同一个client, 在处理一个Client的 OP_READ的时候先取消 OP_READ的注册,读完了后,在注册一个 OP_READ

新的服务端代码:

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.*;
  5. import java.util.Iterator;
  6. import java.util.Set;
  7. public class SocketMultiplexingSingleThreadv2 {
  8. private ServerSocketChannel server = null;
  9. private Selector selector = null; //linux 多路复用器(select poll epoll) nginx event{}
  10. int port = 9090;
  11. public void initServer() {
  12. try {
  13. server = ServerSocketChannel.open();
  14. server.configureBlocking(false);
  15. server.bind(new InetSocketAddress(port));
  16. selector = Selector.open(); // select poll *epoll
  17. server.register(selector, SelectionKey.OP_ACCEPT);
  18. } catch (IOException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. public void start() {
  23. initServer();
  24. System.out.println("服务器启动了。。。。。");
  25. try {
  26. while (true) {
  27. // Set<SelectionKey> keys = selector.keys();
  28. // System.out.println(keys.size()+" size");
  29. while (selector.select(50) > 0) {
  30. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  31. Iterator<SelectionKey> iter = selectionKeys.iterator();
  32. while (iter.hasNext()) {
  33. SelectionKey key = iter.next();
  34. iter.remove();
  35. if (key.isAcceptable()) {
  36. acceptHandler(key);
  37. } else if (key.isReadable()) {
  38. // key.cancel(); //现在多路复用器里把key cancel了
  39. System.out.println("in.....");
  40. // 同一个Client,读之前先取消OP_READ,防止多线程冲突吹
  41. key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
  42. readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发
  43. } else if(key.isWritable()){ //我之前没讲过写的事件!!!!!
  44. //写事件<-- send-queue 只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法
  45. //你真的要明白:什么时候写?不是依赖send-queue是不是有空间
  46. //1,你准备好要写什么了,这是第一步
  47. //2,第二步你才关心send-queue是否有空间
  48. //3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册
  49. //4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
  50. // key.cancel();
  51. key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
  52. writeHandler(key);
  53. }
  54. }
  55. }
  56. }
  57. } catch (IOException e) {
  58. e.printStackTrace();
  59. }
  60. }
  61. private void writeHandler(SelectionKey key) {
  62. new Thread(()->{
  63. System.out.println("write handler...");
  64. SocketChannel client = (SocketChannel) key.channel();
  65. ByteBuffer buffer = (ByteBuffer) key.attachment();
  66. buffer.flip();
  67. while (buffer.hasRemaining()) {
  68. try {
  69. int write = client.write(buffer);
  70. System.out.println("write " + Thread.currentThread().getName()+ " " + write);
  71. } catch (IOException e) {
  72. e.printStackTrace();
  73. }
  74. }
  75. try {
  76. Thread.sleep(2000);
  77. } catch (InterruptedException e) {
  78. e.printStackTrace();
  79. }
  80. buffer.clear();
  81. // key.cancel();
  82. // try {
  83. client.shutdownOutput();
  84. //
  85. client.close();
  86. //
  87. // } catch (IOException e) {
  88. // e.printStackTrace();
  89. // }
  90. }).start();
  91. }
  92. public void acceptHandler(SelectionKey key) {
  93. try {
  94. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  95. SocketChannel client = ssc.accept();
  96. client.configureBlocking(false);
  97. ByteBuffer buffer = ByteBuffer.allocate(8192);
  98. client.register(selector, SelectionKey.OP_READ, buffer);
  99. System.out.println("-------------------------------------------");
  100. System.out.println("新客户端:" + client.getRemoteAddress());
  101. System.out.println("-------------------------------------------");
  102. } catch (IOException e) {
  103. e.printStackTrace();
  104. }
  105. }
  106. public void readHandler(SelectionKey key) {
  107. new Thread(()->{
  108. System.out.println("read handler.....");
  109. SocketChannel client = (SocketChannel) key.channel();
  110. ByteBuffer buffer = (ByteBuffer) key.attachment();
  111. buffer.clear();
  112. int read = 0;
  113. try {
  114. while (true) {
  115. read = client.read(buffer);
  116. System.out.println(Thread.currentThread().getName()+ " " + read);
  117. if (read > 0) {
  118. // 同一个Client,读完数据后,要再次注册OP_READ,读后面发送过来的数据
  119. key.interestOps( SelectionKey.OP_READ);
  120. client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);
  121. } else if (read == 0) {
  122. break;
  123. } else {
  124. client.close();
  125. break;
  126. }
  127. }
  128. } catch (IOException e) {
  129. try {
  130. System.out.println("client " + client.getRemoteAddress() + " disconnected");
  131. client.close();
  132. } catch (IOException ex) {
  133. throw new RuntimeException(ex);
  134. }
  135. e.printStackTrace();
  136. }
  137. }).start();
  138. }
  139. public static void main(String[] args) {
  140. SocketMultiplexingSingleThreadv2 service = new SocketMultiplexingSingleThreadv2();
  141. service.start();
  142. }
  143. }

测试:

先启动一个服务端,再启动一个客户端1,客户端1发送数据

服务端日志:

  1. 服务器启动了。。。。。
  2. -------------------------------------------
  3. 新客户端:/127.0.0.1:24029
  4. -------------------------------------------
  5. in.....
  6. read handler.....
  7. Thread-0 8
  8. Thread-0 0
  9. write handler...
  10. write Thread-1 8

客户端1日志:

  1. client connected to server
  2. client1
  3. client receive data from consolejava.io.BufferedInputStream@65231a33 : client1
  4. client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1

可以看到,客户单和服务端都可以正常接收和发送数据。

再添加一个客户端2,发送数据

服务端日志:

  1. 服务器启动了。。。。。
  2. -------------------------------------------
  3. 新客户端:/127.0.0.1:24029
  4. -------------------------------------------
  5. in.....
  6. read handler.....
  7. Thread-0 8
  8. Thread-0 0
  9. write handler...
  10. write Thread-1 8
  11. -------------------------------------------
  12. 新客户端:/127.0.0.1:24105
  13. -------------------------------------------
  14. in.....
  15. read handler.....
  16. Thread-2 8
  17. Thread-2 0
  18. write handler...
  19. write Thread-3 8

客户端2的日志:

  1. client connected to server
  2. client2
  3. client receive data from consolejava.io.BufferedInputStream@65231a33 : client2
  4. client receive data from serverjava.net.Socket$SocketInputStream@27f8302d data size:8: client2

可以看到,客户端2和服务端都可以正常接收和发送数据。

客户端1,再次发送数据

客户端日志:

  1. client connected to server
  2. client1
  3. client receive data from consolejava.io.BufferedInputStream@65231a33 : client1
  4. client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1
  5. clent1_2
  6. client receive data from consolejava.io.BufferedInputStream@65231a33 : clent1_2
  7. client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:9: clent1_2

服务端日志:

  1. 服务器启动了。。。。。
  2. -------------------------------------------
  3. 新客户端:/127.0.0.1:24029
  4. -------------------------------------------
  5. in.....
  6. read handler.....
  7. Thread-0 8
  8. Thread-0 0
  9. write handler...
  10. write Thread-1 8
  11. -------------------------------------------
  12. 新客户端:/127.0.0.1:24105
  13. -------------------------------------------
  14. in.....
  15. read handler.....
  16. Thread-2 8
  17. Thread-2 0
  18. write handler...
  19. write Thread-3 8
  20. in.....
  21. read handler.....
  22. Thread-4 9
  23. Thread-4 0
  24. write handler...
  25. write Thread-5 9

从服务端日志中,可以看到,每个客户端的读事件,只有一个线程处理。

整个处理流程是服务预期的。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/490393
推荐阅读
相关标签
  

闽ICP备14008679号