当前位置:   article > 正文

JUC知识点_总线嗅探机制

总线嗅探机制

一.线程中断标记

1.interrupt

实例方法interrupt()仅仅是设置线程的中断状态为true,不会停止线程

1.如果线程处于正常活动状态,那么会将该线程的中断标志设置为 true。
被设置中断标志的线程将继续正常运行,不受影响。所以,interrupt()并不能真正的中断线程,需要被调用的线程自己进行配合才行。

2.如果线程处于被阻塞状态(例如处于sleep, wait,join等状态),在别的线程中调用当前线程对象的interrupt方法,那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常。并且会将中断状态置为false,需要重新将标志为置为true

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
public static void m5()
    {
        Thread t1 = new Thread(() -> {
            while (true) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("-----isInterrupted() = true,程序结束。");
                    break;
                }
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();//???????  //线程的中断标志位为false,无法停下,需要再次掉interrupt()设置true
                    e.printStackTrace();
                }
                System.out.println("------hello Interrupt");
            }
        }, "t1");
        t1.start();

        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            t1.interrupt();//修改t1线程的中断标志位为true
        },"t2").start();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

2.interrupted

判断线程是否被中断,并清除当前中断状态这个方法做了两件事:
1返回当前线程的中断状态
2将当前线程的中断状态设为false

  • 1
  • 2
  • 3
  • 4

3.isInterrupted

判断当前线程是否被中断(通过检查中断标志位)
  • 1

二.LockSupport

1.1 LockSupport.unpark();

会给指定线程发通行证,一个线程只能发一次,如果要发多个需要开多个线程去unpark();
  • 1

1.2LockSupport.park();

线程加锁,可以锁多次,锁几次就得解几次,但是不能提前解多次,只能解一次
  • 1

提前发了两次通行证,但只有一个生效,线程无法终止

public static void main(String[] args) {

        Thread thread = new Thread(() -> {

            try {
                Thread.sleep(2000);
                System.out.println("t1开始运行---");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            LockSupport.park();
            LockSupport.park();
        });
        thread.start();
        new Thread(() ->{
            System.out.println("开始解锁11111");
            LockSupport.unpark(thread);
        }).start();

        new Thread(() ->{
            System.out.println("开始解锁22222");
            LockSupport.unpark(thread);
        }).start();

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

先上锁多次再多次解锁可以成功,但是如果出现同时解锁会导致线程一直运行

public static void main(String[] args) {

        Thread thread = new Thread(() -> {

            try {
                //Thread.sleep(2000);
                System.out.println("t1开始运行---");
            } catch (Exception e) {
                e.printStackTrace();
            }
            LockSupport.park();
            LockSupport.park();
        });
        thread.start();
        new Thread(() ->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("开始解锁11111");
            LockSupport.unpark(thread);
        }).start();

        new Thread(() ->{
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("开始解锁22222");
            LockSupport.unpark(thread);
        }).start();

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

三、JMM

在这里插入图片描述

3.1、JMM定义


Java虚拟机规范中试图定义一种Java内存模型《java Memory Model,简称JMM)来屏蔽掉各种硬件和操作系统的内存访问差异以实现让Java程序在各种平台下都能达到一致的内存访问效果。

JMM(Java内存模型Java Memory Model,简称JMM)本身是一种抽象的概念1不真实存在它仅仅描述的是一组约定或规范,通过这组规范定义了程序中(尤其是多线程)各个变量的读写访问方式并决定一个线程对共享变量的写入何时以及如何变成对另一个线程可见,关键技术点都是围绕多线程的原子性、可见性和有序性展开的。

作用:
1通过JMM来实现线程和主内存之间的抽象关系。
2屏蔽各个硬件平台和操作系统的内存访问差异以实现让Java程序在各种平台下都能达到一致的内存访问效果。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3.2.JMM三大特性

2.1 可见性

是指当一个线程修改了某一个共享变量的值,其他线程是否能够立即知道该变更,JMM规定了所有的变量都存储在主内存中。
  • 1

在这里插入图片描述
2.2有序性

对于一个线程的执行代码而言,我们总是习惯性认为代码的执行总是从上到下,有序执行。但为了提供性能,编译器和处理器通常会对指令序列进行重新排序。
指令重排可以保证串行语义一致,但没有义务保证多线程间的语义也一致,即可能产生"脏读",简单说,
两行以上还相干的代码在执行的时候有可能先执行的不是第一条,不见得是从上到下顺序执行,执行顺序会被优化。


单线程环境里面确保程序最终执行结果和代码顺序执行的结果一致。处理器在进行重排序时必须要考虑指令之间的数据依赖性
多线程环境中线程交替执行,由于编译器优化重排的存在,两个线程中使用的变量能否保证一致性是无法确定的,结果无法预测

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在这里插入图片描述

2.3

指一个操作是不可中断的,即多线程环境下,操作不能被其他线程干扰
  • 1

3.3.JVM读取过程

由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),
工作内存是每个线程的私有数据区域,而Java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,
所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,
首先要将变量从主内存拷贝到的线程自己的工作内存空间,然后对变量进行操作,
操作完成后再将变量写回主内存,不能直接操作主内存中的变量,各个线程中的工作内存中存储着主内存中的变量副本拷贝,
因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成。:

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

