当前位置:   article > 正文

【三】多线程 —— 设计模式_thread join 自动关闭

thread join 自动关闭

一、多线程设计模式之一:保护性暂停(Guarded Suspension)

1.1 基本概念

定义:即Guarded Suspension,用在一个线程等待另一个线程的执行结果。

定义说明了它要解决的问题,就是用在一个线程等待另一个线程的执行结果时使用。

保护性暂停的暂停就是当条件不满足的时候就去进行wait等待。

1.2 要点

  • 有一个结果需要从一个线程传递到另一个线程,让它们关联同一个GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么此时就不能使用这个保护性暂停模式了,我们可以使用消息队列(见生产者/消费者)。
  • JDK中,join的实现、Future的实现,采用的就是此模式。(用join一个线程等待另一个线程结束就可以拿到结果了,其实这也是保护性线程的一个应用)
  • 因为要等待另一方的结果,因此归类到同步模式。

比如线程2产生这个结果,然后线程1想要得到这个结果,那就可以让GuardedObject充当一个桥梁,让线程1、2都关联到这个对象上。
在这里插入图片描述

1.3 代码实现

场景模拟:线程1等待线程2的下载结果。

public class DownLoader {
    public static List<String> download () throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
        BufferedReader bufferedReader =
                new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));
        List<String> lines =new ArrayList<>();
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            lines.add(line);
        }
        return lines;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
/**
 * 该类中就是我们上一节使用的wait/notifyAll正确使用方法
 */
public class GuardedObject {
    // 结果(因为我们这个结果可以是任意对象,所以这里设置为Object)
    private Object response;

    // 获取结果的方法
    public Object get() throws InterruptedException {
        // 这里为什么加while而不是直接wait呢?
        while (null == response) {
            // 这里为什么加锁呢?
            // 1、保护性暂停模式是两个线程对同一个线程进行操作的,
            // 所以多个线程之间要对同一个对象操作,那么要加锁
            // 2、wait、notify、notifyAll三个方法都必须是线程为资源锁的owner
            // 才能调用否则会报错误IllegalMonitorStateException
            synchronized (this) {
                this.wait();
            }
        }
        return response;
    }

