赞
踩
juc 是java的一个工具包, 位于 Java.util.concurrent。是java处理高并发编程的一个工具包
普通业务代码实现多线程的方式
1、直接继承 Thread类
2、实现Runable接口:此方式下的多线程没有返回值
3、实现Callable接口:Callable接口的多线程能够拥有返回值
2、线程和进程
进程
进程是CPU分配资源的最小单位, 比如一个QQ进程、一个Mysql进程, 进程是线程的集合。java中默认包含两个进程:main、GC
线程
线程是操作系统执行任务的最小单位, 比如我们打开一个word, 在编写文档的时候会自动保存、检查拼写,这里面的检查拼写、自动保存是线程, 打开一个word是进程。
java能够打开线程吗?
java是不能够打开进程的, 线程开启调用的是本地方法, 也就是调用的c++!我们可以清楚的在下面的源码中看到 java 调用了 native 本地方法。
public synchronized void start() { if (this.threadStatus != 0) { throw new IllegalThreadStateException(); } else { this.group.add(this); boolean started = false; try { this.start0(); started = true; } finally { try { if (!started) { this.group.threadStartFailed(this); } } catch (Throwable var8) { } } } } // 底层的方法 private native void start0();
3、并行、并发
并行
并发
4、线程的状态
5、wait()和sleep()的区别
一、来自不同的类:
wait来自 object类。
sleep来自Thread类。
二、关于锁的释放
wait会释放锁、sleep不会释放锁, 会一直持有该锁。
三、使用的范围
wait只能在同步代码块中来使用、sleep可以在任何的地方来执行。
四、是否需要捕获异常
sleep需要捕获异常、wait不需要。
Synchronized
使用 Synchronized 关键字来修饰我们的资源, 这样的话, 当多个线程来操作资源的时候, 线程就会逐个排队,来避免出现并发而导致出错。它的本质就是队列。 使用Synchronized关键字可以获得该对象的锁, 并且在该方法执行结束之前会一直持有该锁, 限制其他线程来访问该资源。
// 买票的例子, synchronized public class SynchronizedDemo01 { // 定义资源类 public static void main(String[] args) { tick tick = new tick(); // 使用 lambda 表达式来开启多线程 new Thread(() -> { for (int i = 0; i < 5; i++) tick.sale(); }, "A").start(); new Thread(() -> { for (int i = 0; i < 5; i++) tick.sale(); }, "B").start(); new Thread(() -> { for (int i = 0; i < 15; i++) tick.sale(); }, "C").start(); } } class tick { private Integer number = 20; public synchronized void sale() { if (number > 0) { System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "票, 剩余: " + number); } } }
Lock 接口
在Lock的实现类, ReentrantLock锁中其实是创建了一个不公平锁, 可以在构造器中传入参数来选择。
公平锁:按照先来后台的原则、不能插队
非公平锁:能插队(默认)
public ReentrantLock() {
this.sync = new ReentrantLock.NonfairSync();
}
public ReentrantLock(boolean fair) {
this.sync = (ReentrantLock.Sync)(fair ? new ReentrantLock.FairSync() : new ReentrantLock.NonfairSync());
}
public class Lockdemo01 { // 定义资源类, 把资源类丢入线程中去 public static void main(String[] args) { tick2 tick = new tick2(); // 使用 lambda 表达式来开启多线程 new Thread(() -> { for (int i = 0; i < 5; i++) tick.sale(); }, "A").start(); new Thread(() -> { for (int i = 0; i < 5; i++) tick.sale(); }, "B").start(); new Thread(() -> { for (int i = 0; i < 5; i++) tick.sale(); }, "C").start(); } } /** * 使用lock * 1、创建它的实现类 * 2、显示的加锁 * 3、显示的解锁 */ class tick2 { private Integer number = 20; // 创建锁 Lock lock = new ReentrantLock(); public void sale() { // 显示的上锁 lock.lock(); try { // 业务的代码 if (number > 0) { System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "票, 剩余: " + number); } } catch (Exception e) { e.printStackTrace(); } finally { // 解锁 lock.unlock(); } } }
Synchronized 和Lock的区别
1、Synchronized 是java的一个关键字。 Lock是java的一个类
2、Synchronized 是基于队列实现的, 会产生阻塞的风险,线程A 阻塞的话, 后面的线程就会一直傻傻的等着, 而Lock就不一定,本身的效率没有Lock高。
3、Synchronized 无法获得锁的状态, 并且会自动的释放锁, Lock必须手动的加锁,解锁
4、Synchronized 可重入锁,不可以中断的,非公平;Lock ,可重入锁,可以 判断锁,非公平(可以 自己设置);
5、Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码
中断锁
Synchronized是不可中断锁,LOCK为可中断锁
如果线程A正在执行锁中的代码,线程B在等待获取该对象锁,如果等待时间过长,想让B去做其他事情,可以让他自己中断或者在别的线程中断它
Synchronized版本
当只有两个线程的时候, 是没有问题的, 当时线程数量变多后, 就会出现问题
// 示意生产者、消费者问题 public class demo01 { public static void main(String[] args) { data data = new data(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "D").start(); } } class data { private Integer number = 0; public synchronized void increment() throws InterruptedException { if (number != 0) { this.wait(); // 等待 } number++; System.out.println(Thread.currentThread().getName() + "->" + number); // 通知 this.notifyAll(); } public synchronized void decrement() throws InterruptedException { if (number == 0) { this.wait(); // 等待 } number--; System.out.println(Thread.currentThread().getName() + "->" + number); // 通知 this.notifyAll(); } }
使用while 替代 if
// 示意生产者、消费者问题 public class demo02 { public static void main(String[] args) { data1 data = new data1(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "D").start(); } } class data1 { private Integer number = 0; public synchronized void increment() throws InterruptedException { while (number != 0) { this.wait(); // 等待 } number++; System.out.println(Thread.currentThread().getName() + "->" + number); // 通知 this.notifyAll(); } public synchronized void decrement() throws InterruptedException { while (number == 0) { this.wait(); // 等待 } number--; System.out.println(Thread.currentThread().getName() + "->" + number); // 通知 this.notifyAll(); } }
查看API我们可以得到这个接口, 他能够绑定到一个指定的锁上面
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wYlaatgu-1589549823978)(1588165708344.png)]
使用JUC来解决
// juc 来解决问题 public class demo03 { public static void main(String[] args) { data3 data3 = new data3(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data3.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data3.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data3.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data3.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "D").start(); } } class data3 { private Integer number = 0; // 创建一个锁 Lock lock = new ReentrantLock(); // 创建一个 Condition 对象来绑定到一个锁上面 Condition condition = lock.newCondition(); public void increment() throws InterruptedException { lock.lock(); try { while (number != 0) { condition.await(); // 等待 } number++; System.out.println(Thread.currentThread().getName() + "->" + number); // 通知 condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement() throws InterruptedException { lock.lock(); try { while (number == 0) { condition.await(); // 等待 } number--; System.out.println(Thread.currentThread().getName() + "->" + number); // 通知 condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
唤醒指定的线程
// 精准唤醒线程 public class demo04 { public static void main(String[] args) { data4 data = new data4(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.printA(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.printB(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.printC(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C").start(); } } // A 执行完调用B,B执行完调用C,C执行完调用A class data4 { // 创建一个锁 Lock lock = new ReentrantLock(); // 创建一个 Condition 对象来绑定到一个锁上面 private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); // 1 = a , 2 = b , 3 = c int number = 1; public void printA() throws InterruptedException { lock.lock(); try { while (number != 1) { condition1.await(); // 等待 } number = 2; System.out.println(Thread.currentThread().getName() + "->" + "AAAA"); // 通知 condition2.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB() throws InterruptedException { lock.lock(); try { while (number != 2) { condition2.await(); // 等待 } number = 3; System.out.println(Thread.currentThread().getName() + "->" + "BBBBB"); // 通知 condition3.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC() throws InterruptedException { lock.lock(); try { while (number != 3) { condition3.await(); // 等待 } number = 1; System.out.println(Thread.currentThread().getName() + "->" + "BBBBB"); // 通知 condition1.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
1、标准情况下
2、添加四秒的延迟之后
3、同步方法、添加普通方法
4、两个对象, 两个同步方法
5、两个静态的同步方法
6、两个对象。 两个静态的同步方法
7、一个静态、一个普通、一个对象
8、一个静态、一个普通、两个对象
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CSVklw1e-1589549823982)(1588320036508.png)]
java里面的集合接口, 以及相应的子接口
1、List 线程不安全
// 测试 list 的线程不安全 public class TestList { // Exception in thread "6" java.util.ConcurrentModificationException public static void main(String[] args) { List<Integer> list; list = new ArrayList<Integer>(); // list = new Vector<>(); /** * 解决方案 * 1、调用集合工具包 Collections.synchronizedList(); * 2、使用 vector * 3、new CopyOnWriteArrayList<Integer>(); */ List<Integer> integerList = Collections.synchronizedList(list); // list = new CopyOnWriteArrayList<Integer>(); for (Integer i = 0; i < 30; i++) { Integer temp = i; new Thread(() -> { list.add(temp); System.out.println(list); }, String.valueOf(i)).start(); } } }
解决方案
1、使用Vector 来替换
Vector 是线程安全的, 它的底层是采用 Synchronized 修饰过的。
public Vector() { this(10); } public Vector(Collection<? extends E> c) { this.elementData = c.toArray(); this.elementCount = this.elementData.length; if (this.elementData.getClass() != Object[].class) { this.elementData = Arrays.copyOf(this.elementData, this.elementCount, Object[].class); } } public synchronized void copyInto(Object[] anArray) { System.arraycopy(this.elementData, 0, anArray, 0, this.elementCount); } public synchronized void trimToSize() { ++this.modCount; int oldCapacity = this.elementData.length; if (this.elementCount < oldCapacity) { this.elementData = Arrays.copyOf(this.elementData, this.elementCount); } }
2、使用集合的工具包下的 调用集合工具包 Collections.synchronizedList();
3、使用 JUC 下的 import java.util.concurrent.CopyOnWriteArrayList;
CopyOnWriteArrayList
能够实现数据同步的原因就是它采用的是写入时复制的实现方式, 每个线程在写入时候都把当前的数据复制一份,然后修改这个备份的数据,最后在把数据替换成为之前复制过的副本数据,修改操作的同时,读操作是不会被阻塞的, 因为此时读的是之前的数据, 而修改的是备份数据。
CopyOnWriteArrayList,是一个写入时复制的容器,它是如何工作的呢?简单来说,就是平时查询的时候,都不需要加锁,随便访问,只有在写入/删除的时候,才会从原来的数据复制一个副本出来,然后修改这个副本,最后把原数据替换成当前的副本。修改操作的同时,读操作不会被阻塞,而是继续读取旧的数据。这点要跟读写锁区分一下。
2、set 集合不安全
线程不安全的原因和list是差不多的。
// Exception in thread "43" java.util.ConcurrentModificationException public class SetTest { public static void main(String[] args) { Set set = new HashSet(); for (int i = 0; i < 50; i++) { new Thread(() -> { set.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(set.toString()); }, String.valueOf(i)).start(); } } }
解决方案:
1、使用Collections.synchronizedSet(set);
2、使用new CopyOnWriteArraySet<>();
public class SetTest { public static void main(String[] args) { Set<String> set = new HashSet<String>(); /** * 1、Collections.synchronizedSet(set); * 2、new CopyOnWriteArraySet<>(); */ Set<String> set1 = Collections.synchronizedSet(set); CopyOnWriteArraySet<Object> set2 = new CopyOnWriteArraySet<>(); for (int i = 0; i < 50; i++) { new Thread(() -> { set.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(set.toString()); }, String.valueOf(i)).start(); } } }
hashset集合分析: 本质就是 hashmap , 而添加元素的方法也是调用的 haspmap 的put方法,利用 map的键是唯一的来满足set集合的元素不重复, 而map的值, 采用的是一个固定值。
public HashSet() {
this.map = new HashMap();
}
public boolean add(E e) {
return this.map.put(e, PRESENT) == null;
}
CountDownLatch
测试:
public class CountDownLatchDemo { // 测试计数器 public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 1; i <=10; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()); countDownLatch.countDown(); },String.valueOf(i)).start(); } // countDownLatch.await(); System.out.println("关门 -> "); } }
打印:
关门 ->
1
10
9
7
6
8
2
4
3
5
CountDownLatch可以当作是一个计数器,核心方法如下:
1、countDownLatch.countDown(); 计数初值减一
2、countDownLatch.await(); 当计数器变为0,countDownLatch.await() 就会被唤醒,继续执行程序!
cyclicBarrier
// 加法计数器 public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> { System.out.println("计数完成"); }); for (int i = 1; i <= 7; i++) { int finalI = i; new Thread(() -> { try { System.out.println("第 " + String.valueOf(finalI) + "个"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }, Thread.currentThread().getName()).start(); } } }
cyclicBarrier.await() 方法会让 CyclicBarrier 一直等待, 当线程数量达到阈值的时候就会触发。
下面是 CyclicBarrier 的构造函数
public CyclicBarrier(int parties, Runnable barrierAction) {
this.lock = new ReentrantLock();
this.trip = this.lock.newCondition();
this.generation = new CyclicBarrier.Generation();
if (parties <= 0) {
throw new IllegalArgumentException();
} else {
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
}
Semaphore
public class SemaphoreDemo { // 控制最大的线程数量 public static void main(String[] args) { // 最大执行3个 线程 Semaphore semaphore = new Semaphore(3); /** * 1、 semaphore.acquire(); 获得, 在满的情况下, 会等待, 等待释放位置 * 2、 semaphore.release(); 释放,会将当前的信号量释放 + 1,然后唤醒等待的线程! * 作用: 多个共享资源互斥的使用!并发限流,控制大的线程数 */ for (int i = 1; i <= 6; i++) { new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "抢到了车位"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + "离开了车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); // 释放 } }, String.valueOf(i)).start(); } } }
public class demo { public static void main(String[] args) { /* // 资源类 Mycache mycache = new Mycache(); // 写入 for (int i = 1; i <= 5; i++) { final int temp = i; new Thread(() -> { mycache.put(temp + "", temp + ""); }, String.valueOf(i)).start(); } // 读取 for (int i = 1; i <= 5; i++) { final int temp = i; new Thread(() -> { mycache.get(temp + ""); }, String.valueOf(i)).start(); }*/ // 资源类 MycacheLock mycache = new MycacheLock(); for (int i = 1; i <= 5; i++) { final int temp = i; new Thread(() -> { mycache.put(temp + "", temp + ""); }, String.valueOf(i)).start(); } for (int i = 1; i <= 5; i++) { final int temp = i; new Thread(() -> { mycache.get(temp + ""); }, String.valueOf(i)).start(); } } } /** * 使用读写锁 */ class MycacheLock { private volatile Map<String, Object> map = new HashMap<>(); ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 读锁, 读取的时候 所有线程都可以参加, 又称共享锁 public void get(String key) { readWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "读取" + key); Object o = map.get(key); System.out.println(Thread.currentThread().getName() + "读取OK"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } } // 写锁, 写入的时候, 只能有一个线程参加, 排它锁 public void put(String key, String value) { readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "写入" + key); map.put(key, value); System.out.println(Thread.currentThread().getName() + "写入OK"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } } /** * 不加锁 */ class Mycache { private volatile Map<String, Object> map = new HashMap<>(); public void get(String key) { System.out.println(Thread.currentThread().getName() + "读取" + key); Object o = map.get(key); System.out.println(Thread.currentThread().getName() + "读取OK"); } public void put(String key, String value) { System.out.println(Thread.currentThread().getName() + "写入" + key); map.put(key, value); System.out.println(Thread.currentThread().getName() + "写入OK"); } }
线程池的三大方法、七大参数、四种拒绝策略
池话技术
程序运行就是要占用系统的资源、而池技术就相当于提前准备好程序运行所需要的资源, 程序运行的时候直接到池子里面取资源, 用完会就在放回到池中。比如:线程池、连接池、常量池等。
使用池化技术能够节约系统的资源,池化能够省略到系统反复创建资源的过程。而线程池就是为了能够实现线程的服用节约一部分的资源。
三大方法
1、 创建固定大小的线程池**
ExecutorService service = Executors.newFixedThreadPool(3);
2、 创建一个缓存的线程池, 大小可变
ExecutorService service1 = Executors.newCachedThreadPool();
3、创建一个单一的线程池
ExecutorService service2 = Executors.newSingleThreadExecutor();**
public class demo01 { // 创建线程的三个方法 public static void main(String[] args) { // 创建固定大小的线程池 ExecutorService service = Executors.newFixedThreadPool(3); // 创建一个缓存的线程池, 大小可变 ExecutorService service1 = Executors.newCachedThreadPool(); // 创建一个单一的线程池 ExecutorService service2 = Executors.newSingleThreadExecutor(); try { for (int i = 1; i <= 5; i++) { service2.execute(() -> { System.out.println(Thread.currentThread().getName()); }); } } catch (Exception e) { e.printStackTrace(); } finally { service2.shutdown(); } } }
查看源码:三个创建线程池的方法都是使用T hreadPoolExecutor , 方法来创建的。
public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } // 创建线程的七个参数 public ThreadPoolExecutor( int corePoolSize, // 核心线程池 int maximumPoolSize, // 最大核心线程池 long keepAliveTime, // 超时后没有人用就会释放 TimeUnit unit, // 超时单位 BlockingQueue<Runnable> workQueue, // 阻塞队列 ThreadFactory threadFactory // 线程工厂, 不需要动 ) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); // defaultHandler 拒接策略 }
手动创建线程池
// 创建线程池 public class demo02 { public static void main(String[] args) { // 创建线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory() , new ThreadPoolExecutor.DiscardOldestPolicy() ); try { for (int i = 1; i <= 2; i++) { // 线程池中, 使用 execute()来创建线程 executor.execute(() -> { System.out.println(Thread.currentThread().getName() + "ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // shutdown ()来关闭线程池 executor.shutdown(); } } }
四种拒绝策略
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FPiqcnbp-1589549823985)(1588340557711.png)]
/** * new ThreadPoolExecutor.AbortPolicy() // 银行满了,还有人进来,不处理这个人的,抛出异 常 * new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里! * new ThreadPoolExecutor.DiscardPolicy() //队列满了,丢掉任务,不会抛出异常! * new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和早的竞争,也不会 抛出异常! */
小结
public class demo03 {
/**
* 最大线程应该如何定义:
* 1、CPU密集型:最大线程数为CPU的核数
* 2、IO密集型: > 你的程序中十分消耗IO的线程
*
* @param args
*/
public static void main(String[] args) {
// 获取当前的 cpu 核数
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算
函数式接口就是只有一个方法的接口, 它有有个专门的注解, 我们平时使用的Runable 接口就是一个函数式的接口。函数式的接口可以使用lambda 表达式来简化编程。
@FunctionalInterface
public interface Runnable {
void run();
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9ShT9nJo-1589549823989)(1588347319985.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kmyNFR89-1589549823992)(1588347364237.png)]
测试 Function
@FunctionalInterface
public interface Function<T, R> {
R apply(T var1);
}
// 传入 T R 返回 R
code
public class function {
public static void main(String[] args) {
// 函数式接口, 可以使用 lambda接口
Function<String, String> function = new Function<String, String>() {
@Override
public String apply(String str) {
return str;
}
};
String s = function.apply("str");
System.out.println(s);
}
}
predicate 断定型接口:参数只有一个, 并且只有一个返回值, 只能是布尔值。
@FunctionalInterface
public interface Predicate<T> {
boolean test(T var1);
}
code
public class predicate {
public static void main(String[] args) {
Predicate<String> predicate = (str) -> {
return str.isEmpty();
};
System.out.println(predicate.test(""));
}
}
Supplier 供给型接口 没有参数、只有返回值
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VfDfn0CM-1589549823994)(1588348632954.png)]
code
public class SupplierTest {
public static void main(String[] args) {
Supplier<String> supplier = () -> {
return "1024";
};
System.out.println(supplier.get());
}
}
Consume 消费型接口
public interface Consumer<T> {
void accept(T var1);
}
// 只有输入, 没有返回值
code
public class Consume {
public static void main(String[] args) {
Consumer<String> c = (str) -> {
System.out.println(str);
};
c.accept("asd");
}
}
Stream 流式计算内部采用了大量的函数式接口, 很多用的就直接是 10 中的四大函数式接口,所有我们在使用的时候可以直接用 lambda表达式。
public class demo01 { public static void main(String[] args) { User user1 = new User(1, "a", 22); User user2 = new User(2, "b", 23); User user3 = new User(3, "c", 24); User user4 = new User(4, "d", 25); List<User> userList = Arrays.asList(user1, user2, user3, user4); // 使用 stream 流来做计算 userList.stream() .filter((u) -> { return u.getId() % 2 == 0; }) .filter((u) -> { return u.getAge() > 20; }) .map((u) -> { return u.getName().toUpperCase(); }) .limit(3) .forEach(System.out::println); } }
分析
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UO9LmirI-1589549823996)(1588377462537.png)]
forkjoin 是一个能够并行执行任务的类, 它把任务进行了拆分, 然后在分开执行, 最后合并结果。其本质式维护了一个双端队列, 一个执行完任务的队列,去窃取了另外一个队列的任务。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fBB8gl5R-1589549823997)(1588379034387.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EaKNAvXi-1589549823998)(1588379652703.png)]
public abstract class RecursiveTask<V> extends ForkJoinTask<V> 一个递归结果轴承 ForkJoinTask。 一个典型的例子,这是一个任务计算斐波那契数列: class Fibonacci extends RecursiveTask<Integer> { final int n; Fibonacci(int n) { this.n = n; } Integer compute() { if (n <= 1) return n; Fibonacci f1 = new Fibonacci(n - 1); f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); return f2.compute() + f1.join(); } }
code
public class forkjoin extends RecursiveTask<Integer> { final int n; public forkjoin(int n) { this.n = n; } @Override protected Integer compute() { if (n <= 1) return n; forkjoin f1 = new forkjoin(n - 1); f1.fork(); forkjoin f2 = new forkjoin(n - 1); f2.fork(); return f1.join() + f2.join(); } }
public class TestForkJoin {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建任务
ForkJoinTask<Integer> forkjoin = new forkjoin(3);
// 创建执行任务的 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
// 提交任务
ForkJoinTask<Integer> submit = pool.submit(forkjoin);
// 得到结果
Integer integer = submit.get();
System.out.println(integer);
}
}
JMM是java内存模型的意思,它用来屏蔽硬件和操作系统之间的内存访问差异而设计的概念。主要功能就是为了定程序中各种变量的访问规则,即关注的点是:java从内存中读取变量、和写入内存中变量,上述所说的变量是线程共享的变量。
JMM中规定所有的变量都存储在主内存中, 线程的工作内存存放的是被该线程所使用的变量的主内存副本,线程对变量的所有操作, 都必须在工作内存中进行, 而不能直接读写主内存中的数据, 不同的线程之间也无法访问对方的工作内存中的变量,线程之间的变量值(数据)交互需要使用主内存来完成。
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类 型的变量来说,load、store、read和write操作在某些平台上允许例外)
lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量 才可以被其他线程锁定
read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便 随后的load动作使用
load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机 遇到一个需要使用到变量的值,就会使用到这个指令
assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变 量副本中 store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中, 以便后续的write使用
write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内 存的变量中
JMM对这八种指令的使用,制定了如下规则:
不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须 write
不允许线程丢弃他近的assign操作,即工作变量的数据改变了之后,必须告知主存
不允许一个线程将没有assign的数据从工作内存同步回主内存
一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量 实施use、store操作之前,必须经过assign和load操作
一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解 锁
如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前, 必须重新load或assign操作初始化变量的值
如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
对一个变量进行unlock操作之前,必须把此变量同步回主内存
1、保证可见性
// 测试可见性 public class demo01 { // 定义一个变量值, 不使用 volatie 修饰该变量的话, 程序会进入四循环, 当前线程无法得到 变量 flag 被修改之后的值 // volatie 能够保证可见性 static volatile Integer flag = 0; public static void main(String[] args) { new Thread(() -> { while (flag == 0) { } }, "A").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } // 在主线程中修改它, 判断在其他线程能否检测到它的变化。 flag = 1; System.out.println("flag: " + flag); } }
2、不保证原子性
原子性:就和数据库中的事务一样,要么全部执行、要么全部不执行。是一个不可分割的操作,线程在执行任务的时候不可被打断。
失败
// 测试 volatile 的不保证原子性 public class demo { private volatile static int num = 10; public static void increase() { num++; } public static void main(String[] args) { for (int i = 0; i < 20; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) { increase(); } }, String.valueOf(i)).start(); } // 最后线程的存活数量大于 2 的话就证明 上面的 20 条线程还没执行完毕 while (Thread.activeCount() > 2) { Thread.yield(); } System.out.println(Thread.currentThread().getName() + num); } }
使用原子类
public class demo03 { private volatile static AtomicInteger num = new AtomicInteger(); // cas 的原子性操作 public static void increase() { num.incrementAndGet(); } public static void main(String[] args) { for (int i = 0; i < 20; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) { increase(); } }, String.valueOf(i)).start(); } // 最后线程的存活数量大于 2 的话就证明 上面的 20 条线程还没执行完毕 while (Thread.activeCount() > 2) { Thread.yield(); } System.out.println(Thread.currentThread().getName() + num); } }
3、禁止指令重排
计算机有时候不会按照我们编写的代码顺序去执行我们的代码, 而是会先自己优化一下, 我们都知道计算机执行的语句都是一条一条的机器码, java 代码首先会编译成为字节码文件, 然后在由虚拟机解释成机器码交由操作系统来执行。
int a = 1
int b = 2
a = a + 3
b = a * a
可能的执行顺序:
1 2 3 4
1 2 3 4
2 1 3 4
虽然会指令重排, 但计算机会保证数据之间的依赖性
如果有多个线程来执行上面的代码, 在指令重排的时候, 可能会出现错误!!!
避免指令重排, 使用 volatile 关键字修饰
原理: volatile 关键字在被修饰的变量前后添加了一个内存屏障, 导致计算机无法对我们的代码进行指令重排。
小结:可以保证可见性、不保证原子性、可以避免指令重排。
CAS : 比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就 一直循环!
缺点:
1、 循环会耗时
2、一次性只能保证一个共享变量的原子性 3、ABA问题
public class demo01 {
// cas compareandset : 比较并交换
public static void main(String[] args) {
// 得到自己所期望的就会立马更新, 否则就不会更新, 操作系统的原语
AtomicInteger atomicInteger = new AtomicInteger(2020);
atomicInteger.compareAndSet(2020,2021);
System.out.println(atomicInteger.get());
atomicInteger.getAndIncrement();
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
}
}
aba问题
狸猫换太子的问题: 在当前线程执行任务中, 另外一个线程出来捣乱, 修该了变量, 但是在线程结束的时候又把变量修改过来了。之前的线程并不知道 自己所依赖的变量被修该!!!
public class demo02 {
// 狸猫换太子的问题: 在当前线程执行任务中, 另外一个线程出来捣乱, 修该了变量, 但是在线程结束的时候又把变量修改过来了。
// 之前的线程并不知道 自己所依赖的变量被修该!!!
public static void main(String[] args) {
AtomicInteger integer = new AtomicInteger(2020);
// 捣乱的线程
integer.compareAndSet(2020, 2021);
System.out.println(integer.get());
integer.compareAndSet(2021, 2020);
System.out.println(integer.get());
integer.compareAndSet(2020, 666);
System.out.println(integer.get());
}
}
解决上述的ABA问题, 思想源于 : 乐观锁, 添加版本号
可重入锁
// 可重入锁 public class demo01 { public static void main(String[] args) { people people = new people(); // 开启线程 new Thread(() -> { people.say(); }, "A").start(); // 开启线程 new Thread(() -> { people.say(); }, "B").start(); } } class people { public synchronized void say() { System.out.println(Thread.currentThread().getName() + "say"); walk(); // 在拿到 say 调用者锁后, 又重新获得了一次walk调用者的锁 // 可以多次获得一把锁, 而不会出现死锁的问题 } public synchronized void walk() { System.out.println(Thread.currentThread().getName() + "walk"); } }
public class demo02 { public static void main(String[] args) { people2 people = new people2(); // 开启线程 new Thread(() -> { people.say(); }, "A").start(); // 开启线程 new Thread(() -> { people.say(); }, "B").start(); } } class people2 { Lock lock = new ReentrantLock(); public void say() { lock.lock(); lock.lock(); try { System.out.println(Thread.currentThread().getName() + "say"); walk(); // 在拿到 say 调用者锁后, 又重新获得了一次walk调用者的锁 // 可以多次获得一把锁, 而不会出现死锁的问题 } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); lock.unlock(); } } public void walk() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "walk"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
中断锁
Synchronized是不可中断锁,LOCK为可中断锁
如果线程A正在执行锁中的代码,线程B在等待获取该对象锁,如果等待时间过长,想让B去做其他事情,可以让他自己中断或者在别的线程中断它
读写锁
读锁:是线程公有的
写锁:线程私有的
公平锁、非公平锁
公平锁:十分公平、不可以插队
ople.say();
}, “B”).start();
}
}
class people {
public synchronized void say() {
System.out.println(Thread.currentThread().getName() + “say”);
walk(); // 在拿到 say 调用者锁后, 又重新获得了一次walk调用者的锁
// 可以多次获得一把锁, 而不会出现死锁的问题
}
public synchronized void walk() {
System.out.println(Thread.currentThread().getName() + “walk”);
}
}
```java public class demo02 { public static void main(String[] args) { people2 people = new people2(); // 开启线程 new Thread(() -> { people.say(); }, "A").start(); // 开启线程 new Thread(() -> { people.say(); }, "B").start(); } } class people2 { Lock lock = new ReentrantLock(); public void say() { lock.lock(); lock.lock(); try { System.out.println(Thread.currentThread().getName() + "say"); walk(); // 在拿到 say 调用者锁后, 又重新获得了一次walk调用者的锁 // 可以多次获得一把锁, 而不会出现死锁的问题 } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); lock.unlock(); } } public void walk() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "walk"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
中断锁
Synchronized是不可中断锁,LOCK为可中断锁
如果线程A正在执行锁中的代码,线程B在等待获取该对象锁,如果等待时间过长,想让B去做其他事情,可以让他自己中断或者在别的线程中断它
读写锁
读锁:是线程公有的
写锁:线程私有的
公平锁、非公平锁
公平锁:十分公平、不可以插队
非公平锁:十分不公平、可以插队
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。