线程和主内存之间的关系

1线程之间的共享变量存储在主内存中(从硬件角度来说就是内存条)
2每个线程都有一个私有的本地工作内存,本地工作内存中存储了该线程用来读I写共享变量的副本(从硬件角度来说就是CPU的缓存,比如寄存器、L1、L2、L3缓存等)

  • 1
  • 2
  • 3

在这里插入图片描述

3.4、happens-before

1) 站在Java程序员的角度来说:JMM保证,如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
2) 站在编译器和处理器的角度来说:JMM允许,两个操作之间存在happens-before关系,不要求Java平台的具体实现必须要按照happens-before关系指定的顺序来执行。如果重排序之后的执行结果,与按happens-before关系来执行的结果一致,那么这种重排序是允许的。
  • 1
  • 2

3.4.1 次序规则

一个线程内,按照代码顺序,写在前面的操作先行发生于写在后面的操作;
  • 1

3.4.2 锁定规则

一个unLock操作先行发生于后面((这里的“后面”是指时间上的先后))对同一个锁的lock操作;
  • 1

3.4.3 volatile变量规则

对一个volatile变量的写操作先行发生于后面对这个变量的读操作,
前面的写对后面的读是可见的,这里的“后面”同样是指时间上的先后。
  • 1
  • 2

3.4.4传递规则

如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C;
  • 1

3.4.5线程启动规则

Thread对象的start()方法先行发生于此线程的每一个动作
  • 1

3.4.6线程中断规则

对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生;
可以通过Thread.interrupted()检测到是否发生中断
  • 1
  • 2

3.4.7线程终止规则

线程中的所有操作都先行发生于对此线程的终止检
测,我们可以通过Thread::join()方法是否结束、
Thread::isAlive()的返回值等手段检测线程是否已经终止执行。
  • 1
  • 2
  • 3

3.4.8对象终止规则

一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始
  • 1

4.MESI

并发场景下(比如多线程)如果操作相同变量,如何保证每个核中缓存的变量是正确的值,这涉及到一些”缓存一致性“的协议。其中应用最广的就是MESI协议(当然这并不是唯一的缓存一致性协议)
  • 1

在这里插入图片描述

5.总线嗅探机制

CPU和内存通过总线(BUS)互通消息。

CPU感知其他CPU的行为(比如读、写某个缓存行)就是是通过嗅探(Snoop)线性中其他CPU发出的请求消息完成的,有时CPU也需要针对总线中的某些请求消息进行响应。这被称为”总线嗅探机制“。
  • 1
  • 2
  • 3

在这里插入图片描述

6.工作流程

6.1

CPU1从内存中将变量a加载到缓存中,并将变量a的状态改为E(独享),并通过总线嗅探机制对内存中变量a的操作进行嗅探
  • 1

在这里插入图片描述
6.2