    // 产生结果的方法
    public void complete(Object response) {
        synchronized (this) {
            // 给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
public class GuardedTest {
    // t1 等待 t2线程的下载结果
    public static void main(String[] args) {
        // 这里的使用同一个guardeObject对象
        // 原因1:保护性暂停模式,两个线程的使用同一个guardeObject对象
        // 原因2:代码设计到synchronized,所以多个线程要对同一个对象进行操作
        // 以确保共享成员变量的安全性(这里的共享成员变量是GuardeObject中的response对象)
        GuardedObject guardeObject = new GuardedObject();

        new Thread(new Runnable() {
            @Override
            public void run() {
                Console.log(DateUtil.now() +Thread.currentThread().getName() + ":等待结果");
                try {
                    // 等待结果:至于等待多久得看线程2啥时候下载完
                    List<String> list = (List<String>)guardeObject.get();
                    Console.log(DateUtil.now() +Thread.currentThread().getName() + ":结果大小" + list.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();


        new Thread(()->{
            Console.log(DateUtil.now() +Thread.currentThread().getName() + ":执行下载");
            try {
                List<String> list = DownLoader.download();
                // 将结果传递给GuardeObject,并唤醒正在等待的线程
                guardeObject.complete(list);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
2021-08-02 12:27:08t1:等待结果
2021-08-02 12:27:08Thread-0:执行下载
2021-08-02 12:27:09t1:结果大小3
  • 1
  • 2
  • 3

**该模式的好处1:**之前我们是使用join去等待一个线程结束然后另一个线程获取该线程的返回结果,但是现在我们的t2下载完了,不用结束,它还可以干其他的事情。
**该模式的好处2:**用join的那种方法,返回的变量只能设置成为全局的。现在返回的可以是局部的,两个线程中代码里返回的变量list。

代码增强 - 1
增加一个超时的效果,比如我们上面的代码线程1在等待线程2下载完毕,但是比如我等待了2s后就不想等待了。显然上面不带参数的guardeObject.get()等待方法是不行的,只要不下载完他就会一直等待。

修改guardeObject.get()方法添加超时检测,当线程1等待超时时就不再等待。

public class GuardedObject01 {
    // 结果(因为我们这个结果可以是任意对象,所以这里设置为Object)
    private Object response;

    // 获取结果的方法
    public Object get(long timeout) throws InterruptedException {
        synchronized (this) {
            // 开始时间
            long beginTime = System.currentTimeMillis();
            // 已经等待的时间
            long passedTime = 0;
            while (null == response) {
                // 这一轮循环应该等待的时间
                long waitTime = timeout - passedTime;
                // 等待的时间超过了最大等待时间,退出循环
                if (waitTime <= 0) {
                    break;
                }
                this.wait(waitTime);  // 虚假唤醒

                // 已经等待的时间
                passedTime = System.currentTimeMillis() - beginTime;
            }
            return response;
        }
    }

    // 产生结果的方法
    public void complete(Object response) {
        synchronized (this) {
            // 给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
public class GuardedTest01 {
    // t1 等待 t2线程的下载结果
    public static void main(String[] args) {
        GuardedObject01 guardeObject = new GuardedObject01();
        // 线程t1
        new Thread(new Runnable() {
            @Override
            public void run() {
                Console.log(DateUtil.now() +Thread.currentThread().getName() + ":等待结果");
                try {
                    // 等待结果:至于等待多久得看线程2啥时候下载完
                    Object response = guardeObject.get(2000);
                    Console.log(DateUtil.now() +Thread.currentThread().getName() + ":结果是" + response);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();

        // 线程t2
        new Thread(()->{
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Console.log(DateUtil.now() +Thread.currentThread().getName() + ":执行下载");
            try {
                List<String> list = DownLoader.download();
                // 将结果传递给GuardeObject,并唤醒正在等待的线程
                guardeObject.complete(list);
                Console.log(DateUtil.now() +Thread.currentThread().getName() + ":下载完毕");
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"t2").start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
2021-08-02 12:45:52t1:等待结果
2021-08-02 12:45:53t2:执行下载
2021-08-02 12:45:54t1:结果是null
2021-08-02 12:45:55t2:下载完毕
  • 1
  • 2
  • 3
  • 4

可以看到当t1等待的时间超过设置的时间之后,t1就不会再等待了,它会退出去执行其他代码。
当等待时间没有超过设置时间的时候,t1会打印t2下载的结果。

下面我们来测试一下虚假唤醒的情况,就是我们再线程2里直接返回guardeObject对象一个null,然后线程1调用guardeObject.get,但是此时返回为null,不满足退出wait的条件,于是它因该再等一会超过等待时间再推出,两端的等待时间加一块等于我们在线程1(guardeObject.get)里传入的时间才算正确。
测试方法:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
代码增强 - 2

若需要在多个类之间使用GuardedObject, 可以使用一个类来解耦【结果等待者】和【结果生产者】。
在这里插入图片描述
图中Futures就好比是有编号的信箱,左边t0、t2、t4就是等待者,t1、t3、t5就是邮递员。

class GuardedObject02 {

    //标识Guarder Object
    private int id;

    public GuardedObject02(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    //结果
    private Object response;

    //获取结果
    public Object get(long timeout) {
        synchronized (this) {
            //开始时间
            long begin = System.currentTimeMillis();
            //经历的时间
            long passedTime = 0;

            //没有结果
            while (response == null) {
                long waitTime = timeout - passedTime;
                if (waitTime <= 0) {
                    break;
                }
                try {
                    this.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //求得经历的时间
                passedTime = System.currentTimeMillis() - begin;
            }
            return response;
        }
    }

    //产生结果
    public void complete(Object response) {
        synchronized (this) {
            //给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
class Mailboxes {
    private static Map<Integer, GuardedObject02> boxes = new Hashtable<>();

    private static int id = 1;
    //产生唯一id
    private static synchronized int generateId() {
        return id++;
    }

    public static GuardedObject02 createGuardedObject() {
        GuardedObject02 go = new GuardedObject02(generateId());
        boxes.put(go.getId(), go);
        return go;
    }

    public static Set<Integer> getIds() {
        return boxes.keySet();
    }

    public static GuardedObject02 getGuardObject(int id) {
        return boxes.remove(id);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
class Postman extends Thread {
    private int id;
    private String mail;

    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }

    @Override
    public void run() {
        GuardedObject02 guardObject = Mailboxes.getGuardObject(id);
        guardObject.complete(mail);
        System.out.println("开始送信id: " + id + " " + mail);
    }

    @Override
    public long getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getMail() {
        return mail;
    }

    public void setMail(String mail) {
        this.mail = mail;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
class People extends Thread {
    @Override
    public void run() {
        //收信
        GuardedObject02 guardedObject = Mailboxes.createGuardedObject();
        System.out.println("开始收信id: " +  guardedObject.getId());
        Object o = guardedObject.get(5*1000);
        System.out.println("收到信id: " + guardedObject.getId() + " " + o );
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
/**
 * 同步模式之保护性暂停
 * 扩展: 解耦
 */
public class ProtectivePauseAddDecoupling {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            new People().start();
        }
        Thread.sleep(1);

        for (Integer id : Mailboxes.getIds()) {
            new Postman(id, "居民id为: " + id +"的内容").start();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
开始收信id: 1
开始收信id: 2
开始收信id: 3
收到信id: 3 null
收到信id: 2 null
收到信id: 1 null
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

二、join原理

join方法的使用:比如在线程B中调用了线程A的Join()方法,直到线程A执行完毕后,才会继续执行线程B。

其实join的底层原理就是使用了上面的保护性暂停模式,来实现了我要等待另一个线程结束。当然它和保护性暂停模式还有区别。

保护性暂停是一个线程等待另一个线程的结果(返回结果后该线程仍然可以运行其他代码)。而join是一个线程等待另一个线程的结束。

但是他们的方式都是差不多的。来看一下join的源码,它是Thread的方法,无参join实际上是调用了长整型参数的join:
在这里插入图片描述
在这里插入图片描述

三、异步模式之生产者/消费者

3.1 要点

  • 和前面的保护性暂停中的GuardObject不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK中各种阻塞队列,采用的就是这种模式
    在这里插入图片描述
    跟上面的模式对比,上面的模式每个居民就配置了一个邮递员,这是有些违背现实的。而生产者消费者模式则使用队列等形式,只需要一个邮递员就可以服务多个居民。

为啥说这种模式是异步的呢?
主要是因为某个线程产生的消息,不一定是被立刻消费的,要等前面的消息被消费后才行。

/**
 * 消息队列,Java线程之间通信
 */
public class MessageQueue {

    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue(2);

        //测试
        for (int i = 0; i < 3; i++) {
            int id = i;
            new Thread(() -> {
                queue.put(new Message(id, "值" + id));
            }, "生产者" + i).start();
        }

        new Thread(() -> {
            queue.take();
        }, "消费者").start();
    }

    private LinkedList<Message> list = new LinkedList<>();
    private int capcity;

    public MessageQueue(int capcity) {
        this.capcity = capcity;
    }

    //获取消息
    public Message take() {
        //检查对象是否为空
        synchronized (list) {
            while (list.isEmpty()) {
                try {
                    System.out.println("对列为空,消费者线程只能等待");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //从队列头部获取消息并返回
            Message message = list.removeFirst();
            System.out.println("已消费消息: " +message);
            list.notifyAll();

            return message;
        }
    }

    //存入消息
    public void put(Message message) {
        synchronized (list) {
            //检查队列是否已满
            while (list.size() == capcity) {
                try {
                    System.out.println("队列已满,生产者线程等待");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.addLast(message);
            System.out.println("已生产消息: " + message);
            list.notifyAll();
        }
    }

}

final class Message {
    private int id;

    public Message(int id, Object value) {
        this.id = id;
        this.value = value;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", value=" + value +
                '}';
    }

    private Object value;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public Object getValue() {
        return value;
    }

    public void setValue(Object value) {
        this.value = value;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
已生产消息: Message{id=2, value=2}
已生产消息: Message{id=1, value=1}
队列已满,生产者线程等待
已消费消息: Message{id=2, value=2}
已生产消息: Message{id=0, value=0}
  • 1
  • 2
  • 3
  • 4
  • 5

3.2 park & unpark

线程通信的基础
它们是LockSupport类中的方法:

 		// 暂停当前线程,park之后线程对应的是wait状态
        LockSupport.park();
        // 恢复某个线程的运行
        LockSupport.unpark(暂停线程对象);
  • 1
  • 2
  • 3
  • 4
public class TestParkUnpark01 {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            ConsoleUtil.print("start...");
            try {
                TimeUnit.MILLISECONDS.sleep(1000);//t1睡眠了一秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ConsoleUtil.print("park...");
            LockSupport.park();//t1线程一秒后暂停
            ConsoleUtil.print("resume...");
        }, "t1");
        t1.start();

        TimeUnit.MILLISECONDS.sleep(2000);//主线程睡眠二秒
        ConsoleUtil.print("unpark...");
        LockSupport.unpark(t1);//二秒后由主线程恢复t1线程的运行
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
2021-08-02 17:44:14 start...
2021-08-02 17:44:15 park...
2021-08-02 17:44:16 unpark...
2021-08-02 17:44:16 resume...
  • 1
  • 2
  • 3
  • 4

与wait¬ify不同点:unpark可以在park执行之前或者执行之后调用,也就是说,可以在先调用unpark然后再调用park,此时线程任务不会暂停,线程就可以继续往下执行了。

public class TestParkUnpark02 {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            ConsoleUtil.print("start...");
            try {
                TimeUnit.MILLISECONDS.sleep(2000);//t1睡眠了一秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ConsoleUtil.print("park...");
            LockSupport.park();//t1线程一秒后暂停
            ConsoleUtil.print("resume...");
        }, "t1");
        t1.start();

        TimeUnit.MILLISECONDS.sleep(1000);//主线程睡眠二秒
        ConsoleUtil.print("unpark...");
        LockSupport.unpark(t1);//二秒后由主线程恢复t1线程的运行
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
2021-08-02 17:45:18 start...
2021-08-02 17:45:19 unpark...
2021-08-02 17:45:20 park...
2021-08-02 17:45:20 resume...
  • 1
  • 2
  • 3
  • 4

以上代码先unpark,然后在park,但是并未暂停,程序仍然执行下去了。这是为什么呢?请看下面的原理部分。

特点
与Object的wait & notify相比

  • wait, notify和notifyAll必须配合Object Monitor一起使用,而park,unpark不必
  • park & unpark是以线程为单位来[阻塞]和[唤醒]线程(unpark的参数指定了唤醒的线程),而notify只能随机唤醒一个等待线程,
  • notifyAll是唤醒所有等待线程,就不那么[精确]
  • park & unpark可以先unpark,而wait & notify不能先notify

原理
每个线程都有自己的-一个Parker对象(java层面是看不到得),由三部分组成_ counter, cond 和 _mutex 打个比喻

  • 线程就像一一个旅人,Parker 就像他随身携带的背包,条件变量就好比背包中的帐篷。counter 就好比背包中的备用干粮(0
    为耗尽,1为充足)
  • 调用park就是要看需不需要停下来歇息
    1、如果备用干粮耗尽,那么钻进帐篷歇息
    2、如果备用干粮充足,那么不需停留,继续前进
  • 调用unpark,就好比令干粮充足
    1、如果这时线程还在帐篷,就唤醒让他继续前进
    2、如果这时线程还在运行,那么下次他调用park时,仅是消耗掉备用干粮,不需停留继续前进
    2.1、因为背包空间有限,多次调用unpark仅会补充一份备用干粮
    在这里插入图片描述
    在这里插入图片描述
    先调用unpark再调用park的情况:
    在这里插入图片描述

3.3 多把锁

一间大屋子有两个功能:睡觉、学习,互不相干。

现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低

解决方法是准备多个房间(多个对象锁),即锁的粒度细分

先看一把锁的效果:

public class BigRoom {
    public void sleep() throws InterruptedException {
        synchronized (this) {
            ConsoleUtil.print("sleeping 2 小时");
            Thread.sleep(2000);
        }
    }

    public void study() throws InterruptedException {
        synchronized (this) {
            ConsoleUtil.print("study 1 小时");
            Thread.sleep(6000);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
public class MulLocks {
    public static void main(String[] args) {
        BigRoom bigRoom = new BigRoom();

        new Thread(() -> {
            try {
                bigRoom.study();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "小南").start();

        new Thread(() -> {
            try {
                bigRoom.sleep();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "小女").start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
2021-08-02 18:17:27 study 1 小时
2021-08-02 18:17:33 sleeping 2 小时
  • 1
  • 2

上面并发度不高,等于是串行的。

改进:

public class BigRoom01 {
    private final Object studyObj = new Object();
    private final Object sleepObj = new Object();


    public void sleep() throws InterruptedException {
        synchronized (sleepObj) {
            ConsoleUtil.print("sleeping 2 小时");
            Thread.sleep(2000);
        }
    }

    public void study() throws InterruptedException {
        synchronized (studyObj) {
            ConsoleUtil.print("study 1 小时");
            Thread.sleep(6000);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
2021-08-02 18:22:00 study 1 小时
2021-08-02 18:22:00 sleeping 2 小时
  • 1
  • 2

同一时间,既学习又睡觉互不干扰,也就是细粒度的锁可以增强并发性。

将锁的粒度细分

  • 好处,是可以增强并发度
  • 坏处,如果一个线程需要同时获得多把锁,就容易发生死锁

3.4 活跃性

3.4.1 死锁

一个线程需要同时获取多把锁,这时就容易发生死锁。例如t1 线程 获得 A对象 锁,接下来想获取 B对象 的锁 t2 线程 获得 B对象 锁,接下来想获取 A对象 的锁。

定位死锁
检测死锁可以使用 jconsole工具,或者使用 jps 定位进程 id,再用 jstack 定位死锁。

3.4.2 活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如:

public class TestLiveLock {
    static volatile int count = 10;
    static final Object lock = new Object();
    public static void main(String[] args) {
        new Thread(() -> {
            // 期望减到 0 退出循环
            while (count > 0) {
                sleep(0.2);
                count--;
                log.debug("count: {}", count);
            }
        }, "t1").start();
        
        new Thread(() -> {
            // 期望超过 20 退出循环
            while (count < 20) {
                sleep(0.2);
                count++;
                log.debug("count: {}", count);
            }
        }, "t2").start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

上面的线程单独运行都能正常执行完步然后退出,但是结合在一起就会出现活锁,即互相改变条件,导致两个线程都无法正常执行完毕。

解决活锁就是让各线程在执行时间上有一定的交错。

3.4.2 饥饿

一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束。

四、ReentrantLock

相对于synchronized它具备如下特点:

  • 可中断
  • 可设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量
  • 与synchronized一样,都支持可重入

4.1 可重入性

可重入是指用一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁。如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。

基本语法:
在这里插入图片描述
这里的lock对象本身就是一把锁,而之前的加锁是指对象及其关联的Monitor对象而成的锁。finally保证即使出现异常也能释放锁。

public class ReentrantLock01 {
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        lock.lock();

        try {
            method1();
            lock.unlock();
        } finally {

        }
    }

    public static void method1() {
        lock.lock();
        try {
            ConsoleUtil.print("execute method1");
            method2();
        } finally {
            lock.unlock();
        }
    }
    public static void method2() {
        lock.lock();
        try {
            ConsoleUtil.print("execute method2");
            method3();
        } finally {
            lock.unlock();
        }
    }
    public static void method3() {
        lock.lock();
        try {
            ConsoleUtil.print("execute method3");
        } finally {
            lock.unlock();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
2021-08-02 18:39:31 execute method1
2021-08-02 18:39:31 execute method2
2021-08-02 18:39:31 execute method3
  • 1
  • 2
  • 3

这里在执行m1和m2就是锁的重入了,若不可重入那么就会被阻塞。

4.2 可中断性

public class ReentrantLock02 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            try {
                System.out.println("t1  进入lock。。。。。");
                lock.lockInterruptibly();     //可打断lock
                System.out.println("睡眠结束了。。。。");
            } catch (InterruptedException e) {
                System.out.println("t1线程被打断了");
                e.printStackTrace();
            }finally {
                lock.unlock();   //解锁
            }
        }, "t1");

        lock.lock();
        t1.start();
        Thread.sleep(1000);
        System.out.println("打断t1 进程");
        t1.interrupt();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
t1  进入lock。。。。。
打断t1 进程
t1线程被打断了
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
	at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
	at thread.demo.ReentrantLock02.lambda$main$0(ReentrantLock02.java:11)
	at java.lang.Thread.run(Thread.java:748)
Exception in thread "t1" java.lang.IllegalMonitorStateException
	at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
	at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
	at thread.demo.ReentrantLock02.lambda$main$0(ReentrantLock02.java:17)
	at java.lang.Thread.run(Thread.java:748)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

上面使用的是lock.interruptible,开启打断效果。若直接使用lock.lock那就不会有打断效果,t1会继续阻塞,且在编译期就会出错。
在这里插入图片描述

五、锁超时

前面介绍过可打断的特性可以避免线程进行死等,但这种避免方式是被动的,需要其他线程来打断,自己无法控制。

tryLock给了我们主动的方式,即尝试获得锁,获取成功就执行临界区代码,获取失败就立即执行相关流程,而不是进入阻塞继续获取锁或者等待别的线程打断。

public class ReentrantLock03 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            System.out.println("尝试获取 lock。。。。。");
            try {
                if(!lock.tryLock(1, TimeUnit.SECONDS)){
                    System.out.println("获取失败");
                    return;
                }
                System.out.println("获取成功");
            } catch (InterruptedException e) {
                System.out.println("获取锁异常");
                e.printStackTrace();
                return;
            }finally {
                lock.unlock();   //解锁
            }
        }, "t1");

        lock.lock();
        System.out.println("000000000");
        t1.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
000000000
尝试获取 lock。。。。。
获取失败
Exception in thread "t1" java.lang.IllegalMonitorStateException
	at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
	at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
	at thread.demo.ReentrantLock03.lambda$main$0(ReentrantLock03.java:22)
	at java.lang.Thread.run(Thread.java:748)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

六、公平锁

ReentrantLock 默认是不公平的,new对象的时候传入一个true即可设置为公平锁。
在这里插入图片描述
通过这个构造方法的参数去设置公平和非公平性。

public class Test5 {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock(false);
        lock.lock();
        for (int i = 0; i < 500; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " running...");
                } finally {
                    lock.unlock();
                }
            }, "t" + i).start();
        }
// 1s 之后去争抢锁
        Thread.sleep(1000);
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " start...");
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " running...");
            } finally {
                lock.unlock();
            }
        }, "强行插入").start();
        lock.unlock();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

在这里插入图片描述
改成公平的会出现在最后:
在这里插入图片描述
公平锁一般没必要,会降低并发。

七、条件变量

synchronized 中也有条件变量,就是那个 waitSet 休息室,当条件不满足时进入 waitSet 等待。

ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

  • synchronized 是那些不满足条件的线程都在一间休息室等消息
  • ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒

使用:

  • await 前需要获得锁
  • await 执行后,会释放锁,进入 conditionObject 等待
  • await 的线程被唤醒(或打断、或超时)重新竞争 lock 锁,执行唤醒的线程也必须先获得锁
  • 竞争 lock 锁成功后,从 await后继续执行
  • signal用于唤醒某个线程就好比notifysignalAll类比notifyAll
public class ConditionTest {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;
    static ReentrantLock ROOM = new ReentrantLock();
    // 等待烟的休息室
    static Condition waitCigaretteSet = ROOM.newCondition();
    // 等外卖的休息室
    static Condition waitTakeoutSet = ROOM.newCondition();

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            ROOM.lock();
            try {
                Console.log(DateUtil.now() + "有烟没?[{}]", hasCigarette);
                while (!hasCigarette) {
                    Console.log(DateUtil.now() + "没烟,先歇会!");
                    try {
                        waitCigaretteSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Console.log(DateUtil.now() + "可以开始干活了");
            } finally {
                ROOM.unlock();
            }
        }, "小南").start();

        new Thread(() -> {
            ROOM.lock();
            try {
                Console.log(DateUtil.now() + "外卖送到没?[{}]", hasTakeout);
                while (!hasTakeout) {
                    Console.log(DateUtil.now() + "没外卖,先歇会!");
                    try {
                        waitTakeoutSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Console.log(DateUtil.now() + "可以开始干活了");
            } finally {
                ROOM.unlock();
            }
        }, "小女").start();

        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            ROOM.lock();
            try {
                hasTakeout = true;
                waitTakeoutSet.signal();
            } finally {
                ROOM.unlock();
            }
        }, "送外卖的").start();

        TimeUnit.SECONDS.sleep(1);

        new Thread(() -> {
            ROOM.lock();
            try {
                hasCigarette = true;
                waitCigaretteSet.signal();
            } finally {
                ROOM.unlock();
            }
        }, "送烟的").start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
2021-08-02 21:16:12有烟没?[false]
2021-08-02 21:16:12没烟,先歇会!
2021-08-02 21:16:12外卖送到没?[false]
2021-08-02 21:16:12没外卖,先歇会!
2021-08-02 21:16:13可以开始干活了
2021-08-02 21:16:14可以开始干活了
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

八、同步模式之顺序控制

比如,必须先 2 后 1 打印。

8.1 wait notify 版

public class WaitNotify01 {
    // 用来同步的对象
    static final Object lock = new Object();
    // 表示 t2 是否运行过
    static boolean t2runned = false;

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (lock) {
                while (!t2runned) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                ConsoleUtil.print("1");
            }
        }, "t1");


        Thread t2 = new Thread(() -> {
            synchronized (lock) {
                ConsoleUtil.print("2");
                t2runned = true;
                lock.notify();
            }
        }, "t2");

        t1.start();
        t2.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
2021-08-02 21:27:16 2
2021-08-02 21:27:16 1
  • 1
  • 2

8.2 Park Unpark 版

public class ParkTest {
    public static void main(String[] args) {

        Thread t1 = new Thread(() -> {
            LockSupport.park();
            ConsoleUtil.print("1");
        }, "t1");
        t1.start();

        new Thread(() -> {
            ConsoleUtil.print("2");
            LockSupport.unpark(t1);
        },"t2").start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
2021-08-02 21:29:43 2
2021-08-02 21:29:43 1
  • 1
  • 2

8.3 交替输出

线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现?

public class ParkTest02 {
    static Thread t1;
    static Thread t2;
    static Thread t3;
    public static void main(String[] args) {
        ParkUnpark pu = new ParkUnpark(5);
        t1 = new Thread(() -> {
            pu.print("a", t2);
        });
        t2 = new Thread(() -> {
            pu.print("b", t3);
        });
        t3 = new Thread(() -> {
            pu.print("c", t1);
        });
        t1.start();
        t2.start();
        t3.start();
        LockSupport.unpark(t1);
    }
}

class ParkUnpark {
    public void print(String str, Thread next) {
        for (int i = 0; i < loopNumber; i++) {
            LockSupport.park();
            System.out.print(str);
            LockSupport.unpark(next);
        }
    }

    private int loopNumber;

    public ParkUnpark(int loopNumber) {
        this.loopNumber = loopNumber;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

九、 synchronized的可重入和不可中断特性

可重入

一个线程可以多次执行synchronized,重复获取同一把锁。synchronized是可重入锁,内部锁对象会有一个计数器记录线程获取几次锁啦,在执行完同步代码块时,计数器的数量会减1,直到计数器的数量为0,就释放这个锁。

不可中断

一个线程获得锁后,另一个线程想要获得锁,必须处于阻塞或等待状态,如果第一个线程不释放锁,第二个线程会一直处于阻塞或等待,不可被中断。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/177897
推荐阅读
相关标签
  

闽ICP备14008679号