1 线程间通信方式
1.1 共享内存
因为线程与父进程的其他线程共享该进程所拥有的全部资源。所以创建的线程本来就已经实现共享内存。但要注意的是,在操作共享资源时要注意实现同步机制,确保线程安全。
例如,通过实现Runnable接口实现线程的共享变量:
package thread;public class RunnableTest { public static void main(String[] args) { RunnableThread runnableThread = new RunnableThread(); new Thread(runnableThread).start(); new Thread(runnableThread).start(); new Thread(runnableThread).start(); } } class RunnableThread implements Runnable{ // 多个线程之间共享num int num = 0; @Override synchronized public void run() { for (int i = 0; i < 100; i++) { System.out.println(Thread.currentThread().getName() + ":" + num++); } } }12345678910111213141516171819202122
也可以通过内部类的形式来共享变量:
package thread;/** * 通过内部类实现线程的共享变量 */public class InnerShareThread { public static void main(String[] args) { MyThread mythread = new MyThread(); mythread.getThread().start(); mythread.getThread().start(); mythread.getThread().start(); } } class MyThread { int num = 0; private class InnerThread extends Thread { public synchronized void run() { for (int i = 0; i < 100; i++) { System.out.println(Thread.currentThread().getName() + ":" + num++); } } } public Thread getThread() { return new InnerThread(); } }12345678910111213141516171819202122232425262728
1.2 管道流
管道流过程:生产者向管道中输出数据,消费者从管道中读取数据。当然,生产者的管道输出要与消费者的管道输入进行连接。
package thread;import java.io.IOException;import java.io.PipedInputStream;import java.io.PipedOutputStream;public class CommunicateWhitPiping { public static void main(String[] args) { // 创建管道输出流 PipedOutputStream pipedOutputStream = new PipedOutputStream(); // 创建管道输入流 PipedInputStream pipedInputStream = new PipedInputStream(); try { // 将管道输入流与输出流连接 此过程也可通过重载的构造函数来实现 pipedOutputStream.connect(pipedInputStream); } catch (IOException e) { e.printStackTrace(); } // 创建生产者线程 Producer p = new Producer(pipedOutputStream); // 创建消费者线程 Consumer c = new Consumer(pipedInputStream); // 启动线程 p.start(); c.start(); } }/** * 生产者线程(与一个管道输出流相关联) */class Producer extends Thread { private PipedOutputStream pos; public Producer(PipedOutputStream pos) { this.pos = pos; } public void run() { int i = 8; try { pos.write(i); } catch (IOException e) { e.printStackTrace(); } } }/** * 消费者线程(与一个管道输入流相关联) */class Consumer extends Thread { private PipedInputStream pis; public Consumer(PipedInputStream pis) { this.pis = pis; } public void run() { try { System.out.println(pis.read()); } catch (IOException e) { e.printStackTrace(); } } }123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
2 Java中实现线程通信的方法
2.1 wait()、notify()、notifyAll()
wait():导致当前线程等待,直到其他线程调用该同步监视器的notify()或notifyAll()方法来唤醒该线程。wait()可以带毫秒参数。调用wait()方法的当前线程会释放对该同步监视器的锁定。
notify():唤醒在此同步监视器上等待的单个线程。唤醒哪个线程是任意性的。
notifyAll():唤醒在此同步监视器上等待的所有线程。
(1)wait()、notify()、notifyAll()方法属于Object类,不属于Thread类
(2)这个三个方法必须由同步监视器对象来调用:
①对于synchronized修饰的同步方法,同步监视器是该类默认的实例(this),所以可以直接在同步方法中调用这个三个方法。
②对于synchronized修饰的同步代码块,同步监视器是括号里的对象,所以需要使用该对象调用这个三个方法。
例子:
同一个账户,有两个存款者和两个取款者,存款者和取款者不断地重复存钱和取钱动作,但不允许存款者连续两次存钱和取款者连续两次取钱。
package thread;/** * Created by Zen9 on 2016/3/16. */public class WaitAndNotifyTest { public static void main(String[] args) { //创建一个账户,开始余额为0 Account account = new Account(0); new DrawThread("取款者A",account,800).start(); new DepositThread("存款者甲",account,800).start(); new DrawThread("取款者B",account,800).start(); new DepositThread("存款者乙",account,800).start(); } }//账户class Account{ private double balance; // 标识账户中是否已有存款 private boolean flag = false; public Account(double balance){ this.balance = balance; } synchronized public void draw(double drawAmount){ try { if (!flag){ // 当前线程等待,释放该同步监视器的锁定 wait(); } else { //执行取款操作 System.out.println(Thread.currentThread().getName() + " 取钱:" + drawAmount); balance -= drawAmount; System.out.println("账户余额为:" + balance); // 账户中存款被取走,将标志flag设为false flag = false; // 唤醒其他线程 notifyAll(); } }catch (InterruptedException e){ e.printStackTrace(); } } synchronized public void deposit(double depositAmount){ try { if (flag){ wait(); } else { //执行存款操作 System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount); balance += depositAmount; System.out.println("账户余额为:" + balance); // 账户中已有存款,将标志flag设为true flag = true; // 唤醒其他线程 notifyAll(); } }catch (InterruptedException e){ e.printStackTrace(); } } }//取钱线程class DrawThread extends Thread{ // 账户 private Account account; // 当前取钱线程希望去钱 private double drawAmount; public DrawThread(String name,Account account,double drawAmount){ super(name); this.account = account; this.drawAmount = drawAmount; } @Override public void run() { for (int i = 0; i < 100; i++) { account.draw(drawAmount); } } } class DepositThread extends Thread{ private Account account; private double depositAmount; public DepositThread(String name,Account account,double depositAmount){ super(name); this.account = account; this.depositAmount = depositAmount; } @Override public void run() { for (int i = 0; i < 100; i++) { account.deposit(depositAmount); } } }123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
程序最后阻塞,无法运行下去。因为当取款者或存款者其中一方的两者执行完200次循环后,剩下的另一方无法继续进行操作。
注意:该种情况的程序阻塞不是死锁!
2.2 使用Condeition控制
当程序使用Lock对象(如:ReentrantLock对象)来保证同步时,Java提供了一个Condition类来保持协调。Condition实例被绑定在一个Lock对象上。
在这种情况下,Lock替代了同步方法或同步代码块,Condition替代了同步监视器的功能。
改写上面Account类:
//账户class Account{ private double balance; // 标识账户中是否已有存款 private boolean flag = false; private final Lock lock = new ReentrantLock(); /修改之处 private final Condition condition = lock.newCondition(); /修改之处 public Account(double balance){ this.balance = balance; } public void draw(double drawAmount){ lock.lock(); /修改之处 try { if (!flag){ condition.await(); /修改之处 } else { //执行取款操作 System.out.println(Thread.currentThread().getName() + " 取钱:" + drawAmount); balance -= drawAmount; System.out.println("账户余额为:" + balance); // 账户中存款被取走,将标志flag设为false flag = false; // 唤醒其他线程 condition.signalAll(); /修改之处 } }catch (InterruptedException e){ e.printStackTrace(); } } public void deposit(double depositAmount){ lock.lock(); /修改之处 try { if (flag){ condition.await(); /修改之处 } else { //执行存款操作 System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount); balance += depositAmount; System.out.println("账户余额为:" + balance); // 账户中已有存款,将标志flag设为true flag = true; // 唤醒其他线程 condition.signalAll(); /修改之处 } }catch (InterruptedException e){ e.printStackTrace(); } } }12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
对比两个Account类,此处只是显示地使用了Lock对象来充当同步监视器,则需要使用Condition对象来暂停、唤醒指定线程。
2.3 使用阻塞队列(BlockingQueue)
BlockingQueue接口,是Queue的子接口,但它的主要作用不是作为容器,而是作为线程同步工具。
BlockingQueue具有一个特性:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。
BlockingQueue包含的方法:
抛出异常 | 不同返回值 | 阻塞线程 | 指定超时时长 | |
---|---|---|---|---|
队尾插入元素 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
队头删除元素 | remove() | poll() | take() | poll(time,unit) |
获取、不删除元素 | element() | peek() | 无 | 无 |
例子:
package thread;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;/** * Created by Zen9 on 2016/3/16. */public class BlockingQueueTest { public static void main(String[] args) { // 创建容量为1的BlockingQueue BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(1); // 启动3个生产者线程 new Producer(blockingQueue).start(); new Producer(blockingQueue).start(); new Producer(blockingQueue).start(); // 启动1个消费者线程 new Consumer(blockingQueue).start(); } } class Producer extends Thread{ private BlockingQueue<String> blockingQueue; public Producer(BlockingQueue<String> blockingQueue){ this.blockingQueue = blockingQueue; } @Override public void run() { String[] strArr = new String[]{"AAAA","BBBB","CCCC"}; for (int i = 0; i < 99999; i++) { System.out.println(getName() + " 生产者准备生产集合元素!"); try { Thread.sleep(200); // 尝试放入元素,如果队列已满,则线程被阻塞 blockingQueue.put(strArr[i%3]); }catch (Exception e){ e.printStackTrace(); } System.out.println(getName() + "生产完成:" + blockingQueue); } } } class Consumer extends Thread{ private BlockingQueue<String> blockingQueue; public Consumer(BlockingQueue<String> blockingQueue){ this.blockingQueue = blockingQueue; } @Override public void run() { while (true) { System.out.println(getName() + " 消费者准备消费集合元素!"); try { Thread.sleep(200); // 尝试取出元素,如果队列已空,则线程被阻塞 blockingQueue.take(); }catch (Exception e){ e.printStackTrace(); } System.out.println(getName() + "消费完成:" + blockingQueue); } } }