此时,CPU2读取变量a,总线嗅探机制会将CPU1中的变量a的状态置为S(共享),并将变量a加载到CPU2的缓存中,状态为S
  • 1

在这里插入图片描述
6.3

CPU1对变量a进行修改操作,此时CPU1中的变量a会被置为M(修改)状态,而CPU2中的变量a会被通知,改为I(无效)状态,此时CPU2中的变量a做的任何修改都不会被写回内存中(高并发情况下可能出现两个CPU同时修改变量a,并同时向总线发出将各自的缓存行更改为M状态的情况,此时总线会采用相应的裁决机制进行裁决,将其中一个置为M状态,另一个置为I状态,且I状态的缓存行修改无效)
  • 1

在这里插入图片描述
6.4

CPU1将修改后的数据写回内存,并将变量a置为E(独占)状态
  • 1

在这里插入图片描述
6.5

此时,CPU2通过总线嗅探机制得知变量a已被修改,会重新去内存中加载变量a,同时CPU1和CPU2中的变量a都改为S状态
  • 1

在这里插入图片描述

四.volatile

https://zhuanlan.zhihu.com/p/133851347

https://blog.csdn.net/nrsc272420199/article/details/104854629?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522163928396916780255266666%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=163928396916780255266666&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-4-104854629.pc_search_insert_es_download&utm_term=volatile%E5%BA%95%E5%B1%82%E6%98%AF%E6%80%8E%E4%B9%88%E5%8A%A0%E6%8C%87%E4%BB%A4%E7%9A%84&spm=1018.2226.3001.4187
  • 1
  • 2
  • 3

4.1volatile变量的禁止指令重排序
硬件层面的“内存屏障”

sfence:即写屏障(Store Barrier),在写指令之后插入写屏障,能让写入缓存的最新数据写回到主内存,以保证写入的数据立刻对其他线程可见
lfence:即读屏障(Load Barrier),在读指令前插入读屏障,可以让高速缓存中的数据失效,重新从主内存加载数据,以保证读取的是最新的数据。
mfence:即全能屏障(modify/mix Barrier ),兼具sfence和lfence的功能
lock 前缀:lock不是内存屏障,而是一种锁。执行时会锁住内存子系统来确保执行顺序,甚至跨多个CPU。
  • 1
  • 2
  • 3
  • 4

JMM层面的“内存屏障”

内存屏障:是一种屏障指令,它使得CPU 或编译器对屏障指令的前和后所发出的内存操作执行一个排序的约束。也叫内存栅栏或栅栏指令
作用:
1.阻止屏障两边的指令重排序
2.写数据时加入屏障,强制将线程私有工作内存的数据刷回主物理内存
3.读数据时加入屏障,线程私有工作内存的数据失效,重新到主物理内存中获取最新数据
  • 1
  • 2
  • 3
  • 4
  • 5
当我们给变量加入volatile修饰的时候底层会有volatile标识符
它影响的是Class内的Field 的flags :添加了一个ACC_VOLATILE
JVM在把字节码生成为机器码的时候,发现操作是volatile的变量的话,就会根据JMM要求,在相应的位置去插入内存屏障指令
  • 1
  • 2
  • 3

在这里插入图片描述

LoadLoad屏障(读操作的前面插入): 对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
StoreStore屏障(写操作的前面插入):对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。
LoadStore屏障(读操作的后面插入):对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。
StoreLoad屏障(写操作的后面插入): 对于这样的语句Store1; StoreLoad; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。
  • 1
  • 2
  • 3
  • 4

Unsafe.java

 /**
     * Ensures lack of reordering of loads before the fence
     * with loads or stores after the fence.
     * @since 1.8
     */
    public native void loadFence();

    /**
     * Ensures lack of reordering of stores before the fence
     * with loads or stores after the fence.
     * @since 1.8
     */
    public native void storeFence();

    /**
     * Ensures lack of reordering of loads or stores before the fence
     * with loads or stores after the fence.
     * @since 1.8
     */
    public native void fullFence();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

unsafe.cpp

