赞
踩
class Phone {
public static synchronized void sendSMS() throws Exception {
//停留4秒
TimeUnit.SECONDS.sleep(4);
System.out.println("------sendSMS");
}
public synchronized void sendEmail() throws Exception {
System.out.println("------sendEmail");
}
public void getHello() {
System.out.println("------getHello");
}
}
1 标准访问,先打印短信还是邮件 ------sendSMS ------sendEmail 2 停4秒在短信方法内,先打印短信还是邮件 ------sendSMS ------sendEmail 3 新增普通的hello方法,是先打短信还是hello ------getHello ------sendSMS 4 现在有两部手机,先打印短信还是邮件 ------sendEmail ------sendSMS 5 两个静态同步方法,1部手机,先打印短信还是邮件 ------sendSMS ------sendEmail 6 两个静态同步方法,2部手机,先打印短信还是邮件 ------sendSMS ------sendEmail 7 1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件 ------sendEmail ------sendSMS 8 1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件 ------sendEmail ------sendSMS
synchronized实现同步的基础:Java中的每一个对象都可以作为锁。
具体表现为以下3种形式。
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的Class对象。
对于同步方法块,锁是Synchonized括号里配置的对象
ReentrantLock构造方法根据传入参数创建公平锁或非公平锁,默认为非公平锁。非公平锁可能会有线程饿死,效率高;公平锁阳光普照,效率相对低。
NonfairSync源码:
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
FairSync源码:
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
synchronized和lock都是可重入锁
查看进程jps、jstack
deadlock测试代码:
public static void main(String[] args) { Ticket t1 = new Ticket(); Ticket t2 = new Ticket(); new Thread(() -> { synchronized (t1){ System.out.println("拿到锁t1,准备获取锁t2"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (t2){ System.out.println("获取锁t2"); } } }, "aa").start(); new Thread(() -> { synchronized (t2){ System.out.println("拿到锁t2,准备获取锁t1"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (t1){ System.out.println("获取锁t1"); } } }, "bb").start(); }
使用callable接口创建线程
public static void main(String[] args) throws Exception {
FutureTask<Integer> futureTask = new FutureTask<Integer>(() -> {
System.out.println(Thread.currentThread().getName() + "==callable线程执行。。。");
return 1111;
});
new Thread(futureTask, "AA").start();
System.out.println(futureTask.get());
}
CountDownLatch类可以设置一个计数器,然后通过countDown方法来进行减1的操作,使用await方法等待计数器不大于0,然后继续执行await方法之后的语句。
测试代码:
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "同学离开了教室。");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "班长锁门了。");
}
CyclicBarrier看英文单词可以看出大概就是循环阻塞的意思,在使用中CyclicBarrier的构造方法第一个参数是目标障碍数,每次执行CyclicBarrier一次障碍数会加一,如果达到了目标障碍数,才会执行cyclicBarrier.await()之后的语句。可以将CyclicBarrier理解为加1操作
测试代码:
private final static int num = 7; public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> { System.out.println("七颗龙珠已收集,可以召唤神龙"); }); for (int i = 1; i <= num; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + "颗龙珠已收集"); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } }, String.valueOf(i)).start(); } }
public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 6; i++) { new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "抢到了停车位"); TimeUnit.SECONDS.sleep(new Random().nextInt(6)); System.out.println(Thread.currentThread().getName() + "离开了停车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }, String.valueOf(i)).start(); } }
现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。
针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁。
资源类
import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @author lyz * @Title: CustomCache * @Description: * @date 2021/10/8 15:03 */ public class CustomCache { private volatile Map<String, String> cacheMap = new HashMap<>(); private ReadWriteLock rwLock = new ReentrantReadWriteLock(); public void put(String key, String value){ rwLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "正在写入数据。。" + key); TimeUnit.SECONDS.sleep(1); cacheMap.put(key, value); System.out.println(Thread.currentThread().getName() + "写完了" + key); } catch (InterruptedException e) { e.printStackTrace(); } finally { rwLock.writeLock().unlock(); } } public String get(String key){ rwLock.readLock().lock(); String value = null; try { System.out.println(Thread.currentThread().getName() + "正在读数据。。" + key); TimeUnit.SECONDS.sleep(1); value = cacheMap.get(key); System.out.println(Thread.currentThread().getName() + "读完了key=" + key + ",value=" + value); } catch (InterruptedException e) { e.printStackTrace(); } finally { rwLock.readLock().unlock(); } return value; } }
测试代码
public static void main(String[] args) { CustomCache cache = new CustomCache(); for (int i = 0; i < 5; i++) { final int num = i; new Thread(() -> { cache.put(String.valueOf(num), String.valueOf(num)); }, String.valueOf(i)).start(); } for (int i = 0; i < 5; i++) { final int num = i; new Thread(() -> { cache.get(String.valueOf(num)); }, String.valueOf(i)).start(); } }
public static void main(String[] args) {
ReadWriteLock rwLock = new ReentrantReadWriteLock();
rwLock.writeLock().lock();
System.out.println("写入数据");
rwLock.readLock().lock();
System.out.println("读取数据");
rwLock.readLock().unlock();
rwLock.writeLock().unlock();
}
常用的队列主要有以下两种:
• 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性
• 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
为什么需要BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
drainTo(): 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
public static void main(String[] args) throws Exception { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); //第一组 抛出异常 System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); // System.out.println(blockingQueue.add("d")); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); // System.out.println(blockingQueue.remove()); System.out.println("======================================================"); //第二组 特殊值 System.out.println(blockingQueue.offer("1")); System.out.println(blockingQueue.offer("2")); System.out.println(blockingQueue.offer("3")); System.out.println(blockingQueue.offer("4")); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println("======================================================"); //第三组 阻塞 blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); // blockingQueue.put("d"); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); System.out.println("======================================================"); //第四组 超时 System.out.println(blockingQueue.offer("a", 3l, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("b", 3l, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("c", 3l, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("d", 3l, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(3l, TimeUnit.SECONDS)); }
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
特点:
• 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
• 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
• 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
• Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类
public static void main(String[] args) { //一池多线程 // ExecutorService threadPool = Executors.newFixedThreadPool(5); //一池一线程 // ExecutorService threadPool = Executors.newSingleThreadExecutor(); //可扩展线程池 ExecutorService threadPool = Executors.newCachedThreadPool(); try { for (int i = 0; i < 10; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "办理业务。"); }); } } catch (Exception e){ e.printStackTrace(); } finally { threadPool.shutdown(); } }
看源码可知底层使用的都是ThreadPoolExecutor
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
• corePoolSize线程池的核心线程数
• maximumPoolSize能容纳的最大线程数
• keepAliveTime空闲线程存活时间
• unit 存活的时间单位
• workQueue 存放提交但未执行任务的队列
• threadFactory 创建线程的工厂类
• handler 等待队列满后的拒绝策略
1. 在创建了线程池后,线程池中的线程数为零
2. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
2.2 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
2.4 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
3. 当一个线程完成任务时,它会从队列中取下一个任务来执行
4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
4.1 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
4.2 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
DiscardPolicy: 直接丢弃,其他啥都没有
DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
实际开发中创建线程池推荐使用ThreadPoolExecutor及其7个参数手动创建
测试代码
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 2l, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "办理业务。");
});
}
} catch (Exception e){
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
Fork/Join它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join框架要完成两件事情: Fork:把一个复杂任务进行分拆,大事化小 Join:把分拆任务的结果进行合并
定义分支合并类
import java.util.concurrent.RecursiveTask; /** * @author lyz * @Title: MyTask * @Description: * @date 2021/10/13 17:18 */ public class MyTask extends RecursiveTask<Integer> { private static final int value = 10; private int max; private int min; private int result; public MyTask(int max, int min) { this.max = max; this.min = min; } @Override protected Integer compute() { if (max - min < value){ for (int i = min; i <= max; i++) { result += i; } } else { int mid = (max + min) / 2; MyTask task1 = new MyTask(mid, min); MyTask task2 = new MyTask(max, mid + 1); task1.fork(); task2.fork(); result = task1.join() + task2.join(); } return result; } }
调用分支合并代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask task = new MyTask(100, 1);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
Integer result = submit.get();
System.out.println(result);
forkJoinPool.shutdown();
}
初始化线程的4种方式:
方式1和方式2:主进程无法获取线程的运算结果。不适合当前场景
方式3:主进程可以获取线程的运算结果,并设置给itemVO,但是不利于控制服务器中的线程资源。可以导致服务器资源耗尽。
方式4:通过如下两种方式初始化线程池:
Executors.newFiexedThreadPool(3);
//或者
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit, workQueue, threadFactory, handler);
通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。
Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone
方法检查计算是否完成,或者使用get
阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel
方法停止任务的执行。
虽然Future
以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
很多语言,比如Node.js,采用回调的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future
接口,提供了addListener
等多个扩展方法;Google guava也提供了通用的扩展Future;Scala也提供了简单易用且功能强大的Future/Promise异步编程模式。
作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?
在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get
方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。
CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。
CompletableFuture 提供了四个静态方法来创建一个异步操作。
static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);
whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。BiConsumer<? super T,? super Throwable>可以定义处理业务
whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
代码示例:
public class CompletableFutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() { @Override public Object get() { System.out.println(Thread.currentThread().getName() + "\t completableFuture"); int i = 10 / 0; return 1024; } }).whenComplete(new BiConsumer<Object, Throwable>() { @Override public void accept(Object o, Throwable throwable) { System.out.println("-------o=" + o.toString()); System.out.println("-------throwable=" + throwable); } }).exceptionally(new Function<Throwable, Object>() { @Override public Object apply(Throwable throwable) { System.out.println("throwable=" + throwable); return 6666; } }); System.out.println(future.get()); } }
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作
带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
代码演示:
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { System.out.println(Thread.currentThread().getName() + "\t completableFuture"); //int i = 10 / 0; return 1024; } }).thenApply(new Function<Integer, Integer>() { @Override public Integer apply(Integer o) { System.out.println("thenApply方法,上次返回结果:" + o); return o * 2; } }).whenComplete(new BiConsumer<Integer, Throwable>() { @Override public void accept(Integer o, Throwable throwable) { System.out.println("-------o=" + o); System.out.println("-------throwable=" + throwable); } }).exceptionally(new Function<Throwable, Integer>() { @Override public Integer apply(Throwable throwable) { System.out.println("throwable=" + throwable); return 6666; } }); System.out.println(future.get()); }
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
allOf:等待所有任务完成
anyOf:只要有一个任务完成
public static void main(String[] args) { List<CompletableFuture> futures = Arrays.asList(CompletableFuture.completedFuture("hello"), CompletableFuture.completedFuture(" world!"), CompletableFuture.completedFuture(" hello"), CompletableFuture.completedFuture("java!")); final CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})); allCompleted.thenRun(() -> { futures.stream().forEach(future -> { try { System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); }); }
测试结果:
get future at:1568892339473, result:hello
get future at:1568892339473, result: world!
get future at:1568892339473, result: hello
get future at:1568892339473, result:java!
几乎同时完成任务!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。