当前位置:   article > 正文

disruptor在基础框架中的实践_disputor

disputor

disruptor是什么? 

高性能队列——Disruptor - 美团技术团队

这篇文章是美团技术团队上面的,全面讲解disputor为何如此之快,以及ringbuffer的数据结构和java内置的队列对比,disruptor在Log4j2的应用对性能的大大提升。

disruptor的核心组件

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

RingBuffer

如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

SequenceDisruptor

通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。(注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。

Sequencer

Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

Sequence Barrier

用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

Wait Strategy

定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)

Event

在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

EventProcessor

EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

Producer

即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

disruptor框架实际运用

https://blog.csdn.net/qq_28175019/article/details/104569258

这是一个mysql和redis同步框架,采用嵌入式jar包的方式实现数据实时同步。下面将会以这个组件为例子,讲解它是如何使用到disruptor的。

背景

框架为了实现mysql和redis的数据强一致性,采用数据层代理方式拦截数据变更事件,然后将这些事件通过disruptor组件进行分发,分发到pipeline组件进行事件处理。

 定义我们业务需要处理的顺序数据实体,tasks表示的是sql解析任务列表:

  1. public class SeriesData {
  2. private List<SqlParserTask> tasks;
  3. public SeriesData() {
  4. }
  5. public SeriesData(List<SqlParserTask> tasks) {
  6. this.tasks = tasks;
  7. }
  8. public List<SqlParserTask> getTasks() {
  9. return tasks;
  10. }
  11. @Override
  12. public String toString() {
  13. return "SeriesData{" +
  14. "tasks=" + tasks +
  15. '}';
  16. }
  17. }

定义disrupter需要处理的事件

  1. public class SeriesDataEvent extends ValueWrapper<SeriesData> {
  2. }

定义异常处理器,对我们的异常进行日志打印

  1. public class DisruptorHandlerException implements ExceptionHandler {
  2. private Logger logger = LoggerFactory.getLogger(DisruptorHandlerException.class);
  3. /*
  4. * (non-Javadoc) 运行过程中发生时的异常
  5. *
  6. * @see
  7. * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
  8. * , long, java.lang.Object)
  9. */
  10. @Override
  11. public void handleEventException(Throwable ex, long sequence, Object event) {
  12. ex.printStackTrace();
  13. logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
  14. }
  15. /*
  16. * (non-Javadoc) 启动时的异常
  17. *
  18. * @see
  19. * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
  20. * Throwable)
  21. */
  22. @Override
  23. public void handleOnStartException(Throwable ex) {
  24. logger.error("start disruptor error ==[{}]!", ex.getMessage());
  25. }
  26. /*
  27. * (non-Javadoc) 关闭时的异常
  28. *
  29. * @see
  30. * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
  31. * .Throwable)
  32. */
  33. @Override
  34. public void handleOnShutdownException(Throwable ex) {
  35. logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
  36. }
  37. }

定义事件处理工厂

  1. public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {
  2. @Override
  3. public SeriesDataEvent newInstance() {
  4. return new SeriesDataEvent();
  5. }
  6. }