UNSAFE_ENTRY(void, Unsafe_LoadFence(JNIEnv *env, jobject unsafe))
  UnsafeWrapper("Unsafe_LoadFence");
  OrderAccess::acquire();
UNSAFE_END

UNSAFE_ENTRY(void, Unsafe_StoreFence(JNIEnv *env, jobject unsafe))
  UnsafeWrapper("Unsafe_StoreFence");
  OrderAccess::release();
UNSAFE_END

UNSAFE_ENTRY(void, Unsafe_FullFence(JNIEnv *env, jobject unsafe))
  UnsafeWrapper("Unsafe_FullFence");
  OrderAccess::fence();
UNSAFE_END
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

orderAccess.hpp


  static void     loadload();
  static void     storestore();
  static void     loadstore();
  static void     storeload();
  • 1
  • 2
  • 3
  • 4
  • 5

orderAccess_windows_x86.inline.hpp

// Implementation of class OrderAccess.
inline void OrderAccess::loadload()   { acquire(); }
inline void OrderAccess::storestore() { release(); }
inline void OrderAccess::loadstore()  { acquire(); }
inline void OrderAccess::storeload()  { fence(); }
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

4.2 volatile如何保证可见性
在这里插入图片描述

lock前缀的指令在多核处理器下会引发两件事情

1. 将当前处理器缓存行的数据写回到系统内存。
2. 这个写回内存的操作会使在其他CPU里缓存了该内存地址的数据无效。

为了提高处理速度,处理器不直接和内存进行通信,而是先将系统内存的数据读到内部缓存后再进行操作,但是操作完了不知道什么时候写回内存。而对声明了volatile关键字的变量进行写操作,JVM会向处理器发送一条lock前缀的指令,将这个变量所在的缓存行立即写回系统内存。并且为了保证各个处理器的缓存是一致的,实现了缓存一致性协议,各个处理通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,那么下次对这个数据进行操作,就会重新从系统内存中获取最新的值。对应JMM来说就是:
1. Lock前缀的指令让线程工作内存中的值写回主内存中;
2. 通过缓存一致性协议,其他线程如果工作内存中存了该共享变量的值,就会失效;
3. 其他线程会重新从主内存中获取最新的值;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

4.3volatile为什么不能保证原子性问题
在这里插入图片描述
下图的8种操作是定义在java内存模型当中的,我们的任何操作都需要通过这几种方式来进行。

read(读取):从主内存读取数据
load(载入):将主内存读取到的数据写入工作内存
use(使用):从工作内存读取数据来计算
assign(赋值):将计算好的值重新赋值到工作内存中
store(存储):将工作内存数据写入主内存
write(写入):将store过去的变量值赋值给主内存中的变量
lock(锁定):将主内存变量加锁,标识为线程独占状态
unlock(解锁):将主内存变量解锁,解锁后其他线程可以锁定该变量
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在这里插入图片描述

五、CountDownLatch用法

主要用于主线程等待子线程都执行完当前任务(计数器)

public class AtomicIntegerDemo
{
    public static final int SIEZ_ = 50;

    public static void main(String[] args) throws InterruptedException
    {

        MyNumber myNumber = new MyNumber();
        CountDownLatch countDownLatch = new CountDownLatch(SIEZ_);

        for (int i = 1; i <=SIEZ_; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1 ;j <=1000; j++) {
                        myNumber.addPlusPlus();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }
            },String.valueOf(i)).start();
        }

        //try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }

        countDownLatch.await();

        System.out.println(Thread.currentThread().getName()+"\t"+"---result : "+myNumber.atomicInteger.get());


    }
}
class MyNumber
{
    AtomicInteger atomicInteger = new AtomicInteger();

