1 线程间通信方式

1.1 共享内存

  因为线程与父进程的其他线程共享该进程所拥有的全部资源。所以创建的线程本来就已经实现共享内存。但要注意的是,在操作共享资源时要注意实现同步机制,确保线程安全

  例如,通过实现Runnable接口实现线程的共享变量:

package thread;public class RunnableTest {
    public static void main(String[] args) {
        RunnableThread runnableThread = new RunnableThread();        new Thread(runnableThread).start();        new Thread(runnableThread).start();        new Thread(runnableThread).start();
    }
}

class RunnableThread implements Runnable{    // 多个线程之间共享num
    int num = 0;    @Override
    synchronized public void run() {        for (int i = 0; i < 100; i++) {
            System.out.println(Thread.currentThread().getName() + ":" + num++);
        }
    }
}12345678910111213141516171819202122

也可以通过内部类的形式来共享变量:

package thread;/**
 * 通过内部类实现线程的共享变量
 */public class InnerShareThread {
    public static void main(String[] args) {
        MyThread mythread = new MyThread();
        mythread.getThread().start();
        mythread.getThread().start();
        mythread.getThread().start();
    }
}
class MyThread {    int num = 0;    private class InnerThread extends Thread {
        public synchronized void run() {            for (int i = 0; i < 100; i++) {
                System.out.println(Thread.currentThread().getName()
                        + ":" + num++);
            }
        }
    }    public Thread getThread() {        return new InnerThread();
    }
}12345678910111213141516171819202122232425262728

1.2 管道流

  管道流过程:生产者向管道中输出数据,消费者从管道中读取数据。当然,生产者的管道输出要与消费者的管道输入进行连接。

