赞
踩
同步异步 , 举个例子来说,一家餐厅吧来了5个客人,同步的意思就是说,来第一个点菜,点了个鱼,好, 厨师去捉鱼杀鱼,过了半小时鱼好了给第一位客人,开始下位一位客人,就这样一个一个来,按顺序来
相同, 异步呢,异步的意思就是来第一位客人,点什么,点鱼,给它一个牌子,让他去一边等吧,下一位客人接着点菜,点完接着点让厨师做去吧,哪个的菜先好就先端出来,
同步的优点是:同步是按照顺序一个一个来,不会乱掉,更不会出现上面代码没有执行完就执行下面的代码, 缺点:是解析的速度没有异步的快;
异步的优点是:异步是接取一个任务,直接给后台,在接下一个任务,一直一直这样,谁的先读取完先执行谁的, 缺点:没有顺序 ,谁先读取完先执行谁的 ,会出现上面的代码还没出来下面的就已经出来了,会报错;
即:Guraded Suspension,用在一个线程等待另一个线程的执行结果
注意:
一个简单的一个线程下载,另一个线程输出结果的例子
package com.example.demo; import com.example.tools.Downloader; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.List; /** * @author 我见青山多妩媚 * @date Create on 2022/6/28 16:00 */ @Slf4j public class Demo20 { //线程1等待线程2的下载结果 public static void main(String[] args) { GuardeObject guardeObject = new GuardeObject(); new Thread(()->{ //等待结果 log.debug("等待结果"); List<String> list = (List<String>)guardeObject.get(); log.debug("结果大小为:{}",list.size()); },"线程1").start(); new Thread(()->{ log.debug("执行下载"); try { //执行下载 List<String> download = Downloader.download(); guardeObject.complete(download); } catch (IOException e) { e.printStackTrace(); } },"线程2").start(); } } class GuardeObject{ //结果 private Object response; //获取结果 public Object get(){ synchronized (this){ while (response == null){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } return response; } //产生结果 public void complete(Object response){ synchronized (this){ this.response = response; this.notifyAll(); } } }
Downloader
类
package com.example.tools; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; /** * @author 我见青山多妩媚 * @date Create on 2022/6/28 16:06 */ public class Downloader { public static List<String> download() throws IOException { HttpURLConnection httpURLConnection = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection(); List<String> list = new ArrayList<>(); try (BufferedReader reader = new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream(), StandardCharsets.UTF_8))){ String line; while ((line = reader.readLine())!=null){ list.add(line); } } return list; } }
运行结果:
16:17:18.034 [线程1] DEBUG com.example.demo.Demo20 - 等待结果
16:17:18.034 [线程2] DEBUG com.example.demo.Demo20 - 执行下载
16:17:19.466 [线程1] DEBUG com.example.demo.Demo20 - 结果大小为:3
或
16:19:52.222 [线程2] DEBUG com.example.demo.Demo20 - 执行下载
16:19:52.222 [线程1] DEBUG com.example.demo.Demo20 - 等待结果
16:19:53.747 [线程1] DEBUG com.example.demo.Demo20 - 结果大小为:3
具体流程:如果线程1先执行,那么被执行等待,线程2下载完成之后被唤醒,执行线程1(或直接执行线程2,之后执行线程1)
让一个线程只等待一段时间,超时不等待了
package com.example.demo; import com.example.tools.Downloader; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.List; /** * @author 我见青山多妩媚 * @date Create on 2022/6/28 16:00 */ @Slf4j public class Demo20 { //线程1等待线程2的下载结果 public static void main(String[] args) { GuardeObject guardeObject = new GuardeObject(); //等待结果 new Thread(()->{ //等待结果 log.debug("等待结果"); List<String> list = (List<String>) guardeObject.get(2000l); log.debug("结果大小为:{}",list.size()); },"线程1").start(); //产生结果 new Thread(()->{ try { //手动睡眠0.5s Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("执行下载"); try { List<String> download = Downloader.download(); guardeObject.complete(download); } catch (IOException e) { e.printStackTrace(); } },"线程2").start(); } } class GuardeObject{ //结果 private Object response; //获取结果 //设置最大等待时间 public Object get(Long timeout){ synchronized (this){ //开始时间 long begin = System.currentTimeMillis(); long end = 0; while (response == null){ //如果超过最大等待时间,那么退出循环 end - timout >= 0 long waitTime = timeout - end; if(waitTime<=0){ break; } try { //不能等待timeout,因为不会退出,仍然会继续等待,还要考虑虚假唤醒的情况,再次进入时等待的时间变长 this.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } //经历时间 end = System.currentTimeMillis() - begin; } } return response; } //产生结果 public void complete(Object response){ synchronized (this){ this.response = response; this.notifyAll(); } } }
运行结果:
17:07:38.233 [线程1] DEBUG com.example.demo.Demo20 - 等待结果
17:07:38.744 [线程2] DEBUG com.example.demo.Demo20 - 执行下载
17:07:40.119 [线程1] DEBUG com.example.demo.Demo20 - 结果大小为:3
当让程序sleep(1000)时,因为达到超时时间,运行结果为:
17:12:06.242 [线程1] DEBUG com.example.demo.Demo20 - 等待结果
17:12:07.251 [线程2] DEBUG com.example.demo.Demo20 - 执行下载
Exception in thread "线程1" java.lang.NullPointerException
at com.example.demo.Demo20.lambda$main$0(Demo20.java:23)
at java.lang.Thread.run(Thread.java:748)
因为达到了最大超时时间,所以list结果实际为空,list.size()自然会报错
虚假唤醒的情况
package com.example.demo; import com.example.tools.Downloader; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.List; /** * @author 我见青山多妩媚 * @date Create on 2022/6/28 16:00 */ @Slf4j public class Demo20 { //线程1等待线程2的下载结果 public static void main(String[] args) { GuardeObject guardeObject = new GuardeObject(); //等待结果 new Thread(()->{ //等待结果 log.debug("等待结果"); Object o = guardeObject.get(2000l); log.debug("结果大小为:{}",o); },"线程1").start(); //产生结果 new Thread(()->{ try { //手动睡眠1s Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("执行下载"); guardeObject.complete(null); },"线程2").start(); } } class GuardeObject{ //结果 private Object response; //获取结果 //设置最大等待时间 public Object get(Long timeout){ synchronized (this){ //开始时间 long begin = System.currentTimeMillis(); long end = 0; while (response == null){ //如果超过最大等待时间,那么退出循环 end - timout >= 0 long waitTime = timeout - end; if(waitTime<=0){ break; } try { //虚假唤醒的情况,此时设置为timeout this.wait(timeout); } catch (InterruptedException e) { e.printStackTrace(); } //经历时间 end = System.currentTimeMillis() - begin; } } return response; } //产生结果 public void complete(Object response){ synchronized (this){ this.response = response; this.notifyAll(); } } }
运行结果:
17:15:39.040 [线程1] DEBUG com.example.demo.Demo20 - 等待结果
17:15:40.037 [线程2] DEBUG com.example.demo.Demo20 - 执行下载
17:15:42.049 [线程1] DEBUG com.example.demo.Demo20 - 结果大小为:null
对虚假唤醒和正常唤醒的解释:
可以看到,最后的结果比一开始定的2s多了1s,就是因为等待时间为timeout,当程序第一次运动到wait时,开始等待2s,程序运行线程2时,等待1s后唤醒了线程1,所以线程1实际等待了1s就被唤醒了,因为线程2赋值的为null,所以线程1继续等待2s,等待2s之后,再去循环后,当前时间已经等待了3s,比规定的2s大,所以退出循环,线程结束。
当使用watiTime时,线程1运行到wait,等待2s,去运行线程2,线程1等待1s被唤醒,此时waitTime=2-1s=1s,所以又等待1s后退出循环,线程结束
join的底层实际就是使用了保护性暂停模式
邮递员送信
package com.example.demo; import lombok.extern.slf4j.Slf4j; import java.util.Hashtable; import java.util.Map; import java.util.Set; /** * @author 我见青山多妩媚 * @date Create on 2022/6/28 16:00 */ @Slf4j public class Demo20 { public static void main(String[] args) { for (int i = 0; i < 3; i++) { //居民线程 new People().start(); } //休息1s try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //给几个人去送信 三个人要信三个邮递员送信 for (Integer id : Mailboxes.getIds()) { new Postman(id,"内容"+id).start(); } } } //每个人或者每个邮递员 都是一个线程 @Slf4j(topic = "people") class People extends Thread{ @Override public void run() { GuardedObject guardeObject = Mailboxes.createGuardedObject(); log.debug("收信id:{}",guardeObject.getId()); Object mail = guardeObject.get(5000l); log.debug("收信id:{},收到信:{}",guardeObject.getId(),mail); } } @Slf4j(topic = "postman") class Postman extends Thread{ private int mailId; private Object mail; public Postman(int mailId, Object mail) { this.mailId = mailId; this.mail = mail; } @Override public void run() { //获取邮件内容,返回给收信者 GuardedObject guardedObject = Mailboxes.getGuardedObject(mailId); log.debug("送信id:{},内容:{}",mailId,mail); guardedObject.complete(mail); } } class Mailboxes{ //hashtable 确保多线程环境下运行 信箱 private static Map<Integer,GuardedObject> boxes = new Hashtable<>(); private static int id = 1; private static synchronized int generateId(){ return id++; } public static GuardedObject createGuardedObject(){ GuardedObject guardeObject = new GuardedObject(generateId()); boxes.put(guardeObject.getId(),guardeObject); return guardeObject; } //返回所有的id public static Set<Integer> getIds(){ return boxes.keySet(); } //根据id获取对于的object,人拿到信之后,移除信箱 public static GuardedObject getGuardedObject(Integer id){ return boxes.remove(id); } } class GuardedObject{ private Integer id; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public GuardedObject() { } public GuardedObject(Integer id) { this.id = id; } //结果 private Object response; //获取结果 //设置最大等待时间 public Object get(Long timeout){ synchronized (this){ //开始时间 long begin = System.currentTimeMillis(); long end = 0; while (response == null){ //如果超过最大等待时间,那么退出循环 end - timout >= 0 long waitTime = timeout - end; if(waitTime<=0){ break; } try { //不能等待timeout,因为不会退出,仍然会继续等待,还要考虑虚假唤醒的情况,再次进入时等待的时间变长 this.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } //经历时间 end = System.currentTimeMillis() - begin; } } return response; } //产生结果 public void complete(Object response){ synchronized (this){ this.response = response; this.notifyAll(); } } }
执行结果:
17:18:03.942 [Thread-0] DEBUG people - 收信id:1
17:18:03.942 [Thread-1] DEBUG people - 收信id:2
17:18:03.942 [Thread-2] DEBUG people - 收信id:3
17:18:04.950 [Thread-5] DEBUG postman - 送信id:1,内容:内容1
17:18:04.950 [Thread-3] DEBUG postman - 送信id:3,内容:内容3
17:18:04.950 [Thread-4] DEBUG postman - 送信id:2,内容:内容2
17:18:04.950 [Thread-0] DEBUG people - 收信id:1,收到信:内容1
17:18:04.950 [Thread-2] DEBUG people - 收信id:3,收到信:内容3
17:18:04.950 [Thread-1] DEBUG people - 收信id:2,收到信:内容2
生产者生产,消费者消费的一个简单例子:
生产者不断生产,生产数量小于等于消息队列的长度(capacity),消费者不断消费,直到生产者不在生产
package com.example.MessageQueue; import lombok.extern.slf4j.Slf4j; import java.util.LinkedList; /** * @author 我见青山多妩媚 * @date Create on 2022/7/2 15:52 */ @Slf4j(topic = "demo") public class Demo21 { public static void main(String[] args) { MessageQueue messageQueue = new MessageQueue(2); //生产者 for (int i = 0; i < 3; i++) { int id = i; new Thread(()->{ messageQueue.put(new Message(id,"值"+id)); },"生产者"+i+"号").start(); } new Thread(()->{ while (true){ try { //等待1s Thread.sleep(1000); Message take = messageQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } },"消费者").start(); } } @Slf4j(topic = "MessageQueue") class MessageQueue{ //消息集合 private final LinkedList<Message> list = new LinkedList<>(); //队列容量 private final int capacity; public MessageQueue(int capacity) { this.capacity = capacity; } //获取消息 public Message take(){ //检查队列是否为空 synchronized (list){ //如果为空,持续等待 while (list.isEmpty()){ try { log.debug("队列为空,消费者线程等待"); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("消费者消费"); //从队列头部获取消息返回 Message message = list.removeFirst(); //取走之后,唤醒等待的加入线程 list.notifyAll(); return message; } } //存入消息 public void put(Message message){ synchronized (list){ //如果队列满了 while (list.size() == capacity){ try { log.debug("队列已满,生产者线程等待"); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("生产者生产{}",message.toString()); //如果有空位,加入队列尾部 list.addLast(message); //加入之后,唤醒等待的线程 list.notifyAll(); } } } @Slf4j(topic = "message") final class Message{ //确保不线程安全,不加入set方法 private final int id; private final Object message; public int getId() { return id; } public Object getMessage() { return message; } public Message(int id, Object message) { this.id = id; this.message = message; } @Override public String toString() { return "Message{" + "id=" + id + ", message=" + message + '}'; } }
运行结果:
16:22:58.446 [生产者0号] DEBUG MessageQueue - 生产者生产Message{id=0, message=值0}
16:22:58.449 [生产者2号] DEBUG MessageQueue - 生产者生产Message{id=2, message=值2}
16:22:58.449 [生产者1号] DEBUG MessageQueue - 队列已满,生产者线程等待
16:22:59.453 [消费者] DEBUG MessageQueue - 消费者消费
16:22:59.453 [生产者1号] DEBUG MessageQueue - 生产者生产Message{id=1, message=值1}
16:23:00.468 [消费者] DEBUG MessageQueue - 消费者消费
16:23:01.481 [消费者] DEBUG MessageQueue - 消费者消费
16:23:02.488 [消费者] DEBUG MessageQueue - 队列为空,消费者线程等待
可以看到,定义的capacity=2,生产者线程数为3,所以生产两个消息之后生产者线程等待,消费者线程进行消费,消费之后唤醒等待的生产者线程,生产者继续生产,消费者继续消费,直到队列为空
比如先打印2之后才打印1
wait notify版
无论怎样,都是先打印出2,再打印出1
package com.example.MessageQueue; import lombok.extern.slf4j.Slf4j; /** * @author 我见青山多妩媚 * @date Create on 2022/7/5 18:05 */ @Slf4j public class Demo31 { static final Object lock = new Object(); //表示2线程是否打印过 static boolean isLock = false; public static void main(String[] args) { Thread t1 = new Thread(()->{ synchronized (lock){ while (!isLock){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("1"); } },"t1"); Thread t2 = new Thread(()->{ synchronized (lock){ log.debug("2"); isLock = true; lock.notify(); } },"t2"); t1.start(); t2.start(); } }
运行结果:
18:12:44.364 [t2] DEBUG com.example.MessageQueue.Demo31 - 2
18:12:44.370 [t1] DEBUG com.example.MessageQueue.Demo31 - 1
ReentrantLock await & signal
package com.example.MessageQueue; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * @author 我见青山多妩媚 * @date Create on 2022/7/5 18:13 */ @Slf4j public class Demo32 { static ReentrantLock lock = new ReentrantLock(); static Condition condition = lock.newCondition(); //表示2线程是否打印过 static boolean isLock = false; public static void main(String[] args) { Thread t1 = new Thread(()->{ lock.lock(); try { while (!isLock){ try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("1"); }finally { lock.unlock(); } },"t1"); Thread t2 = new Thread(()->{ lock.lock(); try { log.debug("2"); isLock = true; condition.signal(); } finally { lock.unlock(); } },"t2"); t1.start(); t2.start(); } }
运行结果:
18:16:35.735 [t2] DEBUG com.example.MessageQueue.Demo32 - 2
18:16:35.738 [t1] DEBUG com.example.MessageQueue.Demo32 - 1
park & unpark
package com.example.MessageQueue; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.LockSupport; /** * @author 我见青山多妩媚 * @date Create on 2022/7/5 18:17 */ @Slf4j public class Demo33 { public static void main(String[] args) { Thread t1 = new Thread(()->{ //锁住 LockSupport.park(); log.debug("1"); },"t1"); Thread t2 = new Thread(()->{ log.debug("2"); //唤醒阻塞的t1线程 LockSupport.unpark(t1); },"t2"); t1.start(); t2.start(); } }
运行结果:
18:19:09.582 [t2] DEBUG com.example.MessageQueue.Demo33 - 2
18:19:09.585 [t1] DEBUG com.example.MessageQueue.Demo33 - 1
要求:
线程1输出’a’五次,线程2输出’b’五次,线程3输出’c’五次。现在要求输出abcabcabcabcabc该怎么实现?
wait & notify
package com.example.DesignModule; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; /** * @author 我见青山多妩媚 * @date Create on 2022/7/5 18:21 */ @Slf4j public class Demo34 { /** * 输出内容 等待标记 下一个标记 * a 1 2 * b 2 3 * c 3 1 */ public static void main(String[] args) { WaitNotify waitNotify = new WaitNotify(1,5); new Thread(()->{ waitNotify.print("a",1,2); }).start(); new Thread(()->{ waitNotify.print("b",2,3); }).start(); new Thread(()->{ waitNotify.print("c",3,1); }).start(); } } @Data @AllArgsConstructor class WaitNotify { //等待标记 private int flag; //1、2、3 1-->2 , 2-->3 , 3-->1 //循环次数 private int loopNumber; //打印 [a 1 2] [b 2 3] [c 3 1] public void print(String str,int waitFlag,int nextFlag){ for (int i = 0; i < loopNumber; i++) { synchronized (this){ //当前线程和应该执行的比较 while (flag != waitFlag){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.print(str); //给下一个线程 flag = nextFlag; //叫醒其他线程 this.notifyAll(); } } } }
运行结果:
abcabcabcabcabc
await & signal
有些不严谨,因为没用while去解决虚假唤醒,但是仅仅针对于本例子来说没问题
package com.example.DesignModule; import com.example.tools.Sleep; import lombok.AllArgsConstructor; import lombok.Data; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * @author 我见青山多妩媚 * @date Create on 2022/7/5 18:36 */ public class Demo35 { public static void main(String[] args) { AwaitSignal awaitSignal = new AwaitSignal(5); //三间休息室 Condition a = awaitSignal.newCondition(); Condition b = awaitSignal.newCondition(); Condition c = awaitSignal.newCondition(); new Thread(()->{ awaitSignal.print("a",a,b); }).start(); new Thread(()->{ awaitSignal.print("b",b,c); }).start(); new Thread(()->{ awaitSignal.print("c",c,a); }).start(); Sleep.sleep(1); //因为进去之后都休息了,所以需要一个线程去唤醒a休息室的线程 awaitSignal.lock(); try { System.out.println("主线程发起开始命令"); a.signal(); } finally { awaitSignal.unlock(); } } } @AllArgsConstructor @Data class AwaitSignal extends ReentrantLock { //循环次数 private int loopNumber; /** * * @param str 内容 * @param condition 进入那一间去打印 * @param nextCondition 下一个休息室 */ public void print(String str,Condition condition,Condition nextCondition){ for (int i = 0; i < loopNumber; i++) { lock(); //相当于this.lock(); try{ try { condition.await(); System.out.print(str); //唤醒下一间休息室 nextCondition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } }finally { unlock(); } } } }
运行结果:
主线程发起开始命令
abcabcabcabcabc
park * unpark
package com.example.DesignModule; import com.example.tools.Sleep; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.locks.LockSupport; /** * @author 我见青山多妩媚 * @date Create on 2022/7/5 18:46 */ @Slf4j public class Demo36 { static Thread t1,t2,t3; public static void main(String[] args) { ParkUnPark parkUnPark = new ParkUnPark(5); t1 = new Thread(()->{ parkUnPark.print("a",t2); }); t2 = new Thread(()->{ parkUnPark.print("b",t3); }); t3 = new Thread(()->{ parkUnPark.print("c",t1); }); // Sleep.sleep(1); t1.start(); t2.start(); t3.start(); //主线程先唤醒t1 log.debug("主线程唤醒t1"); LockSupport.unpark(t1); } } @Data @AllArgsConstructor class ParkUnPark{ private int loopNumber; public void print(String str,Thread next){ for (int i = 0; i < loopNumber; i++) { LockSupport.park(); System.out.print(str); //唤醒下一个线程 LockSupport.unpark(next); } } }
运行结果:
19:15:36.449 [main] DEBUG com.example.DesignModule.Demo36 - 主线程唤醒t1
abcabcabcabcabc
使用volatile改进
在一个线程t1中如何优雅的终止t2?
package com.example.DesignModule; import com.example.tools.Sleep; import lombok.extern.slf4j.Slf4j; /** * @author 我见青山多妩媚 * @date Create on 2022/7/6 19:30 */ @Slf4j public class Demo37 { public static void main(String[] args) { TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination(); //开始线程 twoPhaseTermination.start(); Sleep.sleep(3.5); log.debug("停止监控"); //停止线程 twoPhaseTermination.stop(); } } @Slf4j class TwoPhaseTermination{ //监控线程 private Thread monitorThread; //停止线程 private volatile boolean stop = false; public void start(){ monitorThread = new Thread(()->{ while (true){ //是否被打断 if(stop){ log.debug("料理后事"); break; } try { Sleep.sleep(1); log.debug("执行监控记录"); } catch (Exception e) { e.printStackTrace(); } } },"monitor"); monitorThread.start(); } public void stop(){ stop = true; //目的是如果线程正在睡眠,可以在睡眠时就打断停止,不用等到下次while循环之后再打断 monitorThread.interrupt(); } }
运行结果:
19:42:59.777 [monitor] DEBUG com.example.DesignModule.TwoPhaseTermination - 执行监控记录
19:43:00.792 [monitor] DEBUG com.example.DesignModule.TwoPhaseTermination - 执行监控记录
19:43:01.807 [monitor] DEBUG com.example.DesignModule.TwoPhaseTermination - 执行监控记录
19:43:02.279 [main] DEBUG com.example.DesignModule.Demo37 - 停止监控
19:43:02.280 [monitor] DEBUG com.example.DesignModule.TwoPhaseTermination - 执行监控记录
19:43:02.280 [monitor] DEBUG com.example.DesignModule.TwoPhaseTermination - 料理后事
java.lang.InterruptedException: sleep interrupted //睡眠时被打断
at java.lang.Thread.sleep(Native Method)
at com.example.tools.Sleep.sleep(Sleep.java:10)
at com.example.DesignModule.TwoPhaseTermination.lambda$start$0(Demo37.java:43)
at java.lang.Thread.run(Thread.java:748)
//如果不使用 monitorThread.interrupt(); 不会看到异常信息,时间间隔也不会是3.5s,而是4s
BalKing(犹豫)模式在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回
例
package com.example.DesignModule; import lombok.extern.slf4j.Slf4j; /** * @author 我见青山多妩媚 * @date Create on 2022/7/7 15:16 */ @Slf4j public class Demo38 { private volatile boolean starting; public void start(){ log.info("尝试启动线程"); synchronized (this){ if(starting){ return; } starting = true; } //真正启动线程 } }
他也经常用来实现线程安全的单例
public final class Singleton{
private Singleton(){
}
private static Singleton INSTANCE = null;
public static synchronized Singleton getInstance(){
if(INSTANCE != null){
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
对比一下保护性暂停模式:保护性暂停模式用在一个线程等待另一个线程的执行结果,当条件不满足时线程等待
享元模式(Flyweight pattern)当需要重用数量有限的同一类对象时
在JDK中,Boolean、Byte、Short、Integer、Long、Character等包装类提供了一个valueOf方法,例如Long的valueOf
会缓存-128~127之间的Long对象,在这个范围之间会重用对象,大于这个范围,才会创建新Long对象:
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
注意:
Djava.lang.Integer.IntegerCache.hign
来改变比如一个线上商城应用,QPS达到数千,如果没吃都重新创建和关闭数据库连接,那么性能会受到很大的影响,这是预先创建好一批连接,放入连接池,一次请求达到后从连接池中获取连接,使用完毕之后再还会连接池,这样既节约了连接的创建和关闭,也实现了连接的复用,不至于让庞大的连接数压垮数据库
package com.example.Seven; import com.example.tools.Sleep; import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.sql.*; import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerArray; /** * @author 我见青山多妩媚 * @date Create on 2022/7/14 15:39 */ @Slf4j public class Demo03 { public static void main(String[] args) { Pool pool = new Pool(2); for (int i = 0; i < 5; i++) { new Thread(()->{ //使用 Connection connection = pool.borrow(); Sleep.sleep(new Random().nextInt(1000)); //释放 pool.free(connection); }).start(); } } } @Data @Slf4j class Pool{ //1、连接池大小 固定大小的连接池 private final int poolSize; //2、连接对象的数组 private Connection[] connections; //3、连接状态数组 0 空闲 1繁忙 防止使用int出现线程不安全问题 private AtomicIntegerArray states; //初始化 public Pool(int poolSize) { this.poolSize = poolSize; this.connections = new Connection[poolSize]; this.states = new AtomicIntegerArray(new int[poolSize]); for (int i = 0; i < poolSize; i++) { connections[i] = new MockConnection("连接"+(i+1)); } } //5、借连接 public Connection borrow(){ while (true){ for (int i = 0; i < poolSize; i++) { if (states.get(i) == 0) { //防止线程安全问题,不使用set方法 0--->1 if (states.compareAndSet(i,0,1)) { log.debug("borrow {}",connections[i]); return connections[i]; } } } //如果没有空闲连接 当前线程进入等待 synchronized (this){ try { log.debug("wait..."); this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } //6、还连接 public void free(Connection conn){ for (int i = 0; i < poolSize; i++) { if (connections[i] == conn) { states.set(i,0); synchronized (this){ log.debug("free {}",conn); this.notifyAll(); } break; } } } } //获取数据库连接 class MockConnection implements Connection{ private String name; public MockConnection(String name) { this.name = name; } @Override public String toString() { return "MockConnection{" + "name='" + name + '\'' + '}'; } //...继承Connection的子类 }
运行结果:
16:08:35.404 [Thread-2] DEBUG com.example.Seven.Pool - wait...
16:08:35.404 [Thread-1] DEBUG com.example.Seven.Pool - borrow MockConnection{name='连接2'}
16:08:35.407 [Thread-4] DEBUG com.example.Seven.Pool - wait...
16:08:35.407 [Thread-3] DEBUG com.example.Seven.Pool - wait...
16:08:35.404 [Thread-0] DEBUG com.example.Seven.Pool - borrow MockConnection{name='连接1'}
不过以上的代码有些问题没有考虑:
对于关系型数据库,有比较成熟的连接池可以使用,例如c3p0,druid等
对于更通用的对象池,可以考虑使用Apache commons pool ,例如redis连接池参考jedis中关于连接池的实现
让有限的工作线程(work thread)来轮流异步处理无限多的线程,也可以将其归类为分工模式,他的典型是实现其他线程,也体现了几点设计模式中的享元模式。
例如海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本会很高(对比另一种多线程设计模式:Thread-Per-Message)
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工
固定大小的线程池有饥饿现象
package com.example.Eight; import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @author 我见青山多妩媚 * @date Create on 2022/7/18 15:43 */ @Slf4j public class Demo06 { static final List<String> ENUM = Arrays.asList("地三鲜","宫保鸡丁","辣子鸡丁","烤鸡腿"); static Random RANDOM = new Random(); static String cooking(){return ENUM.get(RANDOM.nextInt(ENUM.size()));} public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2); pool.execute(()->{ //一个线程处理点餐 log.debug("处理点餐"); //另一个线程处理做菜 Future<String> f = pool.submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜:{}",f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); pool.execute(()->{ log.debug("处理点餐"); Future<String> f = pool.submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜:{}",f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } }
运行结果:
09:04:58.352 [pool-1-thread-1] DEBUG com.example.Eight.Demo06 - 处理点餐
09:04:58.352 [pool-1-thread-2] DEBUG com.example.Eight.Demo06 - 处理点餐
可以看到,两个线程都去处理点餐了,没有多的线程去处理做菜的操作,当线程池内再添加一个线程变为三个线程时,运行结果为:
//ExecutorService pool = Executors.newFixedThreadPool(3);
09:06:14.509 [pool-1-thread-2] DEBUG com.example.Eight.Demo06 - 处理点餐
09:06:14.509 [pool-1-thread-1] DEBUG com.example.Eight.Demo06 - 处理点餐
09:06:14.526 [pool-1-thread-3] DEBUG com.example.Eight.Demo06 - 做菜
09:06:14.526 [pool-1-thread-3] DEBUG com.example.Eight.Demo06 - 做菜
09:06:14.526 [pool-1-thread-1] DEBUG com.example.Eight.Demo06 - 上菜:烤鸡腿
09:06:14.526 [pool-1-thread-2] DEBUG com.example.Eight.Demo06 - 上菜:辣子鸡丁
这样做确实解决了饥饿的问题,但是总不能多来一个客人就多招一名服务员吧?所以最好的方法就是将不同的任务放在不同的线程池内,服务员放在服务员线程,厨师放在厨师线程,互不干扰,这个例子是吧两个角色全都放在一个线程了,execute()需要一个线程去执行,submit()也需要一个线程去执行,所以当数量为2的时候,自然就不够了
package com.example.Eight; import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @author 我见青山多妩媚 * @date Create on 2022/7/18 15:43 */ @Slf4j public class Demo06 { static final List<String> ENUM = Arrays.asList("地三鲜","宫保鸡丁","辣子鸡丁","烤鸡腿"); static Random RANDOM = new Random(); static String cooking(){return ENUM.get(RANDOM.nextInt(ENUM.size()));} public static void main(String[] args) { //服务员线程池,专门负责点餐 ExecutorService waiterPool = Executors.newFixedThreadPool(1); //厨师线程池,专门负责做菜 ExecutorService cookPool = Executors.newFixedThreadPool(1); //一个线程处理点餐 //同一个线程池内逻辑互不干扰 waiterPool.execute(()->{ log.debug("处理点餐"); //另一个线程处理做菜 Future<String> f = cookPool.submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜:{}",f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); waiterPool.execute(()->{ log.debug("处理点餐"); Future<String> f = cookPool.submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜:{}",f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } }
使用两个不同的线程池,一个专门负责点餐waiterPool
,一个专门负责做菜cookPool
运行结果:
09:10:44.395 [pool-1-thread-1] DEBUG com.example.Eight.Demo06 - 处理点餐
09:10:44.402 [pool-2-thread-1] DEBUG com.example.Eight.Demo06 - 做菜
09:10:44.402 [pool-1-thread-1] DEBUG com.example.Eight.Demo06 - 上菜:宫保鸡丁
09:10:44.404 [pool-1-thread-1] DEBUG com.example.Eight.Demo06 - 处理点餐
09:10:44.405 [pool-2-thread-1] DEBUG com.example.Eight.Demo06 - 做菜
09:10:44.405 [pool-1-thread-1] DEBUG com.example.Eight.Demo06 - 上菜:辣子鸡丁
通常采用cpu核数+1能够实现最优的CPU利用率,+1是保证当线程中由于缺页故障(操作系统)或其他原因导致暂停时,额外的这个线程就能顶上去,保证CPU时钟周期不被浪费
CPU不总是处于繁忙状态,例如,当你执行业务计算时,这是会使用CPU资源,但当你执行IO操作时,远程RPC调用时,包括进行数据库操作时,这时候CPU闲下来了,你可以利用多线程提高它的利用率。
经验公式:
线程数 = 核数 * 期望CPU利用率 * 总共时间(CPU计算时间 + 等待时间)/ CPU计算时间
例如4核CPU计算时间是50%,其等待时间是50%,期望CPU被100%利用:
4 * 100% * (50% + 50%)/50% = 8
例如4核CPU计算时间是10%,其等待时间是90%,期望CPU被100%利用:
4 * 100% * (10% + 90%)/10% = 40
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。