赞
踩
在服务端使用多线程对同个客户端进行读写,会带来意想不到的问题。
前面的文章中,服务端都是在一个单线程main中,处理所有接收到的IO事件,为了提高效率,会自然的想到,为OP_READ和OP_WRITE事件分配多线程处理。
需求:服务端把接收到的数据,原样返回给客户端
服务端代码如下:
直接在单线程的代码上,把单线程的read和write逻辑,放入一个单独的线程
服务代码如下:
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- import java.util.Set;
-
- public class SocketMultiplexingSingleThreadv2_2 {
-
- private ServerSocketChannel server = null;
- private Selector selector = null; //linux 多路复用器(select poll epoll) nginx event{}
- int port = 9090;
-
- public void initServer() {
- try {
- server = ServerSocketChannel.open();
- server.configureBlocking(false);
- server.bind(new InetSocketAddress(port));
- selector = Selector.open(); // select poll *epoll
- server.register(selector, SelectionKey.OP_ACCEPT);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void start() {
- initServer();
- System.out.println("服务器启动了。。。。。");
- try {
- while (true) {
- // Set<SelectionKey> keys = selector.keys();
- // System.out.println(keys.size()+" size");
- while (selector.select(50) > 0) {
- Set<SelectionKey> selectionKeys = selector.selectedKeys();
- Iterator<SelectionKey> iter = selectionKeys.iterator();
- while (iter.hasNext()) {
- SelectionKey key = iter.next();
- iter.remove();
- if (key.isAcceptable()) {
- acceptHandler(key);
- } else if (key.isReadable()) {
- // key.cancel(); //现在多路复用器里把key cancel了
- System.out.println("in.....");
- readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发
-
- } else if(key.isWritable()){ //我之前没讲过写的事件!!!!!
- //写事件<-- send-queue 只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法
- //你真的要明白:什么时候写?不是依赖send-queue是不是有空间
- //1,你准备好要写什么了,这是第一步
- //2,第二步你才关心send-queue是否有空间
- //3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册
- //4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
- // key.cancel();
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
-
-
- writeHandler(key);
- }
- }
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- private void writeHandler(SelectionKey key) {
- new Thread(()->{
- System.out.println("write handler...");
- SocketChannel client = (SocketChannel) key.channel();
- ByteBuffer buffer = (ByteBuffer) key.attachment();
- buffer.flip();
- while (buffer.hasRemaining()) {
- try {
- int write = client.write(buffer);
- System.out.println("write " + Thread.currentThread().getName()+ " " + write);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- buffer.clear();
- // key.cancel();
-
- // try {
- client.shutdownOutput();
- //
- client.close();
- //
- // } catch (IOException e) {
- // e.printStackTrace();
- // }
- }).start();
-
- }
-
- public void acceptHandler(SelectionKey key) {
- try {
- ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
- SocketChannel client = ssc.accept();
- client.configureBlocking(false);
- ByteBuffer buffer = ByteBuffer.allocate(8192);
- client.register(selector, SelectionKey.OP_READ, buffer);
- System.out.println("-------------------------------------------");
- System.out.println("新客户端:" + client.getRemoteAddress());
- System.out.println("-------------------------------------------");
-
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void readHandler(SelectionKey key) {
- new Thread(()->{
- System.out.println("read handler.....");
- SocketChannel client = (SocketChannel) key.channel();
- ByteBuffer buffer = (ByteBuffer) key.attachment();
- buffer.clear();
- int read = 0;
- try {
- while (true) {
- read = client.read(buffer);
- System.out.println("read " + Thread.currentThread().getName()+ " " + read);
- if (read > 0) {
- client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);
- } else if (read == 0) {
-
- break;
- } else {
- client.close();
- break;
- }
- }
- } catch (IOException e) {
- try {
- System.out.println("client " + client.getRemoteAddress() + " disconnected");
- client.close();
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- e.printStackTrace();
- }
- }).start();
-
- }
-
- public static void main(String[] args) {
- SocketMultiplexingSingleThreadv2_2 service = new SocketMultiplexingSingleThreadv2_2();
- service.start();
- }
- }
测试:
先启动一个服务端,再启动一个客户端,客户端发送数据
服务端日志:
- 服务器启动了。。。。。
- -------------------------------------------
- 新客户端:/127.0.0.1:21598
- -------------------------------------------
- in.....
- in.....
- read handler.....
- in.....
- read handler.....
- read Thread-0 5
- read Thread-1 0
- read handler.....
- read Thread-2 0
- read Thread-0 0
- write handler...
客户端日志:
- client connected to server
- 1234
- client receive data from consolejava.io.BufferedInputStream@6acfcaf3 : 1234
-
可以看到,客户端发送数据,没有接收到服务端返回的数据;
服务端接收到数据后,在写数据的时候,buffer中没有数据可写;
再仔细看下服务端的日志,可以同个客户端只发送一条数据的时候,有3个线程来处理,其他两个线程读到的数据都是0;
一个客户端的读事件,分配一个线程处理,但是线程还没处理完,下个读事件就来了,就又分配一个线程处理。。。而同一个客户端共享一个buffer,在register OP_READ的时候attach的。
这样使得buffer中的数据还没来得及写出去,就被其他读线程给冲掉了(read == 0);
tip:read事件来的时候,如果不读取数据,read事件会一直有的
解决方法:不可以并发读同一个client, 在处理一个Client的 OP_READ的时候先取消 OP_READ的注册,读完了后,在注册一个 OP_READ
新的服务端代码:
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.*;
- import java.util.Iterator;
- import java.util.Set;
-
- public class SocketMultiplexingSingleThreadv2 {
-
- private ServerSocketChannel server = null;
- private Selector selector = null; //linux 多路复用器(select poll epoll) nginx event{}
- int port = 9090;
-
- public void initServer() {
- try {
- server = ServerSocketChannel.open();
- server.configureBlocking(false);
- server.bind(new InetSocketAddress(port));
- selector = Selector.open(); // select poll *epoll
- server.register(selector, SelectionKey.OP_ACCEPT);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void start() {
- initServer();
- System.out.println("服务器启动了。。。。。");
- try {
- while (true) {
- // Set<SelectionKey> keys = selector.keys();
- // System.out.println(keys.size()+" size");
- while (selector.select(50) > 0) {
- Set<SelectionKey> selectionKeys = selector.selectedKeys();
- Iterator<SelectionKey> iter = selectionKeys.iterator();
- while (iter.hasNext()) {
- SelectionKey key = iter.next();
- iter.remove();
- if (key.isAcceptable()) {
- acceptHandler(key);
- } else if (key.isReadable()) {
- // key.cancel(); //现在多路复用器里把key cancel了
- System.out.println("in.....");
- // 同一个Client,读之前先取消OP_READ,防止多线程冲突吹
- key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
-
- readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发
-
- } else if(key.isWritable()){ //我之前没讲过写的事件!!!!!
- //写事件<-- send-queue 只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法
- //你真的要明白:什么时候写?不是依赖send-queue是不是有空间
- //1,你准备好要写什么了,这是第一步
- //2,第二步你才关心send-queue是否有空间
- //3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册
- //4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
- // key.cancel();
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
-
-
- writeHandler(key);
- }
- }
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- private void writeHandler(SelectionKey key) {
- new Thread(()->{
- System.out.println("write handler...");
- SocketChannel client = (SocketChannel) key.channel();
- ByteBuffer buffer = (ByteBuffer) key.attachment();
- buffer.flip();
- while (buffer.hasRemaining()) {
- try {
- int write = client.write(buffer);
- System.out.println("write " + Thread.currentThread().getName()+ " " + write);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- buffer.clear();
- // key.cancel();
-
- // try {
- client.shutdownOutput();
- //
- client.close();
- //
- // } catch (IOException e) {
- // e.printStackTrace();
- // }
- }).start();
-
- }
-
- public void acceptHandler(SelectionKey key) {
- try {
- ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
- SocketChannel client = ssc.accept();
- client.configureBlocking(false);
- ByteBuffer buffer = ByteBuffer.allocate(8192);
- client.register(selector, SelectionKey.OP_READ, buffer);
- System.out.println("-------------------------------------------");
- System.out.println("新客户端:" + client.getRemoteAddress());
- System.out.println("-------------------------------------------");
-
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void readHandler(SelectionKey key) {
- new Thread(()->{
- System.out.println("read handler.....");
- SocketChannel client = (SocketChannel) key.channel();
- ByteBuffer buffer = (ByteBuffer) key.attachment();
- buffer.clear();
- int read = 0;
- try {
- while (true) {
- read = client.read(buffer);
- System.out.println(Thread.currentThread().getName()+ " " + read);
- if (read > 0) {
- // 同一个Client,读完数据后,要再次注册OP_READ,读后面发送过来的数据
- key.interestOps( SelectionKey.OP_READ);
-
- client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);
- } else if (read == 0) {
-
- break;
- } else {
- client.close();
- break;
- }
- }
- } catch (IOException e) {
- try {
- System.out.println("client " + client.getRemoteAddress() + " disconnected");
- client.close();
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- e.printStackTrace();
- }
- }).start();
-
- }
-
- public static void main(String[] args) {
- SocketMultiplexingSingleThreadv2 service = new SocketMultiplexingSingleThreadv2();
- service.start();
- }
- }
测试:
先启动一个服务端,再启动一个客户端1,客户端1发送数据
服务端日志:
- 服务器启动了。。。。。
- -------------------------------------------
- 新客户端:/127.0.0.1:24029
- -------------------------------------------
- in.....
- read handler.....
- Thread-0 8
- Thread-0 0
- write handler...
- write Thread-1 8
客户端1日志:
- client connected to server
- client1
- client receive data from consolejava.io.BufferedInputStream@65231a33 : client1
-
- client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1
可以看到,客户单和服务端都可以正常接收和发送数据。
再添加一个客户端2,发送数据
服务端日志:
- 服务器启动了。。。。。
- -------------------------------------------
- 新客户端:/127.0.0.1:24029
- -------------------------------------------
- in.....
- read handler.....
- Thread-0 8
- Thread-0 0
- write handler...
- write Thread-1 8
- -------------------------------------------
- 新客户端:/127.0.0.1:24105
- -------------------------------------------
- in.....
- read handler.....
- Thread-2 8
- Thread-2 0
- write handler...
- write Thread-3 8
客户端2的日志:
- client connected to server
- client2
- client receive data from consolejava.io.BufferedInputStream@65231a33 : client2
-
- client receive data from serverjava.net.Socket$SocketInputStream@27f8302d data size:8: client2
可以看到,客户端2和服务端都可以正常接收和发送数据。
客户端1,再次发送数据
客户端日志:
- client connected to server
- client1
- client receive data from consolejava.io.BufferedInputStream@65231a33 : client1
-
- client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1
-
- clent1_2
- client receive data from consolejava.io.BufferedInputStream@65231a33 : clent1_2
-
- client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:9: clent1_2
服务端日志:
- 服务器启动了。。。。。
- -------------------------------------------
- 新客户端:/127.0.0.1:24029
- -------------------------------------------
- in.....
- read handler.....
- Thread-0 8
- Thread-0 0
- write handler...
- write Thread-1 8
- -------------------------------------------
- 新客户端:/127.0.0.1:24105
- -------------------------------------------
- in.....
- read handler.....
- Thread-2 8
- Thread-2 0
- write handler...
- write Thread-3 8
- in.....
- read handler.....
- Thread-4 9
- Thread-4 0
- write handler...
- write Thread-5 9
从服务端日志中,可以看到,每个客户端的读事件,只有一个线程处理。
整个处理流程是服务预期的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。