当前位置:   article > 正文

高性能内存队列Disruptor使用_com.lmax.disruptor

com.lmax.disruptor
认识Disruptor

Disruptor是一个开源框架,研发的初衷是为了解决高并发下列队锁的问题,最早由 LMAX(一种新型零售金融交易平台)提出并使用,能够在无锁的情况下实现队列的并发操 作,并号称能够在一个线程里每秒处理6百万笔订单(这个真假就不清楚了!牛皮谁都会 吹)。

Github:https://github.com/LMAX-Exchange/disruptor

Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型。

讲到生产消费模型,大家应该马上就能回忆起前面我们已经学习过的BlockingQueue 课程,里面我们学习过多种队列,但是这些队列大多是基于条件阻塞方式的,性能还不够优 秀!

juc下队列存在的问题

队列

描述

ArrayBlockingQueue

基于数组结构实现的一个有界阻塞队列

LinkedBlockingQueue

基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列

PriorityBlockingQueue

支持按优先级排序的无界阻塞队列

DelayQueue

基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列

SynchronousQueue

不存储元素的阻塞队列

LinkedTransferQueue

基于链表结构实现的一个无界阻塞队列

LinkedBlockingDeque

基于链表结构实现的一个双端阻塞队列

1. juc下的队列大部分采用加ReentrantLock锁方式保证线程安全。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列。

2. 加锁的方式通常会严重影响性能。线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒,这个过程存在很大的开销,而且存在死锁的隐患。

3. 有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)。

Disruptor的设计方案

Disruptor通过以下设计来解决队列速度慢的问题:

  • 环形数组结构

为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原理)。

  • 元素位置定位

数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

  • 无锁设计

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

  • 利用缓存行填充解决了伪共享的问题

  • 实现了基于事件驱动的生产者消费者模型(观察者模式)

消费者时刻关注着队列里有没有消息,一旦有新消息产生,消费者线程就会立刻把它消费

RingBuffer数据结构

使用RingBuffer来作为队列的数据结构,RingBuffer就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence),用以指向下一个可用的元素,供生产者与消费者使用。原理图如下所示:

  • Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length。也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)

  • 当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉

思考:能覆盖数据是否会导致数据丢失呢?

当需要覆盖数据时,会执行一个策略,Disruptor给提供多种策略,比较常用的:

  • BlockingWaitStrategy策略常见且默认的等待策略,当这个队列里满了,不执行覆盖,而是阻塞等待。使用ReentrantLock+Condition实现阻塞,最节省cpu,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景

  • SleepingWaitStrategy策略会在循环中不断等待数据。先进行自旋等待如果不成功,则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此这个策略会产生比较高的平均延时。典型的应用场景就是异步日志。

  • YieldingWaitStrategy策略这个策略用于低延时的场合。消费者线程会不断循环监控缓冲区变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延时比较有严格的要求,可以考虑这种策略。

  • BusySpinWaitStrategy策略: 采用死循环,消费者线程会尽最大努力监控缓冲区的变化。对延时非常苛刻的场景使用,cpu核数必须大于消费者线程数量。推荐在线程绑定到固定的CPU的场景下使用

一个生产者单线程写数据的流程

  1. 申请写入m个元素;

  1. 若是有m个元素可以写入,则返回最大的序列号。这里主要判断是否会覆盖未读的元素;

  1. 若是返回的正确,则生产者开始写入元素。

多个生产者写数据的流程

多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

消费者读数据

生产者多线程写入的情况下读数据会复杂很多:

  1. 申请读取到序号n;

  1. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;

  1. 消费者读取元素。

如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。然后,消费者读取下标从3到6共计4个元素。

多个生产者写数据

多个生产者写入的时候:

  1. 申请写入m个元素;

  1. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;

  1. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。

如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。

