赞
踩
synchronized关键字解决的是多线程之间访问资源的同步性,synchronized关键字可以保证被它修饰的方法或者代码块在任意时刻只能被一个线程执行。
监视器锁(monitor)是依赖于底层的操作系统的 Mutex Lock来实现的,java的线程是映射到操作系统原生线程之上。如果要挂起一个线程,或者唤醒一个线程,都需要os来帮忙,而os实现线程切换需要从用户态转换为内核态。需要时间长,成本高。
java6之后,synchronized有了较大优化,如自旋锁,适应性自旋锁,锁消除,锁粗化,偏向锁,轻量级锁。
优化来减少锁操作的开销。
synchronized void method(){
//业务代码
}
synchronized static void method(){
//业务代码
}
synchronized(this){
//业务代码
}
synchronized(Hello.class){
//业务代码
}
synchronized(obj){
//业务代码
}
public Singleton{ private volatile static Singleton uniqueInstance; private Singleton(){ } public static Singleton getUniqueInstance(){ if(uniqueInstance == null){ synchronized(Singleton.class){ if(uniqueInstance == null){ uniqueInstance = new Singleton(); } } } return uniqueInstance; } } //uniqueInstance = new Singleton();分三步执行 //1.分配内存空间 //2.初始化内存空间 //3.将uniqueInstance指向分配的内存地址 //由于jvm可能会将指令重排,导致顺序可能变为132,已经指向了地址,但是还没初始化完毕。 //其他线程坑就直接拿去用了,导致错误。 //使用volatile可以禁止指令重排序。
monitorenter
和monitorexit
指令包裹起来。执行monitorenter
时,线程试图获取对象监视器monitor
的持有权。获取之后,锁计数器+1
执行monitorexit
时,锁计数器为0,表明锁被释放。
synchronized
修饰方法,方法的标识会有ACC_SYNCHRONIZED
本质都是对对象监视器monitor
的获取
偏向锁、轻量级锁、自旋锁、适应性自旋锁、锁消除、锁粗化等技术
锁主要存在四种状态,依次是:无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态,他们会随着竞争的激烈而逐渐升级。注意锁可以升级不可降级,这种策略是为了提高获得锁和释放锁的效率。
先是无锁状态,有一个线程需要使用锁资源就用偏向锁,具体是在MarkWord里设置了本线程的线程id,以后该线程
进入和推出都不用加锁和解锁。只需要简单测试下锁资源的对象头的MarkWord里面有没有自己的id就行了。
如果有线程争用资源,就进化为轻量级锁,markword里面的数据标识为00(轻量级锁标识),此时只能有一个线程获得锁,其他线程自旋,自旋一定时间还没有获得锁,那锁资源的对象头的Markword就会被修改为重量级锁,标识为10,然后没获得线程的就挂起,剩下的新来的线程一看到是重量级锁,也直接挂起。
偏向锁适用于只有一个线程访问的同步场景,就是这个场景需要用同步保证安全,但是一般并发数不高。
轻量级锁是竞争不激烈的时候,挂起与唤醒都太消耗资源,所以就用自旋的方式解决。
如果自旋一直拿不到,那就说明竞争还是激烈,就不自旋浪费cpu了,直接挂起,改为重量级锁。
1.都是可重入锁,自己可以再次获取自己的内部锁,同一个线程每次获得锁,锁计数器都+1。到0才算释放。
2.synchronized依赖于JVM,ReentrantLock依赖于API,需要lock()和unlock()配合try/finally语句块来实现。
3.ReentrantLock比synchronized增加的功能:
线程可以把变量保存在本地内存,比如机器的寄存器。而不是直接在在主存中进行读写。这造成一个线程在主存中修改了一个变量的值,另一个线程还在用寄存器中的变量值拷贝。造成数据不一致。
volatile
关键字指示JVM这个变量是共享且不稳定的,每次都要从主存中读取。
所以volatile
除了防止指令重排
,还能保证变量的可见性
。
并发编程的三个重要特性
实现每一个线程都有自己的专属本地变量。
如果创建了一个ThreadLocal
变量,那么访问这个变量的每个线程都会有这个变量的本地副本。
public class Thread implements Runnable{ //与此线程有关的ThreadLocal值,由ThreadLocal类维护 ThreadLocal.ThreadLocalMap threadLocals = null; } //ThreadLocal 的set方法 public void set(T value){ Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if(map!=null) map.set(this,value); else createMap(t,value); } ThreadLocalMap getMap(Thread t) return t.threadLocals; //变量的实际存储位置就是ThreadLocalMap,ThreadLocal为key,通过访问ThreadLocal变量这个key,去当前线程找到了threadLocalMap里面的Entry,用key取出值。所以是线程隔离的。
ThreadLocal
在Java中是一个比较特殊的类,它提供了一种线程局部变量的概念,即每个使用该变量的线程都有这个变量的一个独立副本,互不干扰。在并发编程中,ThreadLocal
是解决线程安全问题的一个常用工具。
然而,ThreadLocal
的使用如果不当,确实可能会导致内存泄漏问题。内存泄漏通常发生在使用ThreadLocal
的线程结束后,ThreadLocal
中保存的对象没有被回收。
内存泄漏通常是指应用程序不再需要的内存,由于某种原因没有被操作系统或可用内存回收机制回收,导致该部分内存始终得不到释放。在ThreadLocal
的情况下,内存泄漏的原因通常与其内部使用的ThreadLocalMap
有关,这是一个专门为每个线程维护的ThreadLocal
变量的复制的数据结构。
ThreadLocalMap的键是对ThreadLocal对象的弱引用
ThreadLocalMap
使用ThreadLocal
实例作为键,而这些键是弱引用。如果没有外部强引用指向ThreadLocal
实例,那么在垃圾回收时,ThreadLocal
实例可能会被回收。
ThreadLocalMap的值对Entry对象的引用是强引用
即使ThreadLocal
实例被回收,但是ThreadLocalMap
中的值(也就是用户保存的对象)仍然持有强引用,这意味着即使ThreadLocal
的键被清理了,值仍旧不会被垃圾回收器回收。
线程生命周期
如果是线程池中的线程,那么线程可能会长时间存在,即便执行的任务已经结束。这就意味着,ThreadLocalMap
中的值可能会长时间保留,直到线程结束。
要防止因为ThreadLocal
使用不当而导致的内存泄漏,可以采取以下措施:
及时清除
在不再需要访问ThreadLocal
存储的数据时,应该显式调用ThreadLocal
的remove()
方法来清除ThreadLocalMap
中的相关条目。
threadLocal.remove();
尽量不在ThreadLocal
中存放大的对象或者容易导致内存泄漏的对象
如果确实需要存放大的对象或者容易导致内存泄漏的对象,要确保在不使用时能够及时地调用remove()
。
使用完毕后结束线程
对于不是池化的线程,确保其执行完毕后能够及时结束。对于线程池中的线程,确保在关闭线程池时清理线程本地变量。
小心内部类的引用
如果你在一个内部类中使用了ThreadLocal
,那么要注意是否有可能无意中持有了对外部类的引用。
通常,只要在使用ThreadLocal
时遵循良好的编程习惯,就能够有效避免内存泄漏问题。在需要长期运行的线程,尤其是线程池场景中,务必注意对ThreadLocal
变量的正确管理。
在微服务架构中,前端访问服务A,A访问B,然后为了在日志系统里清楚的记录这一个链路过程。需要在调用过程中记录一个traceId,可以使用threadLocal的方式。
前端请求服务A,A生成一个uuid的traceId,放到当前线程的Threadlocal中去,调用服务B的时候,写入header,B获得请求,分析出header是否有traceId,如果存在则写入自己线程的threadLocal中去。
使用线程池的好处:
Runnable
接口不会返回结果或者抛出检查异常。但是Callable
可以。
如果接口不需要返回结果或者抛出异常,尽量使用Runnable
。
工具类Executors
可以实现二者相互转换。
Executors.callable(Runnable task)
Executors.callable(Runnable task,Object resule)
Runnable.java
public interface Runnable{
public void run();
}
Callable.java
public interface Callable<V>{
V call() throws Exception();
}
public class MyCallable implements Callable<Integer> { @Override public Integer call() throws Exception{ int sum = 0; for( int i =0;i<100;i++){ Thread.sleep(200); sum += i; } return sum; } public static void main(String[] args) throws ExecutionException, InterruptedException { MyCallable callable = new MyCallable(); FutureTask<Integer> task = new FutureTask<>(callable); Thread thread1 = new Thread(task); thread1.start(); //做自己的事 //... //过一段时间之后,回头来取计算出来的值 Thread.sleep(5000); System.out.println(System.currentTimeMillis());//1616469857144 System.out.println("我等了五秒钟了,你计算出来了吗"); if(task.get()!=null){//还没计算出来就会阻塞在这里 System.out.println(System.currentTimeMillis());1616469872192大约差了12秒 System.out.println("计算出了"); System.out.println(task.get()); } System.out.println("end"); } }
Executors.newSingleThreadExecutor();//单线程化的线程池。比如主线程需要调用一个存储过程,时间很长,但是不需要知道其结果,就再开一个线程,主线程直接返回。但是如果每次都开一个,多个线程容易造成死锁,就开单线程化的线程池。
Executors.newFixedThreadPool(n);//固定大小的线程池
//以上两个都需要队列,线程数为(1,1)或者(n,n),但是允许请求的队列长度为Integer.MAX_VALUE,可能堆积大量的请求导致OOM。
Executors.newCachedThreadPool();//几乎不限制大小的线程池
Executors.newScheduledThreadPool();//执行周期性任务的线程池
//这两个允许创建的线程数量为Integer.MAX_VALUE(0,Integer.MAX_VALUE),可能会创建大量线程,导致OOM
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadfactory, RejectedExecutionHandler handler){ //... } corePoolSize:核心线程数,定义了最小可以同时运行的线程数量。 maximumPoolSize:最大线程数。 workQueue:新任务来了,先看看核心线程数用完了没,用完了就放在队列中,队列满了就开线程。 keepAliveTime:线程数量超过corePoolSize的时候,如果此时没有新任务要做了。corePoolSize之外的线程不会立刻销毁,知道等待时间超过了keepAliveTime,再销毁。 unit:时间单位。 threadFactory:executor创建新线程会用到。 handler:饱和策略。
阻塞队列
饱和策略
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了。
策略可以是拒绝新任务;
可以不处理直接丢弃。
也可以丢弃最早的未处理请求。
也可以让本线程执行run任务。
线程池的执行方法
java.util.concurrent
包
写操作时,通过加分段锁,只对这一段有影响。1.8之后改为了node数组+链表+红黑树,没有了segment结构。
主要利用了synchronized(只锁数组的一个位置而非锁一段)+CAS操作完成。
读取完全不加锁,写入不阻塞读取操作,写入和写入之间需要同步等待。
public class CopyOnWriteArrayList<E> extends Object
implements List<E>, RandomAccess, Cloneable, Serializable{}
具体实现:
add,set操作都是创建底层数组的副本来实现的,创建完成之后,指向新数组,旧数组回收。这样读取的时候就不阻塞。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);//拷贝新数组
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();//释放锁
}
}
主要采用CAS非阻塞算法实现线程安全。
BlockingQueue
接口提供了可阻塞的插入和移除的方法。
队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。
ArrayBlockQueue
是BlockQueue的有界队列实现类,底层用数组实现。
一旦创建,容量不能改变。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。
LinkedBlockingQueue
底层采用单向链表实现,可以无界可以有界(指定大小)。
PriorityBlockingQueue
是一个支持优先级的无界阻塞队列。默认自然排序,也可以采用compareTo修改排序规则。
是PriorityQueue的线程安全版本。
使用了跳表。原始链表有序,用空间换时间,构造了很多层的子集链表。查找时不用遍历整个表,类似抽象的二叉树的结构。
AtomicInteger
,AtomicLong
,AtomicBoolean
AtomicIntegerArray
,AtomicLongArray
,AtomicBooleanArray
AtomicInteger
常用方法:
public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
AtomicReference
:引用类型原子类
AtomicStampedReference
:原子更新带有版本号的引用类型。可解决ABA问题
使用示例:
class Person { private String name; private int age; public Person(String name, int age) { super(); this.name = name; this.age = age; } //省略get,set } public class AtomicReferenceTest{ public static void main(String[] args){ AtomicReference<Person> ar = new AtomicReference<Person>(); Person p = new Person("hello",23); ar.set(person); Person updatePerson = new Person("world", 20); ar.compareAndSet(person,updatePerson); System.out.println(ar.get().getName());//world System.out.println(ar.get().getAge());20 } }
如果需要原子更新某个类里的某个字段时,需要用到对象的属性修改类型原子类
AtomicIntegerFieldUpdater
:原子更新整形字段的更新器
AtomicLongFieldUpdater
:原子更新长整型字段的更新器
AtomicReferenceFieldUpdater
:原子更新引用类型的字段的更新器
//user有一个字段,age
public class AtomicIntegerFieldUpdaterTest {
public static void main(String[] args) {
AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
User user = new User("Java", 22);
System.out.println(a.getAndIncrement(user));// 22
System.out.println(a.get(user));// 23
}
}
全称:AbstractQueuedSynchronizer,抽象队列同步器。
是一个用来构建锁和同步器的框架,使用AQS能够简单高效的构造出广泛的大量的同步器,比如ReentrantLock
,Semaphone
,ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
都基于AQS。
核心思想:当前线程访问资源,如果被请求的共享资源空闲,当前线程就是有效的工作线程。该资源被锁定。
其他线程来访问,就需要一套阻塞和唤醒时分配锁的机制。这个机制就是创建一个队列,将暂时获取不到锁的线程加入到队列中,但是它为啥叫抽象队列,因为并没有这么一个队列的实体,而是将每个阻塞线程封装成锁队列的一个节点。维护他们的前后关系。
独占式
只有一个线程可以执行,如ReentrantLock
share共享
Semaphore/CountDownLatch
AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:
isHeldexclusively()
tryAcquire(int) //独占
tryRelease(int) //独占
tryAccquireShared(int)//共享
tryReleaseShared(int) //共享
public class SemaphoreExample1 { // 请求的数量 private static final int threadCount = 550; public static void main(String[] args) throws InterruptedException { // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢) ExecutorService threadPool = Executors.newFixedThreadPool(300); // 一次只能允许执行的线程数量。 final Semaphore semaphore = new Semaphore(20); for (int i = 0; i < threadCount; i++) { final int threadnum = i; threadPool.execute(() -> {// Lambda 表达式的运用 try { semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20 test(threadnum); semaphore.release();// 释放一个许可 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); System.out.println("finish"); } public static void test(int threadnum) throws InterruptedException { Thread.sleep(1000);// 模拟请求的耗时操作 System.out.println("threadnum:" + threadnum); Thread.sleep(1000);// 模拟请求的耗时操作 } }
允许count个线程阻塞在同一个地方,直到所有线程的任务都执行完毕。
CountDownLatch
是共享锁的一种实现,它默认构造AQS
的 state
值为 count
。当线程使用 countDown()
方法时,其实使用了tryReleaseShared
方法以 CAS 的操作来减少 state,
直至state
为 0 。当调用 await()
方法的时候,如果 state
不为 0,那就证明任务还没有执行完毕,await()
方法就会一直阻塞,也就是说 await()
方法之后的语句不会被执行。然后,CountDownLatch 会自旋 CAS 判断 state == 0
,如果 state == 0
的话,就会释放所有等待的线程,await()
方法之后的语句得到执行。
一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
public class CountDownLatchDemo { private static final int threadCount = 500; public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor(300, 300,1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200));//300个线程,队列200个容量,超过就拒绝 final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for(int i = 0;i<threadCount;i++){ final int threadnum = i; executor.execute(()->{ try { test(threadnum); }catch (InterruptedException e){ e.printStackTrace(); }finally { countDownLatch.countDown(); } }); } countDownLatch.await(); executor.shutdown(); System.out.println("finish"); } public static void test(int threadnum) throws InterruptedException { Thread.sleep(1000);// 模拟请求的耗时操作 System.out.println("threadnum:" + threadnum); Thread.sleep(10000);// 模拟请求的耗时操作 } } //实验现象:300个线程执行了,打印了,结束了。等待了大概十秒钟 //接着后200线程才执行打印。然后才能countDownLatch.await();发现CountDownLatch的state为0了。 //才能打印finish
让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。听起来和CountDownLatch差不多。
应用场景:开多个线程计算多个数据,都算好之后再汇总数据。
package com.jichu; import java.util.concurrent.*; public class CyclicBarrierDemo { private static final int threadCount = 20; private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) throws InterruptedException{ ThreadPoolExecutor executor = new ThreadPoolExecutor(300, 300,1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200)); for(int i = 0;i<threadCount;i++){ final int threadNum = i; Thread.sleep(1000); executor.execute(()->{ System.out.println("threadnum:" + threadNum + "is ready"); try { cyclicBarrier.await(60,TimeUnit.SECONDS); } catch (Exception e) { System.out.println("-----CyclicBarrierException------"); } System.out.println("threadnum:" + threadNum + "is finish"); }); } executor.shutdown(); } }
CountDownLatch 是计数器,只能用一次。CyclicBarrier可以循环使用。
而且二者在使用上就有区别,前者一般用于,一个线程等待其他线程都到达某一个点,自己才能继续进行。
而后者是多个线程都到达某个地点,才能继续执行自己的内容。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。