初始化disruptor的配置信息,事件工厂、队列大小、清除钩子等

  1. package com.dm.disruptor;
  2. import com.dm.constant.DmConstants;
  3. import com.dm.core.NamedThreadFactory;
  4. import com.lmax.disruptor.EventFactory;
  5. import com.lmax.disruptor.RingBuffer;
  6. import com.lmax.disruptor.WaitStrategy;
  7. import com.lmax.disruptor.WorkHandler;
  8. import com.lmax.disruptor.dsl.Disruptor;
  9. import com.lmax.disruptor.dsl.ProducerType;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. /**
  13. * @author zy
  14. */
  15. public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {
  16. /**
  17. * 记录所有的队列,系统退出时统一清理资源
  18. */
  19. private List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
  20. /**
  21. * Disruptor 对象
  22. */
  23. private Disruptor<E> disruptor;
  24. /**
  25. * RingBuffer
  26. */
  27. private RingBuffer<E> ringBuffer;
  28. /**
  29. * initQueue
  30. */
  31. private List<D> initQueue = new ArrayList();
  32. /**
  33. * 事件工厂
  34. *
  35. * @return EventFactory
  36. */
  37. protected EventFactory<E> eventFactory() {
  38. return null;
  39. }
  40. /**
  41. * 队列大小
  42. *
  43. * @return 队列长度,必须是2的幂
  44. */
  45. protected int getQueueSize() {
  46. return 0;
  47. }
  48. /**
  49. * 初始化
  50. */
  51. public void init() {
  52. this.disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), new NamedThreadFactory(DmConstants.DIS_THREAD_NAME_FORMAT), ProducerType.SINGLE, getStrategy());
  53. this.disruptor.setDefaultExceptionHandler(new DisruptorHandlerException());
  54. this.disruptor.handleEventsWithWorkerPool(new WorkHandler[]{new DispatcherEventHandler()});
  55. this.ringBuffer = disruptor.start();
  56. //初始化数据发布
  57. for (D data : initQueue) {
  58. ringBuffer.publishEvent((event, sequence, data1) -> event.setValue(data1), data);
  59. }
  60. //加入资源清理钩子
  61. synchronized (queueHelperList) {
  62. if (queueHelperList.isEmpty()) {
  63. Runtime.getRuntime().addShutdownHook(new Thread() {
  64. @Override
  65. public void run() {
  66. for (BaseQueueHelper baseQueueHelper : queueHelperList) {
  67. baseQueueHelper.shutdown();
  68. }
  69. }
  70. });
  71. }
  72. queueHelperList.add(this);
  73. }
  74. }
  75. /**
  76. * 如果要改变线程执行优先级,override此策略. YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU,
  77. * 慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景.
  78. *
  79. * @return WaitStrategy
  80. */
  81. protected abstract WaitStrategy getStrategy();
  82. /**
  83. * 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理.
  84. */
  85. public synchronized void publishEvent(D data) {
  86. if (ringBuffer == null) {
  87. initQueue.add(data);
  88. return;
  89. }
  90. ringBuffer.publishEvent((event, sequence, data1) -> event.setValue(data1), data);
  91. }
  92. /**
  93. * 关闭队列
  94. */
  95. public void shutdown() {
  96. disruptor.shutdown();
  97. }
  98. }

事件到来后,将事件推送到disruptor

  1. @Override
  2. public void commit() throws SQLException {
  3. targetConnection.commit();
  4. DmLocalContext cur = DmLocalContext.cur();
  5. if (cur != null) {
  6. SeriesDataEventQueueHelper.Instance().publishEvent(new SeriesData(cur.getTasks()));
  7. }
  8. }

事件处理器,消费获取的事件,然后由pipeline对任务进行处理: 

  1. public class DispatcherEventHandler implements WorkHandler<SeriesDataEvent> {
  2. private Logger logger = LoggerFactory.getLogger(DispatcherEventHandler.class);
  3. @Override
  4. public void onEvent(SeriesDataEvent event) {
  5. List<SqlParserTask> tasks = event.getValue().getTasks();
  6. if (event.getValue() == null || StringUtils.isEmpty(tasks)) {
  7. logger.warn("Receiver series data is empty!");
  8. }
  9. logger.info("Receiver series data is :" + Arrays.toString(tasks.toArray()));
  10. Pipeline pipeline = PipelineContext.getPipeline();
  11. tasks.stream().forEach(e -> {
  12. if (e != null) {
  13. pipeline.process(e);
  14. }
  15. });
  16. }
  17. /* public int hash(int parallelSize, long id){
  18. int iid = Integer.parseInt(String.valueOf(id));
  19. int n = parallelSize - 1;
  20. n |= n >>> 1;
  21. n |= n >>> 2;
  22. n |= n >>> 4;
  23. n |= n >>> 8;
  24. n |= n >>> 16;
  25. n = (n < 0) ? 1 : (n >= DmConstants.MAXIMUM_CAPACITY) ? DmConstants.MAXIMUM_CAPACITY : n + 1;
  26. return (n - 1 ) & iid;
  27. }*/
  28. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/255015
推荐阅读
相关标签
  

闽ICP备14008679号