赞
踩
线程之间通信的三种方式:
(1)共享内存。
(2)wait()/notify()方法。
(3)管道输入输出流。
本篇将介绍第三种方式,使用管道输入输出流进行线程间的通信。
管道输入/输出流的思想:
Java里的管道输入流PipedWriter与管道输出流PipedReader实现了类似管道的功能,用于不同线程之间的相互通信。
java的管道输入与输出实际上使用的是一个循环缓冲数组来实现,这个数组默认大小为1024字节。读线程使用PipedReader从这个循环缓冲数组中读数据,写线程使用PipedWriter往这个循环缓冲数组中写入数据。
(1)当这个缓冲数组已满的时候,写线程将阻塞;
(2)当这个缓冲数组为空的时候,读线程将阻塞;
这其实就是生产者消费者模式的实现。
基本流程如下图所示:
从上图中可以看出,最关键的地方就是对PipedReader中的缓冲区的控制。
下面是PipedReader的成员变量:
- //缓冲区默认的大小
- private static final int DEFAULT_PIPE_SIZE = 1024;
-
- //缓冲区
- char buffer[];
-
- //写线程将要写入的字符数据在缓冲区的索引位置,in<0表示缓冲区为空,in==out表示缓冲区满
- int in = -1;
-
- //读线程将要读取的字符数据在缓冲区的索引位置。
- int out = 0;
成员变量in用来控制写线程写入缓冲区的位置,顺序自增。而out控制读线程读取缓冲区数据的索引位置,顺序自增。
缓冲区的状态:
(1)缓冲区为空
缓冲区为空包括从未有写线程向缓冲区输入数据,还有一种情况即读线程将写线程写的数据都读取完毕,这两种情况都是缓冲区为空的状态。如下图所示:
当缓冲区为空的时候,in=1,out=0,此时需要阻塞读线程,通知写线程继续向缓冲区写入数据。
(2)缓冲区满
由于使用的缓冲区是用循环数组实现的,即写线程写到数组的末尾时可以从索引0开始继续写数据,直到遇到读线程的索引位置out,此时缓冲区满了。如下图所示:
当缓冲区满了,就需要阻塞写线程,唤醒读线程继续读取数据。
源码分析
PipedReader的read方法。
- public synchronized int read() throws IOException {
- //1.是否建立管道连接
- if (!connected) {
- throw new IOException("Pipe not connected");
- } else if (closedByReader) {
- //2.输出管道是否关闭,调用close()方法关闭输出管道
- throw new IOException("Pipe closed");
- } else if (writeSide != null && !writeSide.isAlive()
- && !closedByWriter && (in < 0)) {
- //3.判断写线程是否还存活
- throw new IOException("Write end dead");
- }
- //4.获得当前读线程的引用
- readSide = Thread.currentThread();
- int trials = 2;
- while (in < 0) {
- //5.如果缓冲区为空
- if (closedByWriter) {
- /* 写管道关闭 ,返回-1*/
- return -1;
- }
- if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
- throw new IOException("Pipe broken");
- }
- /* might be a writer waiting */
- //6.唤醒写线程继续向缓冲区写数据
- notifyAll();
- try {
- //7.释放锁,将读线程加入到等待队列
- wait(1000);
- } catch (InterruptedException ex) {
- throw new java.io.InterruptedIOException();
- }
- }
- //8.执行读取操作
- int ret = buffer[out++];
- //9.如果读取索引位置大于等于缓冲区最大容量,从头读取
- if (out >= buffer.length) {
- out = 0;
- }
- //10.如果读取完所有的缓冲数据,缓冲区为空,就重置读取索引位置。
- if (in == out) {
- /* now empty */
- in = -1;
- }
- return ret;
- }
PipedWriter的write()方法
- public void write(int c) throws IOException {
- //1.PipedReader sink
- if (sink == null) {
- throw new IOException("Pipe not connected");
- }
- //2.使用管道输入流PipedReader的receive方法向缓冲区输入数据。
- sink.receive(c);
- }
PipedReader的receive方法
- synchronized void receive(int c) throws IOException {
- //1.管道是否连接
- if (!connected) {
- throw new IOException("Pipe not connected");
- } else if (closedByWriter || closedByReader) {
- //2.管道是否关闭
- throw new IOException("Pipe closed");
- } else if (readSide != null && !readSide.isAlive()) {
- //3.读线程是否存活
- throw new IOException("Read end dead");
- }
- //4.获取当前写线程的引用
- writeSide = Thread.currentThread();
- while (in == out) {
- //5.缓冲区满
- if ((readSide != null) && !readSide.isAlive()) {
- throw new IOException("Pipe broken");
- }
- /* full: kick any waiting readers */
- //6.唤醒读线程读取
- notifyAll();
- try {
- //7.将写线程阻塞
- wait(1000);
- } catch (InterruptedException ex) {
- throw new java.io.InterruptedIOException();
- }
- }
- //8.如果缓冲区为空,重置in和out
- if (in < 0) {
- in = 0;
- out = 0;
- }
- //9.写入数据
- buffer[in++] = (char) c;
- //10。如果写入索引位置大于等于缓冲区最大容量,从头写入
- if (in >= buffer.length) {
- in = 0;
- }
- }
管道输出/输入流实现线程通信
下面给出使用PipedReader和PipedWriter实现线程之间通信的代码:
- public class Test {
-
- public static void main(String[] args) throws IOException {
- PipedWriter out=new PipedWriter();
- PipedReader in=new PipedReader();
- out.connect(in);
- new Thread(new Reader(in)).start();
- new Thread(new Writer(out)).start();
-
- }
- }
- class Reader implements Runnable{
- PipedReader pd;
- public Reader(PipedReader pd){
- this.pd=pd;
- }
- @Override
- public void run() {
- // TODO Auto-generated method stub
- int rs=0;
- try {
- while((rs=pd.read())!=-1){
- System.out.print((char)rs);
- }
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- }
-
- class Writer implements Runnable{
- PipedWriter pw;
- public Writer(PipedWriter pw){
- this.pw=pw;
- }
- @Override
- public void run() {
- // TODO Auto-generated method stub
- int rs=0;
- try {
- while((rs=System.in.read())!=-1){
- pw.write(rs);
- }
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。