赞
踩
这篇文章是美团技术团队上面的,全面讲解disputor为何如此之快,以及ringbuffer的数据结构和java内置的队列对比,disruptor在Log4j2的应用对性能的大大提升。
先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。
RingBuffer
如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
SequenceDisruptor
通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。(注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。
Sequencer
Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
Sequence Barrier
用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
Wait Strategy
定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
Event
在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
EventProcessor
EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
EventHandler
Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
Producer
即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
https://blog.csdn.net/qq_28175019/article/details/104569258
这是一个mysql和redis同步框架,采用嵌入式jar包的方式实现数据实时同步。下面将会以这个组件为例子,讲解它是如何使用到disruptor的。
背景
框架为了实现mysql和redis的数据强一致性,采用数据层代理方式拦截数据变更事件,然后将这些事件通过disruptor组件进行分发,分发到pipeline组件进行事件处理。
定义我们业务需要处理的顺序数据实体,tasks表示的是sql解析任务列表:
- public class SeriesData {
-
- private List<SqlParserTask> tasks;
-
- public SeriesData() {
- }
-
- public SeriesData(List<SqlParserTask> tasks) {
- this.tasks = tasks;
- }
-
- public List<SqlParserTask> getTasks() {
- return tasks;
- }
-
- @Override
- public String toString() {
- return "SeriesData{" +
- "tasks=" + tasks +
- '}';
- }
- }
定义disrupter需要处理的事件
- public class SeriesDataEvent extends ValueWrapper<SeriesData> {
-
- }
定义异常处理器,对我们的异常进行日志打印
- public class DisruptorHandlerException implements ExceptionHandler {
-
- private Logger logger = LoggerFactory.getLogger(DisruptorHandlerException.class);
-
- /*
- * (non-Javadoc) 运行过程中发生时的异常
- *
- * @see
- * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
- * , long, java.lang.Object)
- */
- @Override
- public void handleEventException(Throwable ex, long sequence, Object event) {
- ex.printStackTrace();
- logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
- }
-
- /*
- * (non-Javadoc) 启动时的异常
- *
- * @see
- * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
- * Throwable)
- */
- @Override
- public void handleOnStartException(Throwable ex) {
- logger.error("start disruptor error ==[{}]!", ex.getMessage());
- }
-
- /*
- * (non-Javadoc) 关闭时的异常
- *
- * @see
- * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
- * .Throwable)
- */
- @Override
- public void handleOnShutdownException(Throwable ex) {
- logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
- }
-
- }
定义事件处理工厂
- public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {
-
-
- @Override
- public SeriesDataEvent newInstance() {
- return new SeriesDataEvent();
- }
- }
初始化disruptor的配置信息,事件工厂、队列大小、清除钩子等
- package com.dm.disruptor;
-
- import com.dm.constant.DmConstants;
- import com.dm.core.NamedThreadFactory;
- import com.lmax.disruptor.EventFactory;
- import com.lmax.disruptor.RingBuffer;
- import com.lmax.disruptor.WaitStrategy;
- import com.lmax.disruptor.WorkHandler;
- import com.lmax.disruptor.dsl.Disruptor;
- import com.lmax.disruptor.dsl.ProducerType;
- import java.util.ArrayList;
- import java.util.List;
-
- /**
- * @author zy
- */
- public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {
-
- /**
- * 记录所有的队列,系统退出时统一清理资源
- */
- private List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
- /**
- * Disruptor 对象
- */
- private Disruptor<E> disruptor;
- /**
- * RingBuffer
- */
- private RingBuffer<E> ringBuffer;
- /**
- * initQueue
- */
- private List<D> initQueue = new ArrayList();
-
- /**
- * 事件工厂
- *
- * @return EventFactory
- */
- protected EventFactory<E> eventFactory() {
- return null;
- }
-
- /**
- * 队列大小
- *
- * @return 队列长度,必须是2的幂
- */
- protected int getQueueSize() {
- return 0;
- }
-
- /**
- * 初始化
- */
- public void init() {
- this.disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), new NamedThreadFactory(DmConstants.DIS_THREAD_NAME_FORMAT), ProducerType.SINGLE, getStrategy());
- this.disruptor.setDefaultExceptionHandler(new DisruptorHandlerException());
- this.disruptor.handleEventsWithWorkerPool(new WorkHandler[]{new DispatcherEventHandler()});
- this.ringBuffer = disruptor.start();
-
- //初始化数据发布
- for (D data : initQueue) {
- ringBuffer.publishEvent((event, sequence, data1) -> event.setValue(data1), data);
- }
-
- //加入资源清理钩子
- synchronized (queueHelperList) {
- if (queueHelperList.isEmpty()) {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- for (BaseQueueHelper baseQueueHelper : queueHelperList) {
- baseQueueHelper.shutdown();
- }
- }
- });
- }
- queueHelperList.add(this);
- }
- }
-
- /**
- * 如果要改变线程执行优先级,override此策略. YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU,
- * 慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景.
- *
- * @return WaitStrategy
- */
- protected abstract WaitStrategy getStrategy();
-
- /**
- * 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理.
- */
- public synchronized void publishEvent(D data) {
- if (ringBuffer == null) {
- initQueue.add(data);
- return;
- }
- ringBuffer.publishEvent((event, sequence, data1) -> event.setValue(data1), data);
- }
-
- /**
- * 关闭队列
- */
- public void shutdown() {
- disruptor.shutdown();
- }
- }
事件到来后,将事件推送到disruptor
- @Override
- public void commit() throws SQLException {
- targetConnection.commit();
- DmLocalContext cur = DmLocalContext.cur();
- if (cur != null) {
- SeriesDataEventQueueHelper.Instance().publishEvent(new SeriesData(cur.getTasks()));
- }
- }
事件处理器,消费获取的事件,然后由pipeline对任务进行处理:
- public class DispatcherEventHandler implements WorkHandler<SeriesDataEvent> {
-
- private Logger logger = LoggerFactory.getLogger(DispatcherEventHandler.class);
-
- @Override
- public void onEvent(SeriesDataEvent event) {
- List<SqlParserTask> tasks = event.getValue().getTasks();
- if (event.getValue() == null || StringUtils.isEmpty(tasks)) {
- logger.warn("Receiver series data is empty!");
- }
- logger.info("Receiver series data is :" + Arrays.toString(tasks.toArray()));
- Pipeline pipeline = PipelineContext.getPipeline();
- tasks.stream().forEach(e -> {
- if (e != null) {
- pipeline.process(e);
- }
- });
- }
-
- /* public int hash(int parallelSize, long id){
- int iid = Integer.parseInt(String.valueOf(id));
- int n = parallelSize - 1;
- n |= n >>> 1;
- n |= n >>> 2;
- n |= n >>> 4;
- n |= n >>> 8;
- n |= n >>> 16;
- n = (n < 0) ? 1 : (n >= DmConstants.MAXIMUM_CAPACITY) ? DmConstants.MAXIMUM_CAPACITY : n + 1;
- return (n - 1 ) & iid;
- }*/
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。