当前位置:   article > 正文

多线程--JUC并发编程_set dataset = new concurrentskiplistset<>(
set dataset = new concurrentskiplistset<>()

全部测试代码

什么是JUC

JUC就是java.util.concurrent工具包的简称

 

线程和进程

进程

一个在内存中运行的应用程序。每个进程都有自己独立的一块内存空间,一个进程可以有多个线程,比如在Windows系统中,一个运行的xx.exe就是一个进程。

线程

进程中的一个执行任务(控制单元),负责当前进程中程序的执行。一个进程至少有一个线程,一个进程可以运行多个线程,多个线程可共享数据。

java默认有2个线程 mian线程 gc线程

并行和并发有什么区别?

  • 并发:多个任务在同一个 CPU 核上,按细分的时间片轮流(交替)执行,从逻辑上来看那些任务是同时执行。

  • 并行:单位时间内,多个处理器或多核处理器同时处理多个任务,是真正意义上的“同时进行”。

  • 串行:有n个任务,由一个线程按顺序执行。由于任务、方法都在一个线程执行所以不存在线程不安全情况,也就不存在临界区的问题。

做一个形象的比喻:

并发 = 两个队列和一台咖啡机。

并行 = 两个队列和两台咖啡机。

串行 = 一个队列和一台咖啡机。

并发变成的本质: 充分利用CPU资源

线程的状态

NEW  //新生
RUNNABLE,// 运行
BLOCKED,//阻塞
WAITING,// 等待,死等 
TIMED_WAITING,// 超时等待
TERMINATED;// 终止

wait/sleep的区别 ?

  1. 来自不同的类

    1. wait --> Object

    2. sleep --> Thread

  2. 锁的释放

    1. wait 会释放锁

    2. sleep 抱着锁睡觉,不会释放锁

  3. 使用的范围不同

    1. wait 必须在同步代码块

    2. sleep 可以在任何地方睡

  4. 是否需要捕获异常

    1. wait 需要捕获异常wait需要被唤醒,

    2. sleep 须捕获异常 ,sleep不需要被唤醒

Lock锁

传统synchronized

 

 

公平锁: 十分公平 , 可以先来后到

非公平锁: 不公平 , 可以插队 (默认使用)

  1. package com.juc;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. //买票
  5. public class SaleTickDemo2 {
  6. public static void main(String[] args) {
  7. // 多线程操作
  8. Ticket2 ticket = new Ticket2();
  9. // Runnable ; @FunctionalInterface 函数式接口 lambda (参数) - > {代码}
  10. new Thread( ()-> {
  11. for (int i = 0; i < 60 ; i++) {
  12. ticket.sale();
  13. }
  14. },"A").start();
  15. new Thread( ()-> {
  16. for (int i = 0; i < 60 ; i++) {
  17. ticket.sale();
  18. }
  19. },"B").start();
  20. new Thread( ()-> {
  21. for (int i = 0; i < 60 ; i++) {
  22. ticket.sale();
  23. }
  24. },"C").start();
  25. }
  26. }
  27. /*
  28. * 1. new ReentrantLock()
  29. * 2. lock.lock(); 加锁
  30. * 3.finally lock.unlock();解锁
  31. * */
  32. class Ticket2 {
  33. private int num = 50;
  34. Lock lock = new ReentrantLock();
  35. // 买票的方法
  36. public void sale(){
  37. lock.lock(); //加锁
  38. try {
  39. // 业务代码
  40. if (num > 0) {
  41. System.out.println(Thread.currentThread().getName() + "第几张" + (num--) + "剩余" + num + "张");
  42. }
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. }finally {
  46. // 解锁
  47. lock.unlock();
  48. }
  49. }
  50. }

Synchronized 与 lock 区别

  1. Synchronized 内置java关键字, lock是java的一个类

  2. Synchronized 无法判断获取锁的状态, lock可以判断是否获取到了锁

  3. Synchronized 会自动释放锁 , lock必须手动释放锁, 不释放锁会导致死锁

  4. Synchronized 线程1 获得锁 线程2 等待 线程1阻塞线程2 会死等; lock 不一定会等待 lock.tryLock(); 尝试获取锁

  5. Synchronized 可重入锁,不可中断,非公平. lock 可重入锁,可以判断锁 ,锁的公平(可以自定义)

  6. Synchronized 适合少量代码同步问题, lock 适合大量代码同步

生产者消费者问题

  1. package com.pc;
  2. /*
  3. * 线程之间的通信问题,生产者和消费者问题 等待唤醒 通知唤醒
  4. * 线程交替执行 A B 操作同一变量*/
  5. public class A {
  6. public static void main(String[] args) {
  7. Data date = new Data();
  8. new Thread(()->{
  9. for (int i = 0; i < 10; i++) {
  10. try {
  11. date.increment();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. },"A").start();
  17. new Thread(()->{
  18. for (int i = 0; i < 10; i++) {
  19. try {
  20. date.decrement();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. },"B").start();
  26. }
  27. }
  28. // 等待 业务 通知
  29. class Data{
  30. private int number = 0;
  31. public synchronized void increment() throws InterruptedException {
  32. // +1
  33. if (number != 0) {
  34. // 等待
  35. this.wait();
  36. }
  37. number++;
  38. System.out.println(Thread.currentThread().getName()+" "+number);
  39. // 通知其他线程 我 +1 完毕了
  40. this.notifyAll();
  41. }
  42. public synchronized void decrement() throws InterruptedException {
  43. // -1
  44. if (number == 0) {
  45. //等待
  46. this.wait();
  47. }
  48. number--;
  49. System.out.println(Thread.currentThread().getName()+" "+number);
  50. // 通知其他线程 我-1 完毕了
  51. this.notifyAll();
  52. }
  53. }

 如果有 ABCD四个线程, 会出现虚假唤醒的情况 此时, 将if判断改为while 判断

  1. package com.pc;
  2. /*
  3. * 线程之间的通信问题,生产者和消费者问题 等待唤醒 通知唤醒
  4. * 线程交替执行 A B 操作同一变量*/
  5. public class A {
  6. public static void main(String[] args) {
  7. Data date = new Data();
  8. new Thread(()->{
  9. for (int i = 0; i < 10; i++) {
  10. try {
  11. date.increment();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. },"A").start();
  17. new Thread(()->{
  18. for (int i = 0; i < 10; i++) {
  19. try {
  20. date.decrement();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. },"B").start();
  26. new Thread(()->{
  27. for (int i = 0; i < 10; i++) {
  28. try {
  29. date.increment();
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. },"C").start();
  35. new Thread(()->{
  36. for (int i = 0; i < 10; i++) {
  37. try {
  38. date.decrement();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. },"D").start();
  44. }
  45. }
  46. // 等待 业务 通知
  47. class Data{
  48. private int number = 0;
  49. public synchronized void increment() throws InterruptedException {
  50. // +1
  51. while (number != 0) {
  52. // 等待
  53. this.wait();
  54. }
  55. number++;
  56. System.out.println(Thread.currentThread().getName()+" "+number);
  57. // 通知其他线程 我 +1 完毕了
  58. this.notifyAll();
  59. }
  60. public synchronized void decrement() throws InterruptedException {
  61. // -1
  62. while (number == 0) {
  63. //等待
  64. this.wait();
  65. }
  66. number--;
  67. System.out.println(Thread.currentThread().getName()+" "+number);
  68. // 通知其他线程 我-1 完毕了
  69. this.notifyAll();
  70. }
  71. }

JUC版的生产者消费者

通过Lock 找到 Condition

 

  1. package com.pc;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. public class B {
  6. public static void main(String[] args) {
  7. Data2 date = new Data2();
  8. new Thread(()->{
  9. for (int i = 0; i < 10; i++) {
  10. try {
  11. date.increment();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. },"A").start();
  17. new Thread(()->{
  18. for (int i = 0; i < 10; i++) {
  19. try {
  20. date.decrement();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. },"B").start();
  26. new Thread(()->{
  27. for (int i = 0; i < 10; i++) {
  28. try {
  29. date.increment();
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. },"C").start();
  35. new Thread(()->{
  36. for (int i = 0; i < 10; i++) {
  37. try {
  38. date.decrement();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. },"D").start();
  44. }
  45. }
  46. // 等待 业务 通知
  47. class Data2{
  48. private int number = 0;
  49. Lock lock = new ReentrantLock();
  50. Condition condition = lock.newCondition();
  51. // condition.wait(); // 等待
  52. // condition.signalAll(); // 唤醒
  53. public void increment() throws InterruptedException {
  54. lock.lock();
  55. try {
  56. // 业务代码
  57. // +1
  58. while (number != 0) {
  59. condition.await(); // 等待
  60. }
  61. number++;
  62. System.out.println(Thread.currentThread().getName()+"-->"+number);
  63. // 通知其他线程 我 +1 完毕了
  64. condition.signalAll(); // 唤醒
  65. } catch (Exception e) {
  66. e.printStackTrace();
  67. } finally {
  68. lock.unlock();
  69. }
  70. }
  71. public void decrement() throws InterruptedException {
  72. lock.lock();
  73. try {
  74. // -1
  75. while (number == 0) {
  76. condition.await();// 等待
  77. }
  78. number--;
  79. System.out.println(Thread.currentThread().getName()+"-->"+number);
  80. // 通知其他线程 我-1 完毕了
  81. condition.signalAll(); // 唤醒
  82. } catch (Exception e) {
  83. e.printStackTrace();
  84. } finally {
  85. lock.unlock();
  86. }
  87. }
  88. }

不会出现虚假唤醒,但是 唤醒的没有规律

condition 精准通知和唤醒线程

  1. package com.pc;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. /*
  6. * A执行完调用B
  7. * B执行完调用C
  8. * C执行完调用A
  9. * */
  10. public class C {
  11. public static void main(String[] args) {
  12. Data3 data3 = new Data3();
  13. new Thread(()->{
  14. for (int i = 0; i < 10; i++) {
  15. data3.printA();
  16. }
  17. },"A").start();
  18. new Thread(()->{
  19. for (int i = 0; i < 10; i++) {
  20. data3.printB();
  21. }
  22. },"B").start();
  23. new Thread(()->{
  24. for (int i = 0; i < 10; i++) {
  25. data3.printC();
  26. }
  27. },"C").start();
  28. }
  29. }
  30. class Data3{
  31. private Lock lock = new ReentrantLock();
  32. private Condition condition1 = lock.newCondition();
  33. private Condition condition2 = lock.newCondition();
  34. private Condition condition3 = lock.newCondition();
  35. private int number = 1;
  36. public void printA(){
  37. lock.lock();
  38. try {
  39. while (number != 1) {
  40. // 等待
  41. condition1.await();
  42. }
  43. System.out.println(Thread.currentThread().getName() + "--->A");
  44. // 唤醒指定的人
  45. // 业务代码 判断是否等待 执行 通知
  46. number = number+1;
  47. condition2.signal();
  48. } catch (Exception e) {
  49. e.printStackTrace();
  50. } finally {
  51. lock.unlock();
  52. }
  53. }
  54. public void printB(){
  55. lock.lock();
  56. try {
  57. // 业务代码 判断是否等待 执行 通知
  58. while (number != 2) {
  59. condition2.await();
  60. }
  61. System.out.println(Thread.currentThread().getName() + "--->B");
  62. number = 3;
  63. condition3.signal();
  64. } catch (Exception e) {
  65. e.printStackTrace();
  66. } finally {
  67. lock.unlock();
  68. }
  69. }
  70. public void printC(){
  71. lock.lock();
  72. try {
  73. while (number != 3) {
  74. condition3.await();
  75. }
  76. System.out.println(Thread.currentThread().getName() + "--->C");
  77. number = 1;
  78. condition1.signal();
  79. // 业务代码 判断是否等待 执行 通知
  80. } catch (Exception e) {
  81. e.printStackTrace();
  82. } finally {
  83. lock.unlock();
  84. }
  85. }
  86. }

8锁现象

怎么判断锁的是谁

8锁 测试代码

new this 具体的都关系

static 锁的是Class

集合类不安全

List - CopyOnWriteArrayList

  1. package com.unsafe;
  2. import java.util.*;
  3. import java.util.concurrent.CopyOnWriteArrayList;
  4. /*
  5. * java.util.ConcurrentModificationException 并发修改异常
  6. * */
  7. public class ListTest {
  8. public static void main(String[] args) {
  9. // List<String> list = Arrays.asList("1", "12", "13");
  10. // list.forEach(System.out::println);
  11. // 并发下 ArrayList 不安全
  12. /*
  13. * 解决方法:
  14. * 1. List<String> list = new Vector<>();
  15. * 2. List<String> list = Collections.synchronizedList(new ArrayList<>());
  16. * 3. List<String> list = new CopyOnWriteArrayList<>();
  17. * */
  18. // CopyOnWrite写入是复制 多个线程调用的时候 读取的是固定的,写入避免覆盖,写入的时候先copyof一份 ,最后在set回去
  19. // CopyOnWriteArrayList 比 Vector 厉害在哪
  20. List<String> list = new CopyOnWriteArrayList<>();
  21. for (int i = 1; i <= 10; i++) {
  22. new Thread(() -> {
  23. list.add(UUID.randomUUID().toString().substring(0, 5));
  24. System.out.println(list);
  25. }, String.valueOf(i)).start();
  26. }
  27. }
  28. }

set - ConcurrentSkipListSet

  1. package com.unsafe;
  2. import java.util.Collections;
  3. import java.util.HashSet;
  4. import java.util.Set;
  5. import java.util.UUID;
  6. import java.util.concurrent.ConcurrentSkipListSet;
  7. /*
  8. *解决方案:
  9. * 1.Set<String> set = Collections.synchronizedSet(new HashSet<>());
  10. * 2.Set<String> set = new ConcurrentSkipListSet<>();
  11. * */
  12. public class SetTest {
  13. public static void main(String[] args) {
  14. // Set<String> set = new ConcurrentSkipListSet<>();
  15. HashSet<Object> set = new HashSet<>();
  16. for (int i = 0; i < 30; i++) {
  17. new Thread(()->{
  18. set.add(UUID.randomUUID().toString().substring(0, 5));
  19. System.out.println(set);
  20. },String.valueOf(i)).start();
  21. }
  22. }
  23. }

hashSet底层是啥??

hashSet底层new 了一个HashMap

 add方法   也是map.put()

map - ConcurrentHashMap

  1. package com.unsafe;
  2. import java.util.Collections;
  3. import java.util.HashMap;
  4. import java.util.Map;
  5. import java.util.UUID;
  6. import java.util.concurrent.ConcurrentHashMap;
  7. /*
  8. * java.util.ConcurrentModificationException
  9. * */
  10. public class MapTest {
  11. public static void main(String[] args) {
  12. // Map<String, String> map = new HashMap<>()
  13. Map<Object, Object> map = Collections.synchronizedMap(new HashMap<>());
  14. // ConcurrentHashMap不能接受null的key和null的value,会抛出空指针异常
  15. // Map<String, String> map = new ConcurrentHashMap<>();
  16. for (int i = 1; i <= 30; i++) {
  17. new Thread(()->{
  18. map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
  19. System.out.println(map);
  20. },String.valueOf(i)).start();
  21. }
  22. }
  23. }

Callable

  1. 可以有返回值

  2. 可以抛出异常

  3. 方法不同 run() call()

 

 

  1. package com.callable;
  2. import java.util.concurrent.Callable;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.FutureTask;
  5. public class CallableTest {
  6. public static void main(String[] args) throws ExecutionException, InterruptedException {
  7. // new Thread(new Runnable()).start();
  8. // new Thread(new FutureTask<V>() ).start();
  9. // new Thread(new FutureTask<V>(Callable) ).start();
  10. new Thread().start(); // 怎么启动Callable
  11. MyThread myThread = new MyThread();
  12. FutureTask futureTask = new FutureTask(myThread);// 适配类
  13. new Thread(futureTask,"A").start();
  14. new Thread(futureTask,"B").start();
  15. // 两个线程,但只打印了一份 由于JVM第二次再调用FutrueTask对象所持有的线程, 此时FutrueTask的state此时已非NEW状态
  16. // 则此时会直接结束对应线程,就会导致任务也不执行,只是在第一次调用时返回结果保存了
  17. // 只执行一次,判断不是第一次就直接return了
  18. String o = (String)futureTask.get(); //获取Callable返回结果 get 方法可能会产生阻塞 或者使用异步通信处理
  19. System.out.println(o);
  20. }
  21. }
  22. class MyThread implements Callable<String>{ // Callable的泛型是啥返回值就是啥
  23. @Override
  24. public String call() {
  25. System.out.println("call");
  26. return "999";
  27. }
  28. }

常用的辅助类

CountDownLatch

 

  1. package com.add;
  2. import java.util.concurrent.CountDownLatch;
  3. //计数器
  4. public class CoundDownLatchDemo {
  5. public static void main(String[] args) throws InterruptedException {
  6. // 倒计时,总数是6
  7. CountDownLatch countDownLatch = new CountDownLatch(6);
  8. for (int i = 0; i < 6; i++) {
  9. new Thread(() -> {
  10. System.out.println(Thread.currentThread().getName() + "走了");
  11. countDownLatch.countDown(); // 数量减一
  12. },String.valueOf(i)).start();
  13. }
  14. countDownLatch.await();//等待计数器归零,然后向下执行
  15. System.out.println("关门");
  16. }
  17. }

原理:

countDownLatch.countDown(); // 数量减一

countDownLatch.await();//等待计数器归零

每次调用countDown() 数量减一 , 假设计数器变为0 countDownLatch.await() 就会被唤醒 ,继续执行

CyclicBarrier

  1. package com.add;
  2. import java.util.concurrent.BrokenBarrierException;
  3. import java.util.concurrent.CyclicBarrier;
  4. //CyclicBarrier 执行达到指定线程数再执行操作
  5. //集齐7个葫芦娃合体
  6. public class CyclicBarrierDemo {
  7. public static void main(String[] args) {
  8. // 呼叫葫芦娃的线程
  9. CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
  10. System.out.println("葫芦娃合体");
  11. });
  12. for (int i = 1; i <= 7; i++) {
  13. // lambda拿不到i 设置中间变量
  14. final int temp = i;
  15. new Thread(() -> {
  16. System.out.println(Thread.currentThread().getName()+"召集了" + temp + "个葫芦娃");
  17. try {
  18. cyclicBarrier.await(); //等待
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. } catch (BrokenBarrierException e) {
  22. e.printStackTrace();
  23. }
  24. }).start();
  25. }
  26. }
  27. }

Semaphore

  1. package com.add;
  2. import java.util.concurrent.Semaphore;
  3. import java.util.concurrent.TimeUnit;
  4. //CyclicBarrier : 指定个数线程执行完毕再执行操作
  5. //CyclicBarrier 执行达到指定线程数再执行操作
  6. public class SemaohoreDemo {
  7. public static void main(String[] args) {
  8. // 线程数量 : 停车位 限流的时候可以用
  9. Semaphore semaphore = new Semaphore(3);
  10. for (int i = 1; i < 6; i++) {
  11. new Thread(() -> {
  12. try {
  13. semaphore.acquire(); // 得到
  14. System.out.println(Thread.currentThread().getName() + " 得到车位");
  15. TimeUnit.SECONDS.sleep(2);
  16. System.out.println(Thread.currentThread().getName() + " 离开车位");
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }finally {
  20. semaphore.release(); // 释放
  21. }
  22. },String.valueOf(i)).start();
  23. }
  24. }
  25. }

semaphore.acquire(); 获得 ,假设已经满了,等待, 知道被释放为止

semaphore.release(); 释放 会将当前信号量释放 +1 , 然后唤醒等待的线程

作业 : 多个共享资源互斥使用 ; 并发限流(控制最大线程数)

读写锁

ReadWriteLock

  1. package com.rw;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.concurrent.locks.ReadWriteLock;
  5. import java.util.concurrent.locks.ReentrantReadWriteLock;
  6. /*
  7. * ReadWriteLock
  8. * 独占锁(写锁) 一次只能被一个线程占有
  9. * 共享锁(读锁) 多个线程可以同时操作
  10. * 读 - 读 可以共存
  11. * 读 - 写 不能共存
  12. * 写 - 学 不能共存
  13. * */
  14. public class ReadWriteLockDemo {
  15. public static void main(String[] args) {
  16. MyCacheLock myCache = new MyCacheLock();
  17. for (int i = 1; i <= 5; i++) {
  18. final int temp = i;
  19. // 写入
  20. new Thread(() -> {
  21. myCache.put(temp+"",temp+"");
  22. },String.valueOf(i)).start();
  23. }
  24. for (int i = 1; i <= 5; i++) {
  25. final int temp = i;
  26. // 读取
  27. new Thread(() -> {
  28. myCache.get(temp+"");
  29. },String.valueOf(i)).start();
  30. }
  31. }
  32. }
  33. //自定义缓存
  34. // 会出现 写操作的时候被别的线程插队 , 所以引出ReadWriteLock
  35. class MyCache{
  36. private volatile Map<String, Object> map = new HashMap<>();
  37. // 存 写
  38. public void put(String key , Object value) {
  39. System.out.println(Thread.currentThread().getName() + "写入" + key);
  40. map.put(key, value);
  41. System.out.println(Thread.currentThread().getName() + "写入完成");
  42. }
  43. // 读 取
  44. public void get(String key) {
  45. System.out.println(Thread.currentThread().getName() + "读取" + key);
  46. Object o = map.get(key);
  47. System.out.println(Thread.currentThread().getName() + "读取完成");
  48. }
  49. }
  50. //加锁
  51. class MyCacheLock{
  52. private volatile Map<String, Object> map = new HashMap<>();
  53. //读写锁 :
  54. private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  55. // 存 的时候,希望只有一个线程去写操作
  56. public void put(String key , Object value) {
  57. readWriteLock.writeLock().lock();
  58. try {
  59. System.out.println(Thread.currentThread().getName() + "写入" + key);
  60. map.put(key, value);
  61. System.out.println(Thread.currentThread().getName() + "写入完成");
  62. } catch (Exception e) {
  63. e.printStackTrace();
  64. } finally {
  65. readWriteLock.writeLock().unlock();
  66. }
  67. }
  68. // 读的时候,所有人都可以去读
  69. public void get(String key) {
  70. readWriteLock.readLock().lock();
  71. try {
  72. System.out.println(Thread.currentThread().getName() + "读取" + key);
  73. Object o = map.get(key);
  74. System.out.println(Thread.currentThread().getName() + "读取完成");
  75. } catch (Exception e) {
  76. e.printStackTrace();
  77. } finally {
  78. readWriteLock.readLock().unlock();
  79. }
  80. }
  81. }

阻塞队列

阻塞

队列

 

四组API

方式抛出异常有返回值,不抛出异常阻塞 等待超时等待
添加addofferput()offer( , ,)
移除removepolltake()pool( , )
判断队列首elementpeek--
  1. package com.bq;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. public class test {
  4. public static void main(String[] args) {
  5. test1();
  6. }
  7. /*
  8. * 抛出异常
  9. * */
  10. public static void test1(){
  11. // 队列的大小
  12. ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
  13. System.out.println(arrayBlockingQueue.add("a"));
  14. System.out.println(arrayBlockingQueue.add("c"));
  15. System.out.println(arrayBlockingQueue.add("v"));
  16. System.out.println("---------------");
  17. // java.lang.IllegalStateException: Queue full
  18. // System.out.println(arrayBlockingQueue.add("f"));
  19. // System.out.println(arrayBlockingQueue.element()); // 查看队首元素
  20. System.out.println(arrayBlockingQueue.remove());
  21. System.out.println(arrayBlockingQueue.remove());
  22. System.out.println(arrayBlockingQueue.remove());
  23. // java.util.NoSuchElementException
  24. // System.out.println(arrayBlockingQueue.remove());
  25. }
  26. }

  1. /*
  2. * 有返回值,没有异常*/
  3. public static void test2(){
  4. // 队列大小
  5. ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
  6. System.out.println(arrayBlockingQueue.offer("a"));
  7. System.out.println(arrayBlockingQueue.offer("b"));
  8. System.out.println(arrayBlockingQueue.offer("c"));
  9. System.out.println(arrayBlockingQueue.offer("d")); // 返回false 不抛出异常
  10. System.out.println(arrayBlockingQueue.peek()); //检测队首元素
  11. System.out.println(arrayBlockingQueue.poll());
  12. System.out.println(arrayBlockingQueue.poll());
  13. System.out.println(arrayBlockingQueue.poll());
  14. System.out.println(arrayBlockingQueue.poll()); // 返回 null 不抛出异常
  15. }

  1. /*
  2. * 等待 阻塞 (一直阻塞)
  3. * */
  4. public static void test3() throws InterruptedException {
  5. ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
  6. // 一直阻塞
  7. arrayBlockingQueue.put("a");
  8. arrayBlockingQueue.put("b");
  9. arrayBlockingQueue.put("c");
  10. // arrayBlockingQueue.put("d"); // 队列没有位置,一直阻塞
  11. System.out.println(arrayBlockingQueue.take());
  12. System.out.println(arrayBlockingQueue.take());
  13. System.out.println(arrayBlockingQueue.take());
  14. System.out.println(arrayBlockingQueue.take()); // 没有多余元素. 一直等待
  15. }
  1. /*
  2. * 等待 阻塞 (等待超时)
  3. * */
  4. public static void test4() throws InterruptedException {
  5. ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
  6. System.out.println(arrayBlockingQueue.offer("a"));
  7. System.out.println(arrayBlockingQueue.offer("b"));
  8. System.out.println(arrayBlockingQueue.offer("c"));
  9. System.out.println(arrayBlockingQueue.offer("c",2, TimeUnit.SECONDS)); //等待超过两秒直接退出
  10. System.out.println(arrayBlockingQueue.poll());
  11. System.out.println(arrayBlockingQueue.poll());
  12. System.out.println(arrayBlockingQueue.poll());
  13. System.out.println(arrayBlockingQueue.poll(2,TimeUnit.SECONDS)); //等待超过两秒直接退出
  14. }

SynchronizedQueue 同步队列

没有容量

存进去一个元素后,必须等到取出来后才可以存入下一个元素

  1. package com.blockingQueue;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.SynchronousQueue;
  4. import java.util.concurrent.TimeUnit;
  5. /*同步队列
  6. * 和其他的BlockingQueue 不一样 ,SynchronousQueue 不存储元素
  7. * put 一个元素,必须先取出来 否则不能再put进去值 */
  8. public class SynchronizedQueueDemo {
  9. public static void main(String[] args) {
  10. BlockingQueue<String> synchronousQueue = new SynchronousQueue<>(); //同步队列
  11. new Thread(()->{
  12. try {
  13. System.out.println(Thread.currentThread().getName() + "----" + "put 1");
  14. synchronousQueue.put("1");
  15. System.out.println(Thread.currentThread().getName() + "----" + "put 2");
  16. synchronousQueue.put("2");
  17. System.out.println(Thread.currentThread().getName() + "----" + "put 3");
  18. synchronousQueue.put("3");
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. },"A").start();
  23. new Thread(()->{
  24. try {
  25. TimeUnit.SECONDS.sleep(2);
  26. System.out.println(Thread.currentThread().getName() + "----" + synchronousQueue.take());
  27. TimeUnit.SECONDS.sleep(2);
  28. System.out.println(Thread.currentThread().getName() + "----" + synchronousQueue.take());
  29. TimeUnit.SECONDS.sleep(2);
  30. System.out.println(Thread.currentThread().getName() + "----" + synchronousQueue.take());
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. },"B").start();
  35. }
  36. }

线程池

线程池三大方法 ; 七大参数 ; 四种拒绝策略

线程池的好处:

  • 降低资源消耗

  • 提高响应速度

  • 方便管理

线程复用 , 可以控制最大并发,管理线程

池化技术

事先准备好一些资源,有人要用就来拿,用完之后归还

三大方法

  1. package com.pool;
  2. import java.util.concurrent.Executor;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.ThreadPoolExecutor;
  6. /*
  7. * Executors 工具类 3大方法
  8. * 使用线程池创建线程
  9. * */
  10. public class Demo1 {
  11. public static void main(String[] args) {
  12. // ExecutorService threadPoolExecutor= Executors.newSingleThreadExecutor();//当个线程
  13. // ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(5); //创建一个固定的线程池大小
  14. ExecutorService threadPoolExecutor = Executors.newCachedThreadPool(); //;可伸缩的 最大创建线程
  15. try {
  16. for (int i = 0; i < 10; i++) {
  17. // 使用线程池来创建线程
  18. threadPoolExecutor.execute(() -> {
  19. System.out.println(Thread.currentThread().getName() + " ok");
  20. });
  21. }
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. } finally {
  25. // 线程池使用完毕,程序结束,关闭线程池
  26. threadPoolExecutor.shutdown();
  27. }
  28. }
  29. }

7大参数

  1. public ThreadPoolExecutor(int corePoolSize, // 核心线程大小
  2. int maximumPoolSize, // 最大线程大小
  3. long keepAliveTime, // 超时没人调用就会释放
  4. TimeUnit unit, //超时单位
  5. BlockingQueue<Runnable> workQueue, // 阻塞队列
  6. ThreadFactory threadFactory, // 线程工厂 ,创建线程的 一般不动
  7. RejectedExecutionHandler handler // 拒绝策略 ) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.corePoolSize = corePoolSize;
  16. this.maximumPoolSize = maximumPoolSize;
  17. this.workQueue = workQueue;
  18. this.keepAliveTime = unit.toNanos(keepAliveTime);
  19. this.threadFactory = threadFactory;
  20. this.handler = handler;
  21. }

四种拒绝策略

  1. package com.pool;
  2. import java.util.concurrent.*;
  3. /*
  4. * java.util.concurrent.RejectedExecutionException 超出最大线程, 抛出异常
  5. * ThreadPoolExecutor.CallerRunsPolicy() 哪里来会那里去
  6. * ThreadPoolExecutor.DiscardPolicy() // 队列满了不会抛出异常
  7. * ThreadPoolExecutor.DiscardOldestPolicy() 队列满了尝试和 最早的竞争
  8. * */
  9. public class Demo2 {
  10. public static void main(String[] args) {
  11. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
  12. 5,
  13. 3,
  14. TimeUnit.SECONDS,
  15. new LinkedBlockingDeque<>(3),
  16. Executors.defaultThreadFactory(),
  17. new ThreadPoolExecutor.DiscardOldestPolicy()); // 队列满了尝试和 最早的竞争
  18. try {
  19. // 最大线程数量= max + LinkedBlockingDeque
  20. for (int i = 1; i <= 9; i++) {
  21. // 使用线程池来创建线程
  22. threadPoolExecutor.execute(() -> {
  23. System.out.println(Thread.currentThread().getName() + " ok");
  24. });
  25. }
  26. } catch (Exception e) {
  27. e.printStackTrace();
  28. } finally {
  29. // 线程池使用完毕,程序结束,关闭线程池
  30. threadPoolExecutor.shutdown();
  31. }
  32. }
  33. }

IO密集型 CPU密集型

问题: 线程池最大线程怎么设置

  1. // 最大线程如何定义
  2. // 1.CPU 密集型 几核就是几, 可以保持CPU的效率最高
  3. System.out.println(Runtime.getRuntime().availableProcessors()); // 获取CPU核数
  4. // 2. IO 密集型 判断程序中消耗IO的线程,只要大于这些线程即可
  5. // 例 ; 程序中有15个大型任务, io特别占用资源 至少留15个线程处理这些任务

四大函数式接口

lambda表达式 ;链式编程 ;函数式接口 ;Stream流

函数时接口 : 只有一个方法的接口

  1. // 简化编程模型,新版本框架大量使用
  2. // foreach(消费者类的函数式接口)
  3. @FunctionalInterface
  4. public interface Runnable {
  5. public abstract void run();
  6. }

Function函数式接口

  1. package com.Function;
  2. import java.util.function.Function;
  3. /*
  4. * Function 函数式接口, 一个入参 一个输出
  5. * 是要是函数时接口 就可以用lambda表达式简化
  6. * */
  7. public class Demo {
  8. public static void main(String[] args) {
  9. // 输出输入的值
  10. // Function function = new Function<String, String>() {
  11. // @Override
  12. // public String apply(String s) {
  13. // return s;
  14. // }
  15. //
  16. // };
  17. Function<String, String> function = (str) -> {return str; };
  18. System.out.println(function.apply("123asd"));
  19. }
  20. }

Predicate断定型接口

有一个入参 返回值式布尔值

  1. package com.Function;
  2. import java.util.function.Predicate;
  3. /*
  4. * 断定型接口 有一个入参 返回值式布尔值
  5. * */
  6. public class Demo2 {
  7. public static void main(String[] args) {
  8. // 判断字符是否为空
  9. // Predicate<String> predicate = new Predicate<String>(){
  10. // @Override
  11. // public boolean test(String s) {
  12. // return s.isEmpty();
  13. // }
  14. // };
  15. Predicate<String> predicate = (srt) -> { return srt.isEmpty(); };
  16. System.out.println(predicate.test(""));
  17. }
  18. }

Consumer消费型接口

只有入参没有返回值

  1. package com.Function;
  2. import java.util.function.Consumer;
  3. /*
  4. * Consumer 消费型接口 只有输入 ,没有返回值
  5. * */
  6. public class Demo3 {
  7. public static void main(String[] args) {
  8. // Consumer<String> consumer = new Consumer<String>() {
  9. // @Override
  10. // public void accept(String str) {
  11. // System.out.println(str);
  12. // }
  13. // };
  14. // Consumer<String> consumer = (str)->{ System.out.println(str); };
  15. Consumer<String> consumer = System.out::println;
  16. consumer.accept("123qwe");
  17. }
  18. }

Supplier供给型接口

  1. package com.Function;
  2. import java.util.function.Supplier;
  3. /*
  4. * Supplier 没有参数 只有返回值
  5. * */
  6. public class Demo4 {
  7. public static void main(String[] args) {
  8. // Supplier<String> supplier = new Supplier<String>() {
  9. // @Override
  10. // public String get() {
  11. // return "123";
  12. // }
  13. // };
  14. Supplier<String> supplier = ()->{ return "123";};
  15. System.out.println(supplier.get());
  16. }
  17. }

Stream流式计算

集合 MySQL 存储的东西 交给Stream流来计算

  1. package com.stream;
  2. import java.util.Arrays;
  3. import java.util.List;
  4. /*
  5. * 要求筛选:
  6. * id必须式偶数
  7. * age 大于23
  8. * name 转换为大写
  9. * name 倒着排序
  10. * 只输出一个用户
  11. * */
  12. public class Test {
  13. public static void main(String[] args) {
  14. User u1 = new User(1,"a",21);
  15. User u2 = new User(2,"b",22);
  16. User u3 = new User(3,"c",23);
  17. User u4 = new User(4,"d",24);
  18. User u5 = new User(6,"e",25);
  19. List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
  20. list.stream()
  21. .filter(u -> {
  22. return u.getId() % 2 == 0;
  23. }).
  24. filter(u -> {
  25. return u.getAge() > 23;
  26. })
  27. .map(u -> {
  28. return u.getNamen().toUpperCase();
  29. }).sorted((uu1,uu2)->{
  30. return uu1.compareTo(uu2);})
  31. .limit(1)
  32. .forEach(System.out::println);
  33. }
  34. }

ForkJoin

ForkJoin 再jDK1.7之后出的 , 并行执行任务 , 提高效率 处理大数据量时使用

ForkJoin 特点 任务窃取

里面维护的都是双端队列

 

 

代码测试

  1. package com.forkJoin;
  2. import java.util.concurrent.RecursiveTask;
  3. /*
  4. * 使用ForkJoin
  5. * 1. 通过ForkJoinPool执行
  6. * 2. 计算任务 forkJoinPool,execute(ForkJoinTxask task)
  7. * 3. 计算类继承 ForkJoinTxask
  8. * */
  9. public class ForkJoinDemo extends RecursiveTask<Long> {
  10. private long start;
  11. private long end;
  12. // 临界值
  13. private long temp = 10000L;
  14. public ForkJoinDemo(long start, long end) {
  15. this.start = start;
  16. this.end = end;
  17. }
  18. // 计算方法
  19. @Override
  20. protected Long compute() {
  21. if ((end - start) < temp) {
  22. long sum = 0L;
  23. for (long i = start; i <= end; i++) {
  24. sum += i;
  25. }
  26. return sum;
  27. } else {
  28. long midden = (start + end) / 2; // 中间值
  29. ForkJoinDemo task1 = new ForkJoinDemo(start, midden);
  30. task1.fork(); //拆分任务 ,任务压入队列
  31. ForkJoinDemo task2 = new ForkJoinDemo(midden + 1 , end);
  32. task2.fork();//拆分任务 ,任务压入队列
  33. return task1.join() + task2.join();
  34. }
  35. }
  36. }

 测试类

  1. package com.forkJoin;
  2. import java.util.concurrent.ExecutionException;
  3. import java.util.concurrent.ForkJoinPool;
  4. import java.util.concurrent.ForkJoinTask;
  5. import java.util.stream.LongStream;
  6. public class Test {
  7. public static void main(String[] args) throws ExecutionException, InterruptedException {
  8. // test1(); //3820 239
  9. // test2(); //1996 121
  10. // test3(); //137 131
  11. }
  12. // 普通计算
  13. public static void test1() {
  14. long sum = 0;
  15. long start = System.currentTimeMillis();
  16. for (long i = 1; i <10_0000_0000 ; i++) {
  17. sum += sum;
  18. }
  19. long end = System.currentTimeMillis();
  20. System.out.println("时间:" + (end - start));
  21. }
  22. // 使用ForkJoin
  23. public static void test2() throws ExecutionException, InterruptedException {
  24. long start = System.currentTimeMillis();
  25. ForkJoinPool forkJoinPool = new ForkJoinPool();
  26. ForkJoinTask<Long> joinDemo1 = new ForkJoinDemo(0,10_0000_0000);
  27. ForkJoinTask<Long> submit = forkJoinPool.submit(joinDemo1);//提交任务
  28. Long sum = submit.get();
  29. long end = System.currentTimeMillis();
  30. System.out.println("sum:" + sum + "时间:" + (end - start));
  31. }
  32. // 使用Stream并行流
  33. public static void test3(){
  34. long start = System.currentTimeMillis();
  35. // Stream并行流
  36. long sum = LongStream.rangeClosed(0L, 10_0000_0000).parallel().reduce(0L, Long::sum);
  37. long end = System.currentTimeMillis();
  38. System.out.println("sum:" + sum + "时间:" + (end - start));
  39. }
  40. }

异步回调

对未来的某个事件结果进行建模

  1. package com.future;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.TimeUnit;
  5. /*
  6. * 异步调用
  7. * 异步执行
  8. * 成功回调
  9. * 失败回调
  10. * */
  11. public class Demo1 {
  12. public static void main(String[] args) throws ExecutionException, InterruptedException {
  13. // 没有返回值的 runAsync 异步回调
  14. /* CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
  15. try {
  16. TimeUnit.SECONDS.sleep(2);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. System.out.println(Thread.currentThread().getName()+"runAsync--->Void");
  21. });
  22. System.out.println("1111");
  23. completableFuture.get();//阻塞获取执行结果
  24. */
  25. // 有返回值的异步回调
  26. CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
  27. System.out.println(Thread.currentThread().getName() + "---->supplyAsync");
  28. int i = 10 / 0;
  29. return 111;
  30. });
  31. System.out.println(completableFuture.whenComplete((u1, u2) -> {
  32. System.out.println("t: " + u1); // 正常返回结果java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero java.lang.ArithmeticException: / by zero
  33. System.out.println("u: " + u2);
  34. }).exceptionally((e) -> {
  35. System.out.println(e.getMessage());
  36. return 999; //可以获取到错误的返回结果
  37. }).get());
  38. }
  39. }

JMM

Volatile是java虚拟机提供的 轻量级同步机制

  1. 保证可见性

  2. 不保证原子性

  3. 禁止指令重排

Volatile 可见性怎么保证?? 那就要说JMM了

JMM: java内存模型 , 是不存在的东西,是一种约定

关于JMM的约定:

1.线程解锁前 ,必须把共享变量立刻刷回主存

2.线程加锁前,必须读取主存中的最新值到内存中

3.加锁和解锁必须是同一把锁

线程 : 工作内存 主内存

内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)

    • lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态

    • unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定

    • read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用

    • load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中

    • use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令

    • assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中

    • store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用

    • write  (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中

  JMM对这八种指令的使用,制定了如下规则:

    • 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write

    • 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存

    • 不允许一个线程将没有assign的数据从工作内存同步回主内存

    • 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作

    • 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁

    • 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值

    • 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量

    • 对一个变量进行unlock操作之前,必须把此变量同步回主内存

问题: 程序不知道主内存已经被修改过了

Volatile

1.Volatile保证可见性

  1. package com.testVolatile;
  2. import java.util.concurrent.TimeUnit;
  3. public class JMMDemo {
  4. // 不加volatile 程序就会死循环
  5. // 加上volatile 就能保证程序的可见性
  6. private volatile static int num = 0;
  7. public static void main(String[] args) {
  8. new Thread(() -> { //线程1 对主内存的变化不知道
  9. while (num == 0) {
  10. }
  11. }).start();
  12. try {
  13. TimeUnit.SECONDS.sleep(2);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. num = 1;
  18. System.out.println(num);
  19. }
  20. }

2.Volatile不保证原子性

原子性: 不可分割

线程A再执行任务的时候,不能被打扰,不能被分割,要么同时成功,要么同时失败

  1. package com.testVolatile;
  2. //不保证原子性
  3. public class VolatileDemo {
  4. // volatile 不保证原子性
  5. private volatile static int num = 0;
  6. public static void add(){
  7. num++;
  8. }
  9. public static void main(String[] args) {
  10. // 理论上num结果为20000
  11. for (int i = 0; i < 20; i++) {
  12. new Thread(()->{
  13. for (int i1 = 0; i1 < 1000; i1++) {
  14. add();
  15. }
  16. }).start();
  17. }
  18. while (Thread.activeCount() > 2) { //默认两个线程开启 main gc
  19. Thread.yield();
  20. }
  21. System.out.println(Thread.currentThread().getName() + " " + num);
  22. }
  23. }

 如果不加 lock和synchronized 怎么保证原子性

 

使用原子类保证原子性

  1. package com.testVolatile;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. //不保证原子性
  4. public class VolatileDemo {
  5. // volatile 不保证原子性
  6. // 原子类的Integer
  7. private volatile static AtomicInteger num = new AtomicInteger();
  8. public static void add(){
  9. // num++; // 不是原子性操作
  10. num.getAndIncrement(); // CAS操作
  11. }
  12. public static void main(String[] args) {
  13. // 理论上num结果为20000
  14. for (int i = 0; i < 20; i++) {
  15. new Thread(()->{
  16. for (int i1 = 0; i1 < 1000; i1++) {
  17. add();
  18. }
  19. }).start();
  20. }
  21. while (Thread.activeCount() > 2) { //默认两个线程开启 main gc
  22. Thread.yield();
  23. }
  24. System.out.println(Thread.currentThread().getName() + " " + num);
  25. }
  26. }

这些类的底层直接和操作系统挂钩,再内存中修改,Unsafe是一个很特殊的存在!

Volatile指令重排

指令重排: 你写的程序,计算机并不是按照你写的那样去执行的

源代码--> 编译器优化的重排--> 指令并行也可能回导致重排--> 内存系统也会重排--> 执行

处理器在进行指令重排的时候,会考虑数据之间的依赖性

内存屏障 CPU指令 作用:

保证特定的操作的执行顺序

可以保证某些变量的内存可见性(利用这些特性 Volatile实现了可见性)

 

单例模式

饿汉式 DCL懒汉式

饿汉式:

  1. package com.signal;
  2. //饿汉式
  3. public class Hungry {
  4. // 类加载的时候就将数组加载到内存,如果不使用,造成内存空间浪费
  5. private byte[] data1 = new byte[1024 * 1024];
  6. private byte[] data2 = new byte[1024 * 1024];
  7. private byte[] data3 = new byte[1024 * 1024];
  8. private byte[] data4 = new byte[1024 * 1024];
  9. private Hungry(){
  10. }
  11. private final static Hungry HUNGRY = new Hungry();
  12. public static Hungry getInstance() {
  13. return HUNGRY;
  14. }
  15. }

懒汉式:

  1. package com.signal;
  2. import java.lang.reflect.Constructor;
  3. import java.lang.reflect.Field;
  4. //懒汉式单例
  5. public class LazyMan {
  6. private static boolean aaaa = false;
  7. private LazyMan(){
  8. synchronized (LazyMan.class) {
  9. if (aaaa == false) {
  10. aaaa = true;
  11. } else {
  12. throw new RuntimeException("反射获取结果");
  13. }
  14. }
  15. System.out.println(Thread.currentThread().getName()+" o k ");
  16. }
  17. // 加上volatile 避免 指令重排
  18. private volatile static LazyMan lazyMan;
  19. // 双重检查锁模式的 懒汉式 , 简称DCL懒汉式
  20. public static LazyMan getInstance(){
  21. if (lazyMan == null) {
  22. synchronized (LazyMan.class) {
  23. if (lazyMan == null) {
  24. lazyMan = new LazyMan(); //不是原子性操作
  25. /*
  26. * 1. 分配内存空间
  27. * 2.执行构造方法初始化对象
  28. * 3.把对象指向这个空间
  29. *
  30. * */
  31. }
  32. }
  33. }
  34. return lazyMan;
  35. }
  36. 多线程并发 加上volatile 后避免指令重排
  37. //public static void main(String[] args) {
  38. // for (int i = 0; i < 10; i++) {
  39. // new Thread(() -> {
  40. // LazyMan.getInstance();
  41. // }).start();
  42. //
  43. //
  44. // }
  45. //}
  46. // 反射
  47. public static void main(String[] args) throws Exception {
  48. // LazyMan instance = LazyMan.getInstance();
  49. Field aaaa = LazyMan.class.getDeclaredField("aaaa");
  50. aaaa.setAccessible(true);
  51. Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);
  52. declaredConstructor.setAccessible(true);
  53. LazyMan instance1 = declaredConstructor.newInstance();
  54. aaaa.set(instance1, false);
  55. LazyMan instance2= declaredConstructor.newInstance();
  56. // System.out.println(instance);
  57. System.out.println(instance1);
  58. System.out.println(instance2);
  59. }
  60. }

静态内部类:

  1. package com.signal;
  2. //静态内部类
  3. public class Holder {
  4. private Holder(){
  5. }
  6. public static Holder getInstance() {
  7. return InnerClass.HOLDER;
  8. }
  9. public static class InnerClass{
  10. private static final Holder HOLDER = new Holder();
  11. }
  12. }

枚举

  1. package com.signal;
  2. import java.lang.reflect.Constructor;
  3. //enum本身是一个Class类
  4. public enum EnumSingle {
  5. INSTANCE;
  6. public EnumSingle getInstamce() {
  7. return INSTANCE;
  8. }
  9. }
  10. class Test{
  11. public static void main(String[] args) throws Exception {
  12. EnumSingle instance1 = EnumSingle.INSTANCE;
  13. Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);
  14. declaredConstructor.setAccessible(true);
  15. EnumSingle instance2 = declaredConstructor.newInstance();
  16. System.out.println(instance1);
  17. System.out.println(instance2);
  18. }
  19. }

深入理解CAS

cas: 比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么就执行操作,如果不是就一直循环

缺点:

1.由于底层是自旋锁,循环回浪费时间

由于是底层CPU操作,一次不保证一个共享变量的原子性

  1. package com.cas;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. public class CASDemo {
  4. // CompareAndSwap : 比较并交换
  5. public static void main(String[] args) {
  6. AtomicInteger atomicInteger = new AtomicInteger(2020);
  7. // 如果期望值达到了 就更新,否则不更新 CAS是CPU的并发原语
  8. System.out.println(atomicInteger.compareAndSet(2020, 2021));
  9. System.out.println(atomicInteger.get());
  10. atomicInteger.getAndIncrement();
  11. System.out.println(atomicInteger.compareAndSet(2020, 2021));
  12. System.out.println(atomicInteger.get());
  13. }
  14. }

unsafe 类

 

ABA问题:

  1. package com.cas;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. public class CASDemo {
  4. // CompareAndSwap : 比较并交换
  5. public static void main(String[] args) {
  6. AtomicInteger atomicInteger = new AtomicInteger(2020);
  7. // 如果期望值达到了 就更新,否则不更新 CAS是CPU的并发原语
  8. // 捣乱线程
  9. System.out.println(atomicInteger.compareAndSet(2020, 2021));
  10. System.out.println(atomicInteger.get());
  11. // System.out.println(atomicInteger.getAndIncrement()+" 11111");
  12. System.out.println(atomicInteger.compareAndSet(2021, 2020));
  13. System.out.println(atomicInteger.get());
  14. // 期望线程
  15. System.out.println(atomicInteger.compareAndSet(2020, 6666));
  16. System.out.println(atomicInteger.get());
  17. }
  18. }

原子操作

带版本号的原子操作 解决ABA问题 ,引入原子引用 对应的思想: 乐观锁

  1. package com.cas;
  2. import java.util.concurrent.TimeUnit;
  3. import java.util.concurrent.atomic.AtomicStampedReference;
  4. /*integer默认缓存-128->127,超过这个范围就要new对象了,就会分配新的地址,我们看到源码是==,非数值类型,我们比较的是对象的地址
  5. 我想看怎么找到问题所在的片段
  6. 1. 看compareAndSet的源码,里面是使用 == 进行比较的。
  7. 2. 由于new的时候声明泛型肯定是装箱类,这个时候传入值类型将会自动装箱
  8. 3. 自动装箱的后果就是地址不一致,使用==判断的结果就为false\
  9. 4. 总结:最好不使用原子类型,使用原子类型得保证比较时候传入的为同一个装箱类
  10. * */
  11. public class CASDemo2 {
  12. // CompareAndSwap : 比较并交换
  13. public static void main(String[] args) {
  14. // 正常情况下泛型中写的都是对象
  15. AtomicStampedReference<Integer> atomicInteger = new AtomicStampedReference<>(1, 1);
  16. new Thread(() -> {
  17. int stamp = atomicInteger.getStamp(); //获得版本号
  18. System.out.println("A--->" + stamp);
  19. try {
  20. TimeUnit.SECONDS.sleep(2);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. System.out.println(atomicInteger.compareAndSet(1, 2,
  25. atomicInteger.getStamp(), atomicInteger.getStamp() + 1));
  26. System.out.println("A2--->" + atomicInteger.getStamp());
  27. System.out.println(atomicInteger.compareAndSet(2, 1,
  28. atomicInteger.getStamp(), atomicInteger.getStamp() + 1));
  29. System.out.println("A3--->" + atomicInteger.getStamp());
  30. }, "A").start();
  31. new Thread(() -> {
  32. int stamp = atomicInteger.getStamp(); //获得版本号
  33. System.out.println("B1--->" + stamp);
  34. try {
  35. TimeUnit.SECONDS.sleep(2);
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. }
  39. System.out.println(atomicInteger.compareAndSet(1, 5, stamp, stamp + 1));
  40. System.out.println("B2--->" + atomicInteger.getStamp());
  41. }, "B").start();
  42. }
  43. }

integer默认缓存-128->127,超过这个范围就要new对象了,就会分配新的地址,我们看到源码是==,非数值类型,我们比较的是对象的地址

1. 看compareAndSet的源码,里面是使用 == 进行比较的。
2. 由于new的时候声明泛型肯定是装箱类,这个时候传入值类型将会自动装箱
3. 自动装箱的后果就是地址不一致,使用==判断的结果就为false\
4. 总结:最好不使用原子类型,使用原子类型得保证比较时候传入的为同一个装箱类

 

可重入锁

也叫递归锁

synchronized 锁

  1. package com.lock;
  2. //synchronized
  3. public class Demo1 {
  4. public static void main(String[] args) {
  5. Phone phone = new Phone();
  6. new Thread(() -> {
  7. phone.sms();
  8. }, "A").start();
  9. new Thread(() -> {
  10. phone.sms();
  11. }, "B").start();
  12. }
  13. }
  14. class Phone {
  15. public synchronized void sms() {
  16. System.out.println(Thread.currentThread().getName() + " sms");
  17. call();
  18. }
  19. public synchronized void call() {
  20. System.out.println(Thread.currentThread().getName() + " call");
  21. }
  22. }

lock版

  1. package com.lock;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. public class Demo2 {
  5. public static void main(String[] args) {
  6. Phone2 phone = new Phone2();
  7. new Thread(() -> {
  8. phone.sms();
  9. }, "A").start();
  10. new Thread(() -> {
  11. phone.sms();
  12. }, "B").start();
  13. }
  14. }
  15. //可重入锁也就是某个线程已经获得某个锁,可以再次获取锁而不会出现死锁
  16. class Phone2 {
  17. Lock lock = new ReentrantLock();
  18. public void sms() {
  19. lock.lock(); // lock.lock() ,锁必须配对,否则回出现死锁
  20. lock.lock();
  21. try {
  22. System.out.println(Thread.currentThread().getName() + " sms");
  23. call();
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. } finally {
  27. lock.unlock();
  28. lock.unlock();
  29. }
  30. }
  31. public void call() {
  32. lock.lock();
  33. try {
  34. System.out.println(Thread.currentThread().getName() + " call");
  35. } catch (Exception e) {
  36. e.printStackTrace();
  37. } finally {
  38. lock.unlock();
  39. }
  40. }
  41. }

自旋锁

自定义自旋锁

  1. package com.lock;
  2. import java.util.concurrent.atomic.AtomicReference;
  3. /*
  4. * 自旋锁
  5. * */
  6. public class SpinLockDemo {
  7. AtomicReference<Thread> atomicReference = new AtomicReference<>();
  8. // 加锁
  9. public void MyLock() {
  10. Thread thread = Thread.currentThread();
  11. System.out.println(thread.getName() + " MyLock");
  12. // 自旋锁
  13. while (!atomicReference.compareAndSet(null, thread)) {
  14. }
  15. }
  16. // 解锁
  17. public void MyUnLock(){
  18. Thread thread = Thread.currentThread();
  19. System.out.println(thread.getName() + " MyUnLock");
  20. atomicReference.compareAndSet(thread, null);
  21. }
  22. }

代码测试

  1. package com.lock;
  2. import java.util.concurrent.TimeUnit;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. public class TestSpinLock {
  5. public static void main(String[] args) throws InterruptedException {
  6. ReentrantLock reentrantLock = new ReentrantLock();
  7. reentrantLock.lock();
  8. reentrantLock.unlock();
  9. // 底层使用的自旋锁CAS
  10. SpinLockDemo lock = new SpinLockDemo();
  11. new Thread(() -> {
  12. lock.MyLock();
  13. try {
  14. TimeUnit.SECONDS.sleep(3);
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. } finally {
  18. lock.MyUnLock();
  19. }
  20. }, "T1").start();
  21. TimeUnit.SECONDS.sleep(2);
  22. new Thread(() -> {
  23. lock.MyLock();
  24. try {
  25. TimeUnit.SECONDS.sleep(1);
  26. } catch (Exception e) {
  27. e.printStackTrace();
  28. } finally {
  29. lock.MyUnLock();
  30. }
  31. }, "T2").start();
  32. }
  33. }

 T1进来是拿到了锁, 然后期望值null, 变成了Thread, 此时T1没有自旋, 而是跳出了循环,T2此时在无限循环, 过了等待时间, T1解锁, 变回了null, T2才能跳出循环

死锁

多个线程各自占有一些共享资源,并且相互等待其他线程占有的资源才能运行,而导致两个或多个线程都在等待对方释放资源,都停止执行的场景,某一个同步块同时拥有“两个以上对象的锁”时,就可能发生“死锁”的问题。

  1. package com.lock;
  2. import java.util.concurrent.TimeUnit;
  3. public class DeadLockDemo {
  4. public static void main(String[] args) {
  5. String lockA = "lockA";
  6. String lockB = "lockB";
  7. new Thread(new MyThread(lockA,lockB),"A").start();
  8. new Thread(new MyThread(lockB,lockA),"B").start();
  9. }
  10. }
  11. class MyThread implements Runnable {
  12. private String lockA;
  13. private String lockB;
  14. public MyThread(String lockA, String lockB) {
  15. this.lockA = lockA;
  16. this.lockB = lockB;
  17. }
  18. @Override
  19. public void run() {
  20. synchronized (lockA) {
  21. System.out.println(Thread.currentThread().getName() + "lock: " + lockA + " ->get" + lockB);
  22. try {
  23. TimeUnit.SECONDS.sleep(2);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. synchronized (lockB) {
  28. System.out.println(Thread.currentThread().getName() + "lock: " + lockB + " ->get" + lockA);
  29. }
  30. }
  31. }
  32. }

死锁解决方案:

产生死锁的四个必要条件:

  • 互斥条件:一个资源每次只能被一个进程使用
  • 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放
  • 不剥夺条件:进程已获得的资源,在未使用之前,不能强行剥夺
  • 循环等待条件:若干进程之间行程一种头尾相接的循环等待资源关系

 破坏上述4个条件任意一个或多个条件就会避免死锁的发生

jps -l 使用 jps 定位进程号

如果命令不识别可以试试这个:

找到 C:\Users{用户名}\AppData\Local\Temp\hsperfdata_{用户名} 右键属性->安全->添加用户,添加自己用户名,并且打开所有权限

jstack 70680 对应进程号找问题

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/545645
推荐阅读
相关标签