Disruptor核心概念

  • RingBuffer(环形缓冲区):基于数组的内存级别缓存,是创建sequencer(序号)与定义WaitStrategy(拒绝策略)的入口。

  • Disruptor(总体执行入口):对RingBuffer的封装,持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用。

  • Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进度,同时还能消除伪共享。

  • Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法

  • SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑。

  • WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略

  • Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义。

  • EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者的接口。

  • EventProcessor:这是个事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence

框架使用

在项目中我们废弃了原有的juc下的队列,采用了Disruptor这个框架来进行操作日志的处理

  1. 构造函数
  2. public Disruptor(
  3. final EventFactory<T> eventFactory, //创建事件(任务)的工厂类
  4. final int ringBufferSize, // 数组大小
  5. final ThreadFactory threadFactory, //用于创建线程的工厂
  6. final ProducerType producerType, // 生产这类型 单个生产个 多个生产这
  7. final WaitStrategy waitStrategy) //等待策略

  1. maven依赖

  1. <!-- disruptor 高速队列-->
  2. <dependency>
  3. <groupId>com.lmax</groupId>
  4. <artifactId>disruptor</artifactId>
  5. <version>3.4.2</version>
  6. </dependency>
  1. 创建event事件 和 事件工厂

  1. /**
  2. * 创建事件(任务)的工厂类
  3. *
  4. * @return {@link EventFactory }<{@link BizHanderEvent }> 返回值类型
  5. * @author 余浪
  6. */
  7. @Bean
  8. public EventFactory<BizHanderEvent> eventEventFactory() {
  9. EventFactory<BizHanderEvent> orderEventEventFactory = new EventFactory<BizHanderEvent>() {
  10. @Override
  11. public BizHanderEvent newInstance() {
  12. return new BizHanderEvent();
  13. }
  14. };
  15. return orderEventEventFactory;
  16. }
  17. /**
  18. * 业务事件
  19. *
  20. * @Date: 2021/3/10 14:33
  21. * @Version: 1.0.0
  22. **/
  23. @Data
  24. public class BizHanderEvent implements Serializable {
  25. /**
  26. * 时间戳
  27. */
  28. private final long timestamp;
  29. /**
  30. * 事件携带的数据
  31. */
  32. protected transient Object source;
  33. public BizHanderEvent() {
  34. this.timestamp = System.currentTimeMillis();
  35. }
  36. public BizHanderEvent(Object source) {
  37. this.timestamp = System.currentTimeMillis();
  38. this.source = source;
  39. }
  40. }
  1. 创建消费者handler

  1. package com.cloud.common.log.disruptor;
  2. import cn.hutool.core.lang.UUID;
  3. import com.cloud.common.core.disruptor.BizHanderEvent;
  4. import com.cloud.common.log.dao.LogInfoDao;
  5. import com.cloud.common.log.model.SysLog;
  6. import com.lmax.disruptor.EventHandler;
  7. import lombok.Data;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.springframework.util.IdGenerator;
  10. import java.time.LocalDateTime;
  11. import java.util.ArrayList;
  12. import java.util.Collections;
  13. import java.util.List;
  14. /**
  15. * 该对象 有多个: 和Symbol的数据对应
  16. * 针对某一个 LogEventHandler,只会同一时间有一个线程来执行它
  17. *
  18. * @author yulang
  19. */
  20. @Data
  21. @Slf4j
  22. public class LogEventHandler implements EventHandler<BizHanderEvent> {
  23. private LogInfoDao logInfoDao;
  24. private volatile List<SysLog> list = Collections.synchronizedList(new ArrayList<SysLog>(36));
  25. private Integer batchLogSize;
  26. public LogEventHandler(LogInfoDao logInfoDao,Integer batchLogSize) {
  27. this.logInfoDao = logInfoDao;
  28. if(batchLogSize==null || batchLogSize < 5){
  29. batchLogSize = 15;
  30. }
  31. this.batchLogSize = batchLogSize;
  32. }
  33. /**
  34. * 接收到了某个消息
  35. *
  36. * @param event
  37. * @param sequence
  38. * @param endOfBatch
  39. * @throws Exception
  40. */
  41. @Override
  42. public void onEvent(BizHanderEvent event, long sequence, boolean endOfBatch) throws Exception {
  43. log.info("开始接收log事件============>{}-- sequence:{} -- endOfBatch:{} ", event, sequence, endOfBatch);
  44. // 从ringbuffer 里面接收了某个数据
  45. SysLog order = (SysLog) event.getSource();
  46. order.setUpdateTime(LocalDateTime.now());
  47. order.setId(IdWorkerUtil.getId());
  48. list.add(order);
  49. if (list.size() >= batchLogSize) {
  50. logInfoDao.addLogs(list);
  51. list.clear();
  52. }
  53. log.info("处理完成我们的log事件===================>{}", event);
  54. }
  55. }
  1. 定义生产者事件

  1. package com.cloud.common.log.disruptor;
  2. import com.cloud.common.core.disruptor.BizHanderEvent;
  3. import com.cloud.common.log.model.SysLog;
  4. import com.lmax.disruptor.EventTranslatorOneArg;
  5. import com.lmax.disruptor.RingBuffer;
  6. /**
  7. * 在boot里面使用它发送消息
  8. */
  9. @compent
  10. public class DisruptorTemplate {
  11. private static final EventTranslatorOneArg<BizHanderEvent, SysLog> TRANSLATOR = new EventTranslatorOneArg<BizHanderEvent, SysLog>() {
  12. @Override
  13. public void translateTo(BizHanderEvent event, long sequence, SysLog input) {
  14. event.setSource(input);
  15. }
  16. };
  17. private final RingBuffer<BizHanderEvent> ringBuffer;
  18. public DisruptorTemplate(RingBuffer<BizHanderEvent> ringBuffer) {
  19. this.ringBuffer = ringBuffer;
  20. }
  21. /**
  22. * 我们使用DisruptorTemplate 时,就使用它的onData方法
  23. *
  24. * @param input
  25. */
  26. public void onData(SysLog input) {
  27. ringBuffer.publishEvent(TRANSLATOR, input);
  28. }
  29. }
  1. 其他一些相关的配置类

  1. package com.cloud.common.core.disruptor;
  2. import com.lmax.disruptor.*;
  3. import com.lmax.disruptor.dsl.Disruptor;
  4. import com.lmax.disruptor.dsl.ProducerType;
  5. //import net.openhft.affinity.AffinityThreadFactory;
  6. import org.springframework.boot.context.properties.EnableConfigurationProperties;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import java.util.concurrent.ThreadFactory;
  10. import static java.util.concurrent.Executors.defaultThreadFactory;
  11. /**
  12. *
  13. * 传统的内存队列如果需要保存线程安全 通过锁 来实现,
  14. * 或者是通过 cas 实现,而 volatile类型的变量进行 CAS 操作,存在伪共享问题
  15. *
  16. * 多核cpu下 ,现在主流的CPU都是多核心(超核)的处理器,每一个核心可以同时执行一个线程,那么计算机在同一时刻就可以执行多条线程(中的指令),
  17. * 由于程序的局部性原理,当多个线程在执行时需要处理物理位置相近的数据时,就会同时将位于同一个缓存行的数据load到缓冲中,
  18. * 多条线程处理了数据时会发现缓存失效的问题,重新从内存中load数据,造成多个线程相互影响,形成伪共享。
  19. *
  20. * 缓存伪共享
  21. * 共享对象存在同一个缓存中,由于MESI协议,一个对象中一些不需要改变的属性因为其他改变的属性,导致整个对象的缓存进入到M被修改状态
  22. *
  23. * 目前的CPU是按照64K的缓存行(Cache Line)进行读取,如果读取的数据在同一个CacheLine,就存在缓存伪共享的问题。
  24. *
  25. * 对象被放入一个CacheLine中,根据MSEI协议,其中一个属性改变,其他所有没有改变的属性也变得不可共享。
  26. *
  27. * 填充Cache Line缓存块
  28. * 通过填充对象,将对象中常被改变的属性和不常改变的属性分开到不通缓存Cache Line中。避免缓存的伪共享。
  29. *
  30. *
  31. * Disruptor 中的 com.lmax.disruptor.Sequence 类包装了一个volatile修饰的 long类型数据 value,无论是Disruptor中的基于数组实现的
  32. * 缓冲区RingBuffer
  33. * value 前后被包围了 7个 long类型的值 ,对于一个大小为64字节的缓存行,它刚好被填补满 所以不会出现伪共享问题
  34. *
  35. *
  36. * @author: 余浪
  37. * @date: 2021/9/5
  38. **/
  39. @Configuration
  40. @EnableConfigurationProperties(value = DisruptorProperties.class)
  41. public class DisruptorAutoConfiguration {
  42. public DisruptorProperties disruptorProperties;
  43. public DisruptorAutoConfiguration(DisruptorProperties disruptorProperties) {
  44. this.disruptorProperties = disruptorProperties;
  45. }
  46. /**
  47. * 创建事件(任务)的工厂类
  48. *
  49. * @return {@link EventFactory }<{@link BizHanderEvent }> 返回值类型
  50. * @author 余浪
  51. */
  52. @Bean
  53. public EventFactory<BizHanderEvent> eventEventFactory() {
  54. EventFactory<BizHanderEvent> orderEventEventFactory = new EventFactory<BizHanderEvent>() {
  55. @Override
  56. public BizHanderEvent newInstance() {
  57. return new BizHanderEvent();
  58. }
  59. };
  60. return orderEventEventFactory;
  61. }
  62. /**
  63. * 用于创建执行任务的线程。
  64. *
  65. * @return {@link ThreadFactory } 返回值类型
  66. * @author 余浪
  67. */
  68. @Bean
  69. public ThreadFactory threadFactory() {
  70. return defaultThreadFactory();
  71. }
  72. /**
  73. * 无锁高效的等待策略
  74. *
  75. * @return
  76. */
  77. @Bean
  78. public WaitStrategy waitStrategy() {
  79. return new SleepingWaitStrategy();
  80. }
  81. /**
  82. * 创建一个RingBuffer
  83. * eventFactory: 事件工厂
  84. * threadFactory: 我们执行者(消费者)的线程该怎么创建
  85. * waitStrategy : 等待策略: 当我们ringBuffer 没有数据时,我们怎么等待
  86. */
  87. @Bean
  88. public RingBuffer<BizHanderEvent> ringBuffer(
  89. EventFactory<BizHanderEvent> eventFactory,
  90. ThreadFactory threadFactory,
  91. WaitStrategy waitStrategy,
  92. EventHandler<BizHanderEvent>[] eventHandlers) {
  93. /**
  94. * 构建disruptor
  95. */
  96. Disruptor<BizHanderEvent> disruptor = null;
  97. ProducerType producerType = ProducerType.SINGLE;
  98. if (disruptorProperties.isMultiProducer()) {
  99. producerType = ProducerType.MULTI;
  100. }
  101. disruptor = new Disruptor<BizHanderEvent>(eventFactory, disruptorProperties.getRingBufferSize(), threadFactory, producerType, waitStrategy);
  102. disruptor.setDefaultExceptionHandler(new DisruptorHandlerException());
  103. // 设置消费者---我们的每个消费者代表我们的一个 消息对,有多少个消息对,我们就有多少个eventHandlers ,事件来了后,多个eventHandlers 是并发执行的
  104. disruptor.handleEventsWith(eventHandlers);
  105. RingBuffer<BizHanderEvent> ringBuffer = disruptor.getRingBuffer();
  106. disruptor.start();// 开始监听
  107. final Disruptor<BizHanderEvent> disruptorShutdown = disruptor;
  108. // 使用优雅的停机
  109. Runtime.getRuntime().addShutdownHook(new Thread(
  110. () -> {
  111. disruptorShutdown.shutdown();
  112. }, "DisruptorShutdownThread"
  113. ));
  114. return ringBuffer;
  115. }
  116. }
  117. /**
  118. * DisruptorHandlerException 的异常处理
  119. */
  120. @Slf4j
  121. public class DisruptorHandlerException implements ExceptionHandler {
  122. /**
  123. * <p>Strategy for handling uncaught exceptions when processing an event.</p>
  124. *
  125. * <p>If the strategy wishes to terminate further processing by the {@link BatchEventProcessor}
  126. * then it should throw a {@link RuntimeException}.</p>
  127. *
  128. * @param ex the exception that propagated from the {@link EventHandler}.
  129. * @param sequence of the event which cause the exception.
  130. * @param event being processed when the exception occurred. This can be null.
  131. */
  132. @Override
  133. public void handleEventException(Throwable ex, long sequence, Object event) {
  134. log.info(" log into db error: {}", ex.getMessage());
  135. log.error("handleEventException Exception===>{} , sequence==> {} ,event===>{} ", ex.getMessage(), sequence, event);
  136. }
  137. /**
  138. * Callback to notify of an exception during {@link LifecycleAware#onStart()}
  139. *
  140. * @param ex throw during the starting process.
  141. */
  142. @Override
  143. public void handleOnStartException(Throwable ex) {
  144. log.info("OnStartHandler Exception===>{} ", ex.getMessage());
  145. }
  146. /**
  147. * Callback to notify of an exception during {@link LifecycleAware#onShutdown()}
  148. *
  149. * @param ex throw during the shutdown process.
  150. */
  151. @Override
  152. public void handleOnShutdownException(Throwable ex) {
  153. log.error("OnShutdownHandler Exception===>{} ", ex.getMessage());
  154. }
  155. }
  156. 配置属性类
  157. @Data
  158. @ConfigurationProperties(prefix = "spring.disruptor")
  159. public class DisruptorProperties {
  160. /**
  161. * 缓冲区的大小
  162. */
  163. private Integer ringBufferSize = 1024 * 1024;
  164. /**
  165. * 是否支持多生产者
  166. */
  167. private boolean isMultiProducer = false;
  168. }
  1. 具体使用

  1. package com.cloud.common.log.disruptor;
  2. import cn.hutool.core.util.StrUtil;
  3. import com.alibaba.fastjson.JSON;
  4. import com.cloud.common.core.exception.BizException;
  5. import com.cloud.common.core.util.ResponseResult;
  6. import com.cloud.common.core.util.SsoHolderUtil;
  7. import com.cloud.common.core.util.TenantContextHolder;
  8. import com.cloud.common.core.util.WebUtils;
  9. import com.cloud.common.log.annotation.LogInfo;
  10. import com.cloud.common.log.model.SysLog;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.aspectj.lang.ProceedingJoinPoint;
  13. import org.aspectj.lang.annotation.Around;
  14. import org.aspectj.lang.annotation.Aspect;
  15. import org.aspectj.lang.reflect.MethodSignature;
  16. import org.jetbrains.annotations.NotNull;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.core.env.Environment;
  19. import org.springframework.web.context.request.RequestContextHolder;
  20. import org.springframework.web.context.request.ServletRequestAttributes;
  21. import org.springframework.web.multipart.MultipartFile;
  22. import javax.servlet.http.HttpServletRequest;
  23. import javax.servlet.http.HttpServletResponse;
  24. import java.time.LocalDateTime;
  25. import java.util.HashMap;
  26. import java.util.Map;
  27. /**
  28. * 操作日志使用spring event异步入库
  29. */
  30. @Slf4j
  31. @SpringBootTest
  32. public class SysLogTest {
  33. @Autowired
  34. private DisruptorTemplate disruptorTemplate;
  35. @Test
  36. public void logTest() {
  37. SysLog logVo = new SysLog();
  38. //生产日志数据 直接使用模板类进行生产消息即可
  39. disruptorTemplate.onData(logVo);
  40. }
  41. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/254986
推荐阅读
相关标签
  

闽ICP备14008679号