package thread;import java.io.IOException;import java.io.PipedInputStream;import java.io.PipedOutputStream;public class CommunicateWhitPiping {
    public static void main(String[] args) {        // 创建管道输出流
        PipedOutputStream pipedOutputStream = new PipedOutputStream();        // 创建管道输入流
        PipedInputStream pipedInputStream = new PipedInputStream();        try {            // 将管道输入流与输出流连接 此过程也可通过重载的构造函数来实现
            pipedOutputStream.connect(pipedInputStream);
        } catch (IOException e) {
            e.printStackTrace();
        }        // 创建生产者线程
        Producer p = new Producer(pipedOutputStream);        // 创建消费者线程
        Consumer c = new Consumer(pipedInputStream);        // 启动线程
        p.start();
        c.start();
    }
}/**
 * 生产者线程(与一个管道输出流相关联)
 */class Producer extends Thread {    private PipedOutputStream pos;    public Producer(PipedOutputStream pos) {        this.pos = pos;
    }    public void run() {        int i = 8;        try {
            pos.write(i);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}/**
 * 消费者线程(与一个管道输入流相关联)
 */class Consumer extends Thread {    private PipedInputStream pis;    public Consumer(PipedInputStream pis) {        this.pis = pis;
    }    public void run() {        try {
            System.out.println(pis.read());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566

2 Java中实现线程通信的方法

2.1 wait()、notify()、notifyAll()

  • wait():导致当前线程等待,直到其他线程调用该同步监视器的notify()或notifyAll()方法来唤醒该线程。wait()可以带毫秒参数。调用wait()方法的当前线程会释放对该同步监视器的锁定。

  • notify():唤醒在此同步监视器上等待的单个线程。唤醒哪个线程是任意性的。

  • notifyAll():唤醒在此同步监视器上等待的所有线程。

(1)wait()、notify()、notifyAll()方法属于Object类,不属于Thread类 
(2)这个三个方法必须由同步监视器对象来调用: 
  ①对于synchronized修饰的同步方法,同步监视器是该类默认的实例(this),所以可以直接在同步方法中调用这个三个方法。 
  ②对于synchronized修饰的同步代码块,同步监视器是括号里的对象,所以需要使用该对象调用这个三个方法。

例子: 
同一个账户,有两个存款者和两个取款者,存款者和取款者不断地重复存钱和取钱动作,但不允许存款者连续两次存钱和取款者连续两次取钱。

package thread;/**
 * Created by Zen9 on 2016/3/16.
 */public class WaitAndNotifyTest {
    public static void main(String[] args) {        //创建一个账户,开始余额为0
        Account account = new Account(0);        new DrawThread("取款者A",account,800).start();        new DepositThread("存款者甲",account,800).start();        new DrawThread("取款者B",account,800).start();        new DepositThread("存款者乙",account,800).start();
    }
}//账户class Account{    private double balance;    // 标识账户中是否已有存款
    private boolean flag = false;    public Account(double balance){        this.balance = balance;
    }    synchronized public void draw(double drawAmount){        try {            if (!flag){                // 当前线程等待,释放该同步监视器的锁定
                wait();
            }            else {                //执行取款操作
                System.out.println(Thread.currentThread().getName() + " 取钱:" + drawAmount);
                balance -= drawAmount;
                System.out.println("账户余额为:" + balance);                // 账户中存款被取走,将标志flag设为false
                flag = false;                // 唤醒其他线程
                notifyAll();
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }    synchronized public void deposit(double depositAmount){        try {            if (flag){
                wait();
            }            else {                //执行存款操作
                System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount);
                balance += depositAmount;
                System.out.println("账户余额为:" + balance);                // 账户中已有存款,将标志flag设为true
                flag = true;                // 唤醒其他线程
                notifyAll();
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }

}//取钱线程class DrawThread extends Thread{    // 账户
    private Account account;    // 当前取钱线程希望去钱
    private double drawAmount;    public DrawThread(String name,Account account,double drawAmount){        super(name);        this.account = account;        this.drawAmount = drawAmount;
    }    @Override
    public void run() {        for (int i = 0; i < 100; i++) {
            account.draw(drawAmount);
        }
    }
}

class DepositThread extends Thread{    private Account account;    private double depositAmount;    public DepositThread(String name,Account account,double depositAmount){        super(name);        this.account = account;        this.depositAmount = depositAmount;
    }    @Override
    public void run() {        for (int i = 0; i < 100; i++) {
            account.deposit(depositAmount);
        }
    }
}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103

这里写图片描述

程序最后阻塞,无法运行下去。因为当取款者或存款者其中一方的两者执行完200次循环后,剩下的另一方无法继续进行操作。 
注意:该种情况的程序阻塞不是死锁!

2.2 使用Condeition控制

  当程序使用Lock对象(如:ReentrantLock对象)来保证同步时,Java提供了一个Condition类来保持协调。Condition实例被绑定在一个Lock对象上。 
  在这种情况下,Lock替代了同步方法或同步代码块,Condition替代了同步监视器的功能。

改写上面Account类:

//账户class Account{    private double balance;    // 标识账户中是否已有存款
    private boolean flag = false;    private final Lock lock = new ReentrantLock();            /修改之处
    private final Condition condition = lock.newCondition();  /修改之处

    public Account(double balance){        this.balance = balance;
    }    public void draw(double drawAmount){
        lock.lock();       /修改之处
        try {            if (!flag){
                condition.await();      /修改之处
            }            else {                //执行取款操作
                System.out.println(Thread.currentThread().getName() + " 取钱:" + drawAmount);
                balance -= drawAmount;
                System.out.println("账户余额为:" + balance);                // 账户中存款被取走,将标志flag设为false
                flag = false;                // 唤醒其他线程
                condition.signalAll();  /修改之处
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }    public void deposit(double depositAmount){
        lock.lock();       /修改之处
        try {            if (flag){
                condition.await();      /修改之处
            }            else {                //执行存款操作
                System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount);
                balance += depositAmount;
                System.out.println("账户余额为:" + balance);                // 账户中已有存款,将标志flag设为true
                flag = true;                // 唤醒其他线程
                condition.signalAll();  /修改之处
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455

对比两个Account类,此处只是显示地使用了Lock对象来充当同步监视器,则需要使用Condition对象来暂停、唤醒指定线程。

2.3 使用阻塞队列(BlockingQueue)

  BlockingQueue接口,是Queue的子接口,但它的主要作用不是作为容器,而是作为线程同步工具。 
  BlockingQueue具有一个特性:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。 
  BlockingQueue包含的方法:


抛出异常不同返回值阻塞线程指定超时时长
队尾插入元素add(e)offer(e)put(e)offer(e,time,unit)
队头删除元素remove()poll()take()poll(time,unit)
获取、不删除元素element()peek()

例子:

package thread;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;/**
 * Created by Zen9 on 2016/3/16.
 */public class BlockingQueueTest {
    public static void main(String[] args) {        // 创建容量为1的BlockingQueue
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(1);        // 启动3个生产者线程
        new Producer(blockingQueue).start();        new Producer(blockingQueue).start();        new Producer(blockingQueue).start();        // 启动1个消费者线程
        new Consumer(blockingQueue).start();
    }
}

class Producer extends Thread{    private BlockingQueue<String> blockingQueue;    public Producer(BlockingQueue<String> blockingQueue){        this.blockingQueue = blockingQueue;
    }    @Override
    public void run() {
        String[] strArr = new String[]{"AAAA","BBBB","CCCC"};        for (int i = 0; i < 99999; i++) {
            System.out.println(getName() + " 生产者准备生产集合元素!");            try {
                Thread.sleep(200);                // 尝试放入元素,如果队列已满,则线程被阻塞
                blockingQueue.put(strArr[i%3]);
            }catch (Exception e){
                e.printStackTrace();
            }
            System.out.println(getName() + "生产完成:" + blockingQueue);
        }
    }
}

class Consumer extends Thread{    private BlockingQueue<String> blockingQueue;    public Consumer(BlockingQueue<String> blockingQueue){        this.blockingQueue = blockingQueue;
    }    @Override
    public void run() {        while (true) {
            System.out.println(getName() + " 消费者准备消费集合元素!");            try {
                Thread.sleep(200);                // 尝试取出元素,如果队列已空,则线程被阻塞
                blockingQueue.take();
            }catch (Exception e){
                e.printStackTrace();
            }
            System.out.println(getName() + "消费完成:" + blockingQueue);
        }
    }
}