当前位置:   article > 正文

10 disruptor并发框架_disruptor producertype.multi

disruptor producertype.multi

1.简介

 

 

参照http://ifeve.com/disruptor/文档 

2.helloworld

  1. //自己定义的
  2. public class LongEvent {
  3. private long value;
  4. public long getValue() {
  5. return value;
  6. }
  7. public void setValue(long value) {
  8. this.value = value;
  9. }
  10. }
  11. //工厂生产自己的那个类
  12. public class LongEventFactory implements EventFactory {
  13. @Override
  14. public Object newInstance() {
  15. return new LongEvent();
  16. }
  17. }
  18. //消费,用于处理我们的任务
  19. //我们还需要一个事件消费者,也就是一个事件处理器。这个事件处理器简单地把事件中存储的数据打印到终端:
  20. public class LongEventHandler implements EventHandler<LongEvent> {
  21. @Override
  22. public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
  23. System.out.println(longEvent.getValue());
  24. }
  25. }
  26. //生产任务
  27. public class LongEventProducer {
  28. private final RingBuffer<LongEvent> ringBuffer;
  29. public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
  30. this.ringBuffer = ringBuffer;
  31. }
  32. /**
  33. * onData用来发布事件,每调用一次就发布一次事件
  34. * 它的参数会用过事件传递给消费者
  35. */
  36. public void onData(ByteBuffer bb){
  37. //1.可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
  38. long sequence = ringBuffer.next();
  39. try {
  40. //2.用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
  41. LongEvent event = ringBuffer.get(sequence);
  42. //3.获取要通过事件传递的业务数据
  43. event.setValue(bb.getLong(0));
  44. } finally {
  45. //4.发布事件
  46. //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
  47. ringBuffer.publish(sequence);
  48. }
  49. }
  50. }
  51. //主函数
  52. public class LongEventMain {
  53. public static void main(String[] args) throws Exception {
  54. //创建线程池
  55. ExecutorService executor = Executors.newCachedThreadPool();
  56. //创建工厂
  57. LongEventFactory factory = new LongEventFactory();
  58. //创建buffersize,也是就是ringbuffer大小,必须是2的n次方
  59. int ringBufferSize=1024*1024;
  60. //创建disruptor
  61. //第一个参数为工厂类对象,用于创建一个个的longevent,longevent是实际的消费数据
  62. //2.第二个参数为缓冲区大小
  63. //3.第三个参数为线程池,进行disruptor 内部的数据接收处理调度
  64. //4.第四个参数ProducerType.single和ProducerType.MULTI(一个生产者用single多个用multi)
  65. //5.第5个参数 是一种策略YieldingWaitStrategy,生产和消费的策略
  66. /**
  67. //BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
  68. WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
  69. //SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景
  70. WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
  71. //YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性
  72. WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
  73. */
  74. Disruptor<LongEvent> disruptor =
  75. new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
  76. //连接消费事件方法
  77. disruptor.handleEventsWith(new LongEventHandler());
  78. //启动
  79. disruptor.start();
  80. //Disruptor 的事件发布过程是一个两阶段提交的过程:
  81. //发布事件
  82. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
  83. LongEventProducer producer = new LongEventProducer(ringBuffer);
  84. ByteBuffer byteBuffer = ByteBuffer.allocate(8);
  85. for (int i = 0; i < 100; i++) {
  86. byteBuffer.putLong(0,i);
  87. producer.onData(byteBuffer);
  88. }
  89. disruptor.shutdown();
  90. executor.shutdown();
  91. }
  92. }

3.术语说明 

 

 

4.理解ringBuffer

 

 

多个生产者消费者例子:

 

  1. public class Order {
  2. private String id;
  3. private String name;
  4. private double price;
  5. public String getId() {
  6. return id;
  7. }
  8. public void setId(String id) {
  9. this.id = id;
  10. }
  11. public String getName() {
  12. return name;
  13. }
  14. public void setName(String name) {
  15. this.name = name;
  16. }
  17. public double getPrice() {
  18. return price;
  19. }
  20. public void setPrice(double price) {
  21. this.price = price;
  22. }
  23. }
  24. public class Producer {
  25. private final RingBuffer<Order> ringBuffer;
  26. public Producer(RingBuffer<Order> ringBuffer){
  27. this.ringBuffer = ringBuffer;
  28. }
  29. /**
  30. * onData用来发布事件,每调用一次就发布一次事件
  31. * 它的参数会用过事件传递给消费者
  32. */
  33. public void onData(String data){
  34. //可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
  35. long sequence = ringBuffer.next();
  36. try {
  37. //用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
  38. Order order = ringBuffer.get(sequence);
  39. //获取要通过事件传递的业务数据
  40. order.setId(data);
  41. } finally {
  42. //发布事件
  43. //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
  44. ringBuffer.publish(sequence);
  45. }
  46. }
  47. }
  48. public class Consumer implements WorkHandler<Order>{
  49. private String consumerId;
  50. private static AtomicInteger count = new AtomicInteger(0);
  51. public Consumer(String consumerId){
  52. this.consumerId = consumerId;
  53. }
  54. @Override
  55. public void onEvent(Order order) throws Exception {
  56. System.out.println("当前消费者: " + this.consumerId + ",消费信息:" + order.getId());
  57. count.incrementAndGet();
  58. }
  59. public int getCount(){
  60. return count.get();
  61. }
  62. }
  63. public class Main {
  64. public static void main(String[] args) throws Exception {
  65. //创建ringBuffer
  66. RingBuffer<Order> ringBuffer =
  67. RingBuffer.create(ProducerType.MULTI,
  68. new EventFactory<Order>() {
  69. @Override
  70. public Order newInstance() {
  71. return new Order();
  72. }
  73. },
  74. 1024 * 1024,
  75. new YieldingWaitStrategy());
  76. SequenceBarrier barriers = ringBuffer.newBarrier();
  77. Consumer[] consumers = new Consumer[3];
  78. for(int i = 0; i < consumers.length; i++){
  79. consumers[i] = new Consumer("c" + i);
  80. }
  81. WorkerPool<Order> workerPool =
  82. new WorkerPool<Order>(ringBuffer,
  83. barriers,
  84. new IntEventExceptionHandler(),
  85. consumers);
  86. ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
  87. workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
  88. final CountDownLatch latch = new CountDownLatch(1);
  89. for (int i = 0; i < 100; i++) {
  90. final Producer p = new Producer(ringBuffer);
  91. new Thread(new Runnable() {
  92. @Override
  93. public void run() {
  94. try {
  95. latch.await();
  96. } catch (InterruptedException e) {
  97. e.printStackTrace();
  98. }
  99. for(int j = 0; j < 100; j ++){
  100. p.onData(UUID.randomUUID().toString());
  101. }
  102. }
  103. }).start();
  104. }
  105. Thread.sleep(2000);
  106. System.out.println("---------------开始生产-----------------");
  107. latch.countDown();
  108. Thread.sleep(5000);
  109. System.out.println("总数:" + consumers[0].getCount() );
  110. }
  111. static class IntEventExceptionHandler implements ExceptionHandler {
  112. public void handleEventException(Throwable ex, long sequence, Object event) {}
  113. public void handleOnStartException(Throwable ex) {}
  114. public void handleOnShutdownException(Throwable ex) {}
  115. }
  116. }

 使用场景

其他请见我的下载里面的disruptor的例子,和网站

http://ifeve.com/disruptor/

http://lmax-exchange.github.io/disruptor/

 

 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号