当前位置:   article > 正文

Disruptor入门_new disruptor

new disruptor
         Disruptor是一个高性能的异步处理框架,它能在一个线程里每秒处理6百万订单,业务逻辑完全是运行在内存中,使用事件源驱动方式,业务逻辑处理器的核心是Disruptor。
         创建disruptor:
            new Disruptor<消费生产的泛型>(
                EventFactory<> eventFactory 生产要消费对象的工厂,
                int ringBufferSize  ringBuffer的大小,最好是2的N次方,
                Executor executor 创建的线程池 进行Disruptor 内部的数据接收处理调度
                ProducerType producerType 有SINGLE和MULTI两个参数,SINGGLE:生产者只有一个,MULTI:生产者有多个。
                WaitStrategy waitStrategy 这是一种策略。API提供了三种策略。默认使用第三种策略
                    //BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
                    WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
                    //SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景
                    WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
                    //YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性
                     WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
                )
        创建事件工厂( 生产数据):
            需要实现EventFactory,实现newInstance()方法,返回需要创建的实例对象。
        事件消费者(消费者):
            需要时间EventHandler<泛型>( ),实现onEvent()方法,具体消费的业务逻辑。   
        事件都会有一个生成事件的源(有点发布订阅的意思),
            long sequence = ringBuffer.next(); ringBuffer是一个环形的队列,这个方法的意思就是获取下一个指向环形队列的索引。如果获取了这个队列的指针,就必须要发布事件,否则会造成Disruptior状态的混乱。尤其是在多个事件生产者的情况下导致事件消费者失速,而不得不重启应用来解决。
            RingBuffer<泛型> ringBuffer = ringBuffer.get(sequence);这个泛型,是在new Disruptor<>()这里定义的。获取的是对象实例。可以调用对象的方法进行赋值。
            ringBuffer.publish(sequence)。这个是发布这个事件,这个需要保证必须调用,可以把它放在finally中。如果有一个请求的sequence没有被发布,将会堵塞后面的发布事件或者其它的生成事件的源。
Disruptor的代码流程:
     1.创建Disruptor对象
        Disruptor<LongEvent> disruptor new Disruptor<LongEvent>( factory , ringBufferSize , executor , ProducerType. SINGLE , new YieldingWaitStrategy());     
     2.连接消费者(需要实现EventHandler接口,实现onEvent方法)
        disruptor .handleEventsWith( new LongEventHandler());
     3.启动消费者
       disruptor .start();
     4.获取ringBuffer
          RingBuffer<LongEvent> ringBuffer = disruptor .getRingBuffer();
     5.创建生产者,把ringbuffer传入
         LongEventProducer producer = new LongEventProducer( ringBuffer );
     6.调用生产者的方法进行生产数据
     7.关闭disruptor,方法会被阻塞,直至所有的事件都得到处理
        disruptor .shutdown();
      8.关闭线程池,disruport在关闭的时候不会关闭线程池
           executor .shutdown();            
生产者的代码流程:
    1.需要RingBuffer这个对象
           private final RingBuffer<LongEvent> ringBuffer ;
           public LongEventProducer (RingBuffer<LongEvent> ringBuffer ){
             this . ringBuffer = ringBuffer ; }
    2.创建一个生产数据的方法
    3.获取ringbuffer事件队列的,下一个事件槽(其实就是一个指针)
            long sequence = ringBuffer .next();
    4.根据这个指针获取到当前这个指针下的空对象,因为创建disruptor泛型是这个对象,所以这里返回的是longEvent。
             LongEvent event = ringBuffer .get( sequence );
    5.根据获取的对象进行生产者的业务逻辑。
    6.发布事件, 这个需要保证必须调用,可以把它放在finally中。如果有一个请求的sequence没有被发布,将会堵塞后面的发布事件或者其它的生产者
             finally {   ringBuffer . publish ( sequence ); }


