赞
踩
- 1、JUC包下队列大部分使用的都是ReentrantLock锁方式来保证线程安全的。在高并发的情况下为了防止OOM,只能选择有界队列,这样就会导致一部分请求的丢失;
- 2、加锁方式的等待唤醒机制对内存的开销很大,而且存在死锁的隐患;
- 3、有界队列通常采用数组实现,而数组结构又会导致另一个问题:伪共享,进而导致性能问题;
使用数组可以避免垃圾回收,同时由于空间局部性原理,数组对于处理器的缓存机制更加友好。
Disruptor定义的数组长度都是2^n,所以使用的定位方式都是位运算。位运算都是使用二进制的形式实现的,而机器对于二进制的指令显然会更加友好,速度更快。
Disruptor对数组中的元素进行操作都是通过CAS进行获取的,这样就能大大减少加锁对性能带来的影响。
缓存填充是为了解决伪共享而设计出来的,它能让每一个缓存行只有一个元素,这样对元素的写入操作就不会影响其它元素的缓存了。
使用观察者模式,是为了防止消费者对任务池的不断重试,从而减少这个过程中对CPU性能的消耗。
常见且默认的等待策略。当这个队列满了,不执行覆盖而是阻塞等待。使用ReentrantLock + Condition实现阻塞,最节省CPU,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景。
这是一个循环等待策略,会在循环中不断的等待数据。它会先进行自旋等待,如果等待不成功(没有CAS到数据的写入权限),就会使用Thread.yield()方法让出CPU,并最终使用LockSupport.partNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此这个策略会产生比较高的平均延时,典型的应用场景就是异步日志。
这个策略用于低延时场合。消费者线程会不断的循环监测缓冲区的变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延迟有比较高的要求,可以考虑这种策略。
该策略采用死循环,消费者线程会尽最大的努力监控缓冲区的变化,对延时非常苛刻的场景使用。在这个策略下CPU核数必须大于消费者线程数,推荐在线程绑定到固定的CPU的场景下使用。
需要引入的依赖:
<!-- 引入Disruptor -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
代码实现:
package com.muyichen.demo.disruptor.event;
import lombok.Data;
/**
* 消息载体(事件)
*/
@Data
public class OrderEvent {
private long value;
private String name;
}
package com.muyichen.demo.disruptor.producer; import com.lmax.disruptor.RingBuffer; import com.muyichen.demo.disruptor.event.OrderEvent; /** * 消息(事件)生产者 */ public class OrderEventProducer { /** * 事件环形队列 */ private RingBuffer<OrderEvent> ringBuffer; public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(long value, String name) { // 获取事件队列的下一个槽 long sequence = ringBuffer.next(); try { // 获取消息(事件) OrderEvent orderEvent = ringBuffer.get(sequence); // 写入数据消息 orderEvent.setValue(value); orderEvent.setName(name); } catch (Exception e) { // 异常处理 e.printStackTrace(); } finally { System.out.println("生产者" + Thread.currentThread().getName() + "发送数据:value:" + value + ",name:" + name); // 发布事件 ringBuffer.publish(sequence); } } }
package com.muyichen.demo.disruptor.consumer; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; import com.muyichen.demo.disruptor.event.OrderEvent; /** * 消息(事件)消费者 */ public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> { @Override public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception { // TODO 消费逻辑 System.out.println("消费者" + Thread.currentThread().getName() + "消费数据:value:" + orderEvent.getValue() + ",name:" + orderEvent.getName()); } @Override public void onEvent(OrderEvent orderEvent) throws Exception { // TODO 消费逻辑 System.out.println("消费者" + Thread.currentThread().getName() + "消费数据:value:" + orderEvent.getValue() + ",name:" + orderEvent.getName()); } }
package com.muyichen.demo.disruptor; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import com.muyichen.demo.disruptor.consumer.OrderEventHandler; import com.muyichen.demo.disruptor.event.OrderEvent; import com.muyichen.demo.disruptor.producer.OrderEventProducer; import java.util.concurrent.Executors; /** * 高性能队列测试 */ public class DisruptorDemo { public static void main(String[] args) { //创建Disruptor Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>( OrderEvent::new, // 等同new OrderEventFactory() 1024 * 1024, // 环形数组容量 Executors.defaultThreadFactory(), ProducerType.SINGLE, // 单生产者 (生产类型有两种:单生产者、多生产者) new YieldingWaitStrategy() // 等待策略 ); // 设置消费者用于处理RingBuffer的事件 disruptor.handleEventsWith(new OrderEventHandler()); // 设置多消费者,消息会被重复消费 // disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler()); // 设置多消费者,消费者要实现WorkHandler接口,这样能保证,消息只会被一个消费者消费 // disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler()); // 启动Disruptor disruptor.start(); // 构建环形队列 RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer(); // 创建生产者并绑定环形队列 OrderEventProducer producer = new OrderEventProducer(ringBuffer); // 发送消息 for (int i=0; i<100; i++) { producer.onData(i, "muyichen" + i); } disruptor.shutdown(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。