当前位置:   article > 正文

【Java并发编程】管道输入输出流PipedReader/PipedWriter_pipe not connected

pipe not connected

线程之间通信的三种方式:

(1)共享内存。

(2)wait()/notify()方法。

(3)管道输入输出流。

本篇将介绍第三种方式,使用管道输入输出流进行线程间的通信。

管道输入/输出流的思想:

Java里的管道输入流PipedWriter与管道输出流PipedReader实现了类似管道的功能,用于不同线程之间的相互通信。

java的管道输入与输出实际上使用的是一个循环缓冲数组来实现,这个数组默认大小为1024字节。读线程使用PipedReader从这个循环缓冲数组中读数据,写线程使用PipedWriter往这个循环缓冲数组中写入数据。

(1)当这个缓冲数组已满的时候,写线程将阻塞;

(2)当这个缓冲数组为空的时候,读线程将阻塞;

这其实就是生产者消费者模式的实现。

基本流程如下图所示:

从上图中可以看出,最关键的地方就是对PipedReader中的缓冲区的控制。

下面是PipedReader的成员变量:

  1. //缓冲区默认的大小
  2. private static final int DEFAULT_PIPE_SIZE = 1024;
  3. //缓冲区
  4. char buffer[];
  5. //写线程将要写入的字符数据在缓冲区的索引位置,in<0表示缓冲区为空,in==out表示缓冲区满
  6. int in = -1;
  7. //读线程将要读取的字符数据在缓冲区的索引位置。
  8. int out = 0;

成员变量in用来控制写线程写入缓冲区的位置,顺序自增。而out控制读线程读取缓冲区数据的索引位置,顺序自增。 

缓冲区的状态:

 (1)缓冲区为空

缓冲区为空包括从未有写线程向缓冲区输入数据,还有一种情况即读线程将写线程写的数据都读取完毕,这两种情况都是缓冲区为空的状态。如下图所示:

 当缓冲区为空的时候,in=1,out=0,此时需要阻塞读线程,通知写线程继续向缓冲区写入数据。

(2)缓冲区满

由于使用的缓冲区是用循环数组实现的,即写线程写到数组的末尾时可以从索引0开始继续写数据,直到遇到读线程的索引位置out,此时缓冲区满了。如下图所示:

当缓冲区满了,就需要阻塞写线程,唤醒读线程继续读取数据。

源码分析

PipedReader的read方法。 

  1. public synchronized int read() throws IOException {
  2. //1.是否建立管道连接
  3. if (!connected) {
  4. throw new IOException("Pipe not connected");
  5. } else if (closedByReader) {
  6. //2.输出管道是否关闭,调用close()方法关闭输出管道
  7. throw new IOException("Pipe closed");
  8. } else if (writeSide != null && !writeSide.isAlive()
  9. && !closedByWriter && (in < 0)) {
  10. //3.判断写线程是否还存活
  11. throw new IOException("Write end dead");
  12. }
  13. //4.获得当前读线程的引用
  14. readSide = Thread.currentThread();
  15. int trials = 2;
  16. while (in < 0) {
  17. //5.如果缓冲区为空
  18. if (closedByWriter) {
  19. /* 写管道关闭 ,返回-1*/
  20. return -1;
  21. }
  22. if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
  23. throw new IOException("Pipe broken");
  24. }
  25. /* might be a writer waiting */
  26. //6.唤醒写线程继续向缓冲区写数据
  27. notifyAll();
  28. try {
  29. //7.释放锁,将读线程加入到等待队列
  30. wait(1000);
  31. } catch (InterruptedException ex) {
  32. throw new java.io.InterruptedIOException();
  33. }
  34. }
  35. //8.执行读取操作
  36. int ret = buffer[out++];
  37. //9.如果读取索引位置大于等于缓冲区最大容量,从头读取
  38. if (out >= buffer.length) {
  39. out = 0;
  40. }
  41. //10.如果读取完所有的缓冲数据,缓冲区为空,就重置读取索引位置。
  42. if (in == out) {
  43. /* now empty */
  44. in = -1;
  45. }
  46. return ret;
  47. }

PipedWriter的write()方法

  1. public void write(int c) throws IOException {
  2. //1.PipedReader sink
  3. if (sink == null) {
  4. throw new IOException("Pipe not connected");
  5. }
  6. //2.使用管道输入流PipedReader的receive方法向缓冲区输入数据。
  7. sink.receive(c);
  8. }

PipedReader的receive方法

  1. synchronized void receive(int c) throws IOException {
  2. //1.管道是否连接
  3. if (!connected) {
  4. throw new IOException("Pipe not connected");
  5. } else if (closedByWriter || closedByReader) {
  6. //2.管道是否关闭
  7. throw new IOException("Pipe closed");
  8. } else if (readSide != null && !readSide.isAlive()) {
  9. //3.读线程是否存活
  10. throw new IOException("Read end dead");
  11. }
  12. //4.获取当前写线程的引用
  13. writeSide = Thread.currentThread();
  14. while (in == out) {
  15. //5.缓冲区满
  16. if ((readSide != null) && !readSide.isAlive()) {
  17. throw new IOException("Pipe broken");
  18. }
  19. /* full: kick any waiting readers */
  20. //6.唤醒读线程读取
  21. notifyAll();
  22. try {
  23. //7.将写线程阻塞
  24. wait(1000);
  25. } catch (InterruptedException ex) {
  26. throw new java.io.InterruptedIOException();
  27. }
  28. }
  29. //8.如果缓冲区为空,重置in和out
  30. if (in < 0) {
  31. in = 0;
  32. out = 0;
  33. }
  34. //9.写入数据
  35. buffer[in++] = (char) c;
  36. //10。如果写入索引位置大于等于缓冲区最大容量,从头写入
  37. if (in >= buffer.length) {
  38. in = 0;
  39. }
  40. }

管道输出/输入流实现线程通信

下面给出使用PipedReader和PipedWriter实现线程之间通信的代码:

  1. public class Test {
  2. public static void main(String[] args) throws IOException {
  3. PipedWriter out=new PipedWriter();
  4. PipedReader in=new PipedReader();
  5. out.connect(in);
  6. new Thread(new Reader(in)).start();
  7. new Thread(new Writer(out)).start();
  8. }
  9. }
  10. class Reader implements Runnable{
  11. PipedReader pd;
  12. public Reader(PipedReader pd){
  13. this.pd=pd;
  14. }
  15. @Override
  16. public void run() {
  17. // TODO Auto-generated method stub
  18. int rs=0;
  19. try {
  20. while((rs=pd.read())!=-1){
  21. System.out.print((char)rs);
  22. }
  23. } catch (IOException e) {
  24. // TODO Auto-generated catch block
  25. e.printStackTrace();
  26. }
  27. }
  28. }
  29. class Writer implements Runnable{
  30. PipedWriter pw;
  31. public Writer(PipedWriter pw){
  32. this.pw=pw;
  33. }
  34. @Override
  35. public void run() {
  36. // TODO Auto-generated method stub
  37. int rs=0;
  38. try {
  39. while((rs=System.in.read())!=-1){
  40. pw.write(rs);
  41. }
  42. } catch (IOException e) {
  43. // TODO Auto-generated catch block
  44. e.printStackTrace();
  45. }
  46. }
  47. }

 

 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号