赞
踩
1.简介
参照http://ifeve.com/disruptor/文档
2.helloworld
- //自己定义的
- public class LongEvent {
-
- private long value;
-
- public long getValue() {
- return value;
- }
-
- public void setValue(long value) {
- this.value = value;
- }
- }
-
- //工厂生产自己的那个类
- public class LongEventFactory implements EventFactory {
- @Override
- public Object newInstance() {
- return new LongEvent();
- }
- }
-
- //消费,用于处理我们的任务
- //我们还需要一个事件消费者,也就是一个事件处理器。这个事件处理器简单地把事件中存储的数据打印到终端:
- public class LongEventHandler implements EventHandler<LongEvent> {
-
- @Override
- public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
- System.out.println(longEvent.getValue());
- }
-
- }
-
-
-
- //生产任务
- public class LongEventProducer {
-
- private final RingBuffer<LongEvent> ringBuffer;
-
- public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
- this.ringBuffer = ringBuffer;
- }
-
- /**
- * onData用来发布事件,每调用一次就发布一次事件
- * 它的参数会用过事件传递给消费者
- */
- public void onData(ByteBuffer bb){
- //1.可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
- long sequence = ringBuffer.next();
- try {
- //2.用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
- LongEvent event = ringBuffer.get(sequence);
- //3.获取要通过事件传递的业务数据
- event.setValue(bb.getLong(0));
- } finally {
- //4.发布事件
- //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
- ringBuffer.publish(sequence);
- }
- }
-
-
-
-
-
- }
-
-
-
- //主函数
- public class LongEventMain {
- public static void main(String[] args) throws Exception {
- //创建线程池
- ExecutorService executor = Executors.newCachedThreadPool();
- //创建工厂
- LongEventFactory factory = new LongEventFactory();
- //创建buffersize,也是就是ringbuffer大小,必须是2的n次方
- int ringBufferSize=1024*1024;
-
- //创建disruptor
- //第一个参数为工厂类对象,用于创建一个个的longevent,longevent是实际的消费数据
- //2.第二个参数为缓冲区大小
- //3.第三个参数为线程池,进行disruptor 内部的数据接收处理调度
- //4.第四个参数ProducerType.single和ProducerType.MULTI(一个生产者用single多个用multi)
- //5.第5个参数 是一种策略YieldingWaitStrategy,生产和消费的策略
- /**
- //BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
- WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
- //SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景
- WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
- //YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性
- WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
- */
- Disruptor<LongEvent> disruptor =
- new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
-
- //连接消费事件方法
- disruptor.handleEventsWith(new LongEventHandler());
-
- //启动
- disruptor.start();
-
- //Disruptor 的事件发布过程是一个两阶段提交的过程:
- //发布事件
- RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
-
- LongEventProducer producer = new LongEventProducer(ringBuffer);
-
- ByteBuffer byteBuffer = ByteBuffer.allocate(8);
-
- for (int i = 0; i < 100; i++) {
-
- byteBuffer.putLong(0,i);
- producer.onData(byteBuffer);
- }
- disruptor.shutdown();
- executor.shutdown();
-
- }
-
- }
3.术语说明
4.理解ringBuffer
多个生产者消费者例子:
- public class Order {
-
- private String id;
- private String name;
- private double price;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public double getPrice() {
- return price;
- }
-
- public void setPrice(double price) {
- this.price = price;
- }
- }
-
-
- public class Producer {
-
- private final RingBuffer<Order> ringBuffer;
-
- public Producer(RingBuffer<Order> ringBuffer){
- this.ringBuffer = ringBuffer;
- }
-
- /**
- * onData用来发布事件,每调用一次就发布一次事件
- * 它的参数会用过事件传递给消费者
- */
- public void onData(String data){
- //可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
- long sequence = ringBuffer.next();
- try {
- //用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
- Order order = ringBuffer.get(sequence);
- //获取要通过事件传递的业务数据
- order.setId(data);
- } finally {
- //发布事件
- //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
- ringBuffer.publish(sequence);
- }
- }
-
-
- }
-
-
- public class Consumer implements WorkHandler<Order>{
-
- private String consumerId;
-
- private static AtomicInteger count = new AtomicInteger(0);
-
- public Consumer(String consumerId){
- this.consumerId = consumerId;
- }
-
- @Override
- public void onEvent(Order order) throws Exception {
- System.out.println("当前消费者: " + this.consumerId + ",消费信息:" + order.getId());
- count.incrementAndGet();
- }
-
- public int getCount(){
- return count.get();
- }
-
- }
-
- public class Main {
-
- public static void main(String[] args) throws Exception {
-
- //创建ringBuffer
- RingBuffer<Order> ringBuffer =
- RingBuffer.create(ProducerType.MULTI,
- new EventFactory<Order>() {
- @Override
- public Order newInstance() {
- return new Order();
- }
- },
- 1024 * 1024,
- new YieldingWaitStrategy());
-
- SequenceBarrier barriers = ringBuffer.newBarrier();
-
- Consumer[] consumers = new Consumer[3];
- for(int i = 0; i < consumers.length; i++){
- consumers[i] = new Consumer("c" + i);
- }
-
- WorkerPool<Order> workerPool =
- new WorkerPool<Order>(ringBuffer,
- barriers,
- new IntEventExceptionHandler(),
- consumers);
-
- ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
- workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
-
- final CountDownLatch latch = new CountDownLatch(1);
- for (int i = 0; i < 100; i++) {
- final Producer p = new Producer(ringBuffer);
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- for(int j = 0; j < 100; j ++){
- p.onData(UUID.randomUUID().toString());
- }
- }
- }).start();
- }
- Thread.sleep(2000);
- System.out.println("---------------开始生产-----------------");
- latch.countDown();
- Thread.sleep(5000);
- System.out.println("总数:" + consumers[0].getCount() );
- }
-
- static class IntEventExceptionHandler implements ExceptionHandler {
- public void handleEventException(Throwable ex, long sequence, Object event) {}
- public void handleOnStartException(Throwable ex) {}
- public void handleOnShutdownException(Throwable ex) {}
- }
- }
-
-
使用场景
其他请见我的下载里面的disruptor的例子,和网站
http://lmax-exchange.github.io/disruptor/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。