    public void addPlusPlus()
    {
        atomicInteger.incrementAndGet();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

六、LongAdder(只能加减)

测试自增性能

class ClickNumber
{
    int number = 0;
    public synchronized void add_Synchronized()
    {
        number++;
    }

    AtomicInteger atomicInteger = new AtomicInteger();
    public void add_AtomicInteger()
    {
        atomicInteger.incrementAndGet();
    }

    AtomicLong atomicLong = new AtomicLong();
    public void add_AtomicLong()
    {
        atomicLong.incrementAndGet();
    }

    LongAdder longAdder = new LongAdder();
    public void add_LongAdder()
    {
        longAdder.increment();
        //longAdder.sum();
    }

    LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x+y,0);
    public void add_LongAccumulator()
    {
        longAccumulator.accumulate(1);
    }

}


/**
 * 
 *
 *  50个线程,每个线程100W次,总点赞数出来
 */
public class LongAdderCalcDemo
{
    public static final int SIZE_THREAD = 50;
    public static final int _1W = 10000;

    public static void main(String[] args) throws InterruptedException
    {
        ClickNumber clickNumber = new ClickNumber();
        long startTime;
        long endTime;

        CountDownLatch countDownLatch1 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch2 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch3 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch4 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch5 = new CountDownLatch(SIZE_THREAD);
        //========================

        startTime = System.currentTimeMillis();
        for (int i = 1; i <=SIZE_THREAD; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.add_Synchronized();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    countDownLatch1.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch1.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_Synchronized"+"\t"+clickNumber.number);


        startTime = System.currentTimeMillis();
        for (int i = 1; i <=SIZE_THREAD; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.add_AtomicInteger();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    countDownLatch2.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch2.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_AtomicInteger"+"\t"+clickNumber.atomicInteger.get());

        startTime = System.currentTimeMillis();
        for (int i = 1; i <=SIZE_THREAD; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.add_AtomicLong();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    countDownLatch3.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch3.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_AtomicLong"+"\t"+clickNumber.atomicLong.get());

        startTime = System.currentTimeMillis();
        for (int i = 1; i <=SIZE_THREAD; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.add_LongAdder();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    countDownLatch4.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch4.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_LongAdder"+"\t"+clickNumber.longAdder.longValue());

        startTime = System.currentTimeMillis();
        for (int i = 1; i <=SIZE_THREAD; i++) {
            new Thread(() -> {
                try
                {
                    for (int j = 1; j <=100 * _1W; j++) {
                        clickNumber.add_LongAccumulator();
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    countDownLatch5.countDown();
                }
            },String.valueOf(i)).start();
        }
        countDownLatch5.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_LongAccumulator"+"\t"+clickNumber.longAccumulator.longValue());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156

在这里插入图片描述

6.1、LongAdder为什么这么快

1LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

2.Cell使用@Contended避免缓存行伪共享问题
缓存行:
CPU读取内存数据时并非一次只读一个字节,而是会读一段字节长度(不同CPU架构读取的长度不一样,常见的大小是64字节)的连续的内存块,这些称之为缓存行。
伪共享:
当一个CPU要修改某共享变量A时会先锁定自己缓存里A所在的缓存行,并且把其他CPU缓存上相关的缓存行设置为无效。但如果被锁定或失效的缓存行里,还存储了其他不相干的变量B,其他线程此时就访问不了B,或者由于缓存行失效需要重新从内存中读取加载到缓存里,这就造成了开销。所以让共享变量A单独使用一个缓存行就不会影响到其他线程的访问。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在这里插入图片描述

6.2LongAdder源码

在这里插入图片描述

     public void add(long x) {
     	//as:表示cells引用 b:表示获取的base值 v:表示期望值 m:表示cells 数组的长度 a:表示当前线程命中的cell单元格
        Cell[] as; long b, v; int m; Cell a;
        //条件一:true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中
        //       false->表示cells未初始化,当前所有线程应该将数据写到base中
        //条件二:false->表示当前线程cas替换数据成功,
        //       true->表示发生竞争了,可能需要重试 或者 扩容
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            //true -> 未竞争  false->发生竞争
            boolean uncontended = true;
            //as==null为true表示没有竞争 
            //(m = as.length - 1) < 0为true表示Cell长度为0
            //(a = as[getProbe() & m]) == null 为true表示将值放入cell中失败
            //!(uncontended = a.cas(v = a.value, v + x))为true表示cas失败需要扩容
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                //进入这里有2种情况①cell没有被创建②cas失败需要扩容
                longAccumulate(x, null, uncontended);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
	//this指代当前对象,BASE指内存地址,cmp指预期值,val指目标值
	final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
  • 1
  • 2
  • 3
  • 4
static {
        try {
            UNSAFE = Unsafe.getUnsafe();
            Class var0 = Striped64.class;
            //获取base的在内存中的偏移量
            BASE = UNSAFE.objectFieldOffset(var0.getDeclaredField("base"));
            //获取cells属性在内存的偏移量  0表示无锁状态,1表示其他线程已经持有锁
            CELLSBUSY = UNSAFE.objectFieldOffset(var0.getDeclaredField("cellsBusy"));
            Class var1 = Thread.class;
            //获取线程随机值在内存中的偏移量
            PROBE = UNSAFE.objectFieldOffset(var1.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception var2) {
            throw new Error(var2);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //h 表示线程hash值
        int h;
        //条件成立:说明当前线程 还未分配hash值; getProbe()获取当前线程的Hash值
        if ((h = getProbe()) == 0) {
        	//给当前线程分配hash值
            ThreadLocalRandom.current(); // force initialization
            //取出当前线程的hash值 赋值给h
            h = getProbe();
            wasUncontended = true;
        }
        //表示扩容意向 false 一定不会扩容,true 可能会扩容。
        boolean collide = false;                // True if last slot nonempty
        //自旋
        for (;;) {
        	//as:cells引用 a:当前线程命中的cell n:cells数组长度 v:期望值
            Cell[] as; Cell a; int n; long v;
            //CASE1: 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中
            if ((as = cells) != null && (n = as.length) > 0) {
            	//当前线程对应的下标位置的cell为null,需要创建new Cell
                if ((a = as[(n - 1) & h]) == null) {
                	//true->表示当前锁 未被占用  false->表示锁被占用
                    if (cellsBusy == 0) {       // Try to attach new Cell
                    	//拿当前的x创建Cell
                        Cell r = new Cell(x);   // Optimistically create
                        //条件一:true->表示当前锁 未被占用  false->表示锁被占用
                        //条件二:true->表示当前线程获取锁成功  false->当前线程获取锁失败..
                        if (cellsBusy == 0 && casCellsBusy()) {
                       		//是否创建成功 标记
                            boolean created = false;
                            try {               // Recheck under lock
                            	 //rs 表示当前cells 引用
                                //m 表示cells长度
                                //j 表示当前线程命中的下标
                                Cell[] rs; int m, j;
                                //rs[j = (m - 1) & h] == null 为了防止其它线程初始化过该位置,然后当前线程再次初始化该位置导致丢失数据
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                // 只有cells初始化之后,并且当前线程 竞争修改失败,才会是false
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //说明当前线程对应的数组中有了数据,也重置过hash值,
                //这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环。  
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                 //达到cpu最大核数或者其它线程已经扩容过了就不扩容了
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                 //没有扩容过--》设置扩容意向   
                else if (!collide)
                    collide = true;
                //表示当前无锁状态,当前线程可以去竞争这把锁并且当前线程获取锁成功,可以执行扩容逻辑
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //重置当前线程hash值
                h = advanceProbe(h);
            }
            //当前线程还未加锁并且设置锁成功
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                	//双重检锁 cells == as? 防止其它线程已经初始化了,当前线程再次初始化 导致丢失数据
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //1.当前cellsBusy加锁状态,表示其它线程正在初始化cells,所以当前线程将值累加到base
            //2.cells被其它线程初始化后,当前线程需要将数据累加到base
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109

在这里插入图片描述

总结

1.LongAdder是一种以空间换时间的解决方案,其在高并发,竞争大的情况下性能更优。
2.但是,sum方法拿到的只是接近值,追求最终一致性。如果业务场景追求高精度,高准确性,用AtomicLong。
  • 1
  • 2
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/363175?site
推荐阅读
相关标签
  

闽ICP备14008679号