另一种简便的生产者的代码流程(Disruptor 3.0提供了lambda式的API。这样可以把一些复杂的操作放在RingBuffer,所以在Disruptor 3.0 以后的版本最好使用 Event publisher 或者 Event Translator来发布事件):
    1.创建一个静态的,不可修改的EventTranslatorOneArg对象
            private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
                   new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                         @Override
                         public void translateTo(LongEvent event , long sequeue , ByteBuffer buffer ) {
                               这个是生产者处理的业务逻辑
                              第一个参数是创建disruptor声名的那个泛型的对象
                              第二个参数指向ringBuffer的指针
                              第三个参数调用这个方法传递过来的数据
                        }

                  };
    2.同上需要ringBuffer这个对象
    3.调用的方法,(外界调用这个方法进行生产)
            public void onData(ByteBuffer buffer ){
             ringBuffer .publishEvent( TRANSLATOR , buffer );
          }
Disruptor术语:
  1. RingBuffer: 被看作Disruptor最主要的组件,然而从3.0开始RingBuffer仅仅负责存储和更新在Disruptor中流通的数据。对一些特殊的使用场景能够被用户(使用其他数据结构)完全替代。
  2. Sequence: Disruptor使用Sequence来表示一个特殊组件处理的序号。和Disruptor一样,每个消费者(EventProcessor)都维持着一个Sequence。大部分的并发代码依赖这些Sequence值的运转,因此Sequence支持多种当前为AtomicLong类的特性。
  3. Sequencer: 这是Disruptor真正的核心。实现了这个接口的两种生产者(单生产者和多生产者)均实现了所有的并发算法,为了在生产者和消费者之间进行准确快速的数据传递。
  4. SequenceBarrier: 由Sequencer生成,并且包含了已经发布的Sequence的引用,这些的Sequence源于Sequencer和一些独立的消费者的Sequence。它包含了决定是否有供消费者来消费的Event的逻辑。
  5. WaitStrategy:决定一个消费者将如何等待生产者将Event置入Disruptor。
  6. Event:从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event,因为它完全是由用户定义的。
  7. EventProcessor:主要事件循环,处理Disruptor中的Event,并且拥有消费者的Sequence。它有一个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象。
  8. EventHandler:由用户实现并且代表了Disruptor中的一个消费者的接口。
  9. Producer:由用户实现,它调用RingBuffer来插入事件(Event),在Disruptor中没有相应的实现代码,由用户实现。
  10. WorkProcessor:确保每个sequence只被一个processor消费,在同一个WorkPool中的处理多个WorkProcessor不会消费同样的sequence。
  11. WorkerPool:一个WorkProcessor池,其中WorkProcessor将消费Sequence,所以任务可以在实现WorkHandler接口的worker吃间移交
  12. LifecycleAware:当BatchEventProcessor启动和停止时,于实现这个接口用于接收通知。
RingBuffer是什么?
    RingBuffer其实就是一个环(首尾相接的环),可以把它理解为在不同上下文(线程)间传递数据的buffer。
    sequence就是一个指针,指向这个环中下一个可用的空间(long sequence = ringBuffer.next())
    ringBufferSize,最好的2的N次方,是因为要找到数组中当前序号指向的元素,可以通过mod操作:sequence mod array length = array index(取模操作),槽的个数是2的N次方更利于计算机进行二进制计算,这样可以提高获取指针的速度。
    选用ringBuffer的好处:
  1. 可以提供可靠的消息传递
  2. 不需要删除buffer中的数据,可以直接用新数据进行覆盖。
  3. 因为他是数组,所以比链表快。而且有一个容易预测的访问问题。
  4. 这样是对CPU缓存友好的,也就是说在硬件级别,数组中的元素是会被预加载的,因此在ringBuffer当中,cpu无需时不时的从主内存加载数组的下一个元素。
  5. 你可以为数组预先分配内存,使得数组对象一直存在(除非程序终止),这就意味着不需要花费大量的时间用于垃圾回收。此外不像链表一样,需要为每一个添加到其上面的对象创建节点对象一一对应,删除节点时,还需要进行响应的内存清理操作。  
消费者生产者消费快慢的问题:
    生产者要是生产的快,消费者消费慢。生产者就要等待消费者消费完,在这个过程中生产者是阻塞状态。相反,同理。
    
    




声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/255023
推荐阅读
相关标签
  

闽ICP备14008679号