当前位置:   article > 正文

springboot整合disruptor(线程工厂)_springboot disruptor

springboot disruptor

一、并发框架disruptor介绍

1、概念:同一个jvm进程中线程间异步通信的框架

2、环形数组RingBuffer:disruptor的核心存储容器

2.1、环形数组中的元素采用覆盖方式,避免了jvm的GC

2.2、数组的大小为2的n次方,这样元素定位可以通过位运算效率会更高,其实和hashmap的index运算一样,不一样的是hashmap会扩容,而这个RingBuffer不扩容而去覆盖原来的数据

3、SequenceBarrier:

是起屏障作用的类,因为在往RingBuffer放的过程中,生产者和消费者的存取速度不一致会造成错误。这时用SequenceBarrier可以来限制过快的存或者取,来达到速度的一致,保证不出错。原理是每次消费者取的时候会把取到的数据的位置返给生产者,生产者通过这个位置来判断什么时候往RingBuffer中放数据

4、工作流程:

生产者往RingBuffer中放数据,disruptor把数据推给消费者

5、工作模式:

统一消费、分组消费、顺序消费、多支线顺序消费

详细介绍: https://blog.csdn.net/zhouzhenyong/article/details/81303011

二、springboot整合disruptor

1、消息体

  1. package com.huwei.hotel.collector.contacter.interfaces.auth.event;
  2. import com.huwei.hotel.common.enums.MqttAuthTypeEnum;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.springframework.http.HttpStatus;
  6. /**
  7. * @author ljy
  8. * @date 2023/3/06
  9. **/
  10. @Data
  11. @NoArgsConstructor
  12. public class AuthEvent {
  13. private String authName;
  14. private MqttAuthTypeEnum authType;
  15. private String clientKey;
  16. private String failureReason;
  17. void clear() {
  18. authName = null;
  19. authType = null;
  20. clientKey = null;
  21. failureReason = null;
  22. }
  23. }

2、业务数据工厂

  1. package com.huwei.hotel.collector.contacter.interfaces.auth.event;
  2. import com.lmax.disruptor.EventFactory;
  3. /**
  4. * 工厂方法
  5. * @author ljy
  6. * @date 2023/3/06
  7. */
  8. public class AuthEventFactory implements EventFactory<AuthEvent> {
  9. @Override
  10. public AuthEvent newInstance() {
  11. return new AuthEvent();
  12. }
  13. }

3、生产者

  1. package com.huwei.hotel.collector.contacter.interfaces.auth.event;
  2. import com.huwei.hotel.common.enums.MqttAuthTypeEnum;
  3. import com.lmax.disruptor.RingBuffer;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.stereotype.Component;
  6. import javax.annotation.Resource;
  7. /**
  8. * @author ljy
  9. * @date 2023/3/06
  10. **/
  11. @Slf4j
  12. @Component
  13. public class AuthEventProducer {
  14. @Resource(name = "authEventRingBuffer")
  15. RingBuffer<AuthEvent> ringBuffer;
  16. public void publish(String authName, MqttAuthTypeEnum authType, String clientKey, String failureReason) {
  17. ringBuffer.publishEvent((event, sequence) -> {
  18. event.setAuthName(authName);
  19. event.setAuthType(authType);
  20. event.setClientKey(clientKey);
  21. event.setFailureReason(failureReason);
  22. });
  23. }
  24. }

4、定义配置类RingBufferConfiguration

  1. package com.huwei.hotel.collector.contacter.infrastructure.config;
  2. import com.huwei.hotel.collector.contacter.application.event.MessageEvent;
  3. import com.huwei.hotel.collector.contacter.infrastructure.disruptor.DisruptorFactory;
  4. import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthEvent;
  5. import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthHandler;
  6. import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugEvent;
  7. import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugHandler;
  8. import com.lmax.disruptor.RingBuffer;
  9. import com.lmax.disruptor.dsl.Disruptor;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.context.annotation.Configuration;
  12. import javax.annotation.PreDestroy;
  13. /**
  14. * @author ljy
  15. * @date 2023/3/06
  16. **/
  17. @Configuration
  18. public class RingBufferConfiguration {
  19. final DisruptorFactory disruptorFactory = new DisruptorFactory();
  20. final Disruptor<MessageEvent> messageEventDisruptor;
  21. final Disruptor<DebugEvent> debugEventDisruptor;
  22. final Disruptor<AuthEvent> authEventDisruptor;
  23. final DebugHandler debugHandler;
  24. final AuthHandler authHandler;
  25. RingBufferConfiguration(DebugHandler debugHandler, AuthHandler authHandler){
  26. this.debugHandler = debugHandler;
  27. this.authHandler = authHandler;
  28. messageEventDisruptor = disruptorFactory.createDisruptor();
  29. debugEventDisruptor = disruptorFactory.createDisruptor(this.debugHandler);
  30. authEventDisruptor = disruptorFactory.createDisruptor(this.authHandler);
  31. this.start();
  32. }
  33. private void start() {
  34. if(messageEventDisruptor != null){
  35. messageEventDisruptor.start();
  36. }
  37. if(debugEventDisruptor != null){
  38. debugEventDisruptor.start();
  39. }
  40. if(authEventDisruptor != null){
  41. authEventDisruptor.start();
  42. }
  43. }
  44. @PreDestroy
  45. public void doDestory(){
  46. if(messageEventDisruptor != null){
  47. messageEventDisruptor.shutdown();
  48. }
  49. if(debugEventDisruptor != null){
  50. debugEventDisruptor.shutdown();
  51. }
  52. if(authEventDisruptor != null){
  53. authEventDisruptor.shutdown();
  54. }
  55. }
  56. @Bean(name = "messageEventRingBuffer")
  57. public RingBuffer<MessageEvent> messageEventRingBuffer() {
  58. return messageEventDisruptor.getRingBuffer();
  59. }
  60. @Bean(name = "debugEventRingBuffer")
  61. public RingBuffer<DebugEvent> debugEventRingBuffer() {
  62. return debugEventDisruptor.getRingBuffer();
  63. }
  64. @Bean(name = "authEventRingBuffer")
  65. public RingBuffer<AuthEvent> authEventRingBuffer() {
  66. return authEventDisruptor.getRingBuffer();
  67. }
  68. }

5、定义DisruptorFactory线程工厂

  1. package com.huwei.hotel.collector.contacter.infrastructure.disruptor;
  2. import com.huwei.hotel.collector.contacter.application.event.*;
  3. import com.huwei.hotel.collector.contacter.infrastructure.helper.ThreadPoolHelper;
  4. import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthEvent;
  5. import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthEventExceptionHandler;
  6. import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthHandler;
  7. import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugEvent;
  8. import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugEventExceptionHandler;
  9. import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugHandler;
  10. import com.lmax.disruptor.dsl.Disruptor;
  11. /**
  12. * @author ljy
  13. * @date 2023/3/06
  14. **/
  15. public class DisruptorFactory {
  16. /**
  17. * 多生产者模式, 默认等待策略为阻塞策略
  18. *
  19. * @return
  20. */
  21. public Disruptor<MessageEvent> createDisruptor() {
  22. int bufferSize = 1024 * 64;
  23. Disruptor<MessageEvent> disruptor =
  24. new Disruptor<>(MessageEvent::new, bufferSize, ThreadPoolHelper.threadFactory("MessageEvent"));
  25. ResponseHandler[] cackHandlers = new ResponseHandler[5];
  26. for (int i = 0; i < cackHandlers.length; i++) {
  27. cackHandlers[i] = new ResponseHandler();
  28. }
  29. DataHandler[] workHandlers = new DataHandler[5];
  30. for (int i = 0; i < workHandlers.length; i++) {
  31. workHandlers[i] = new DataHandler();
  32. }
  33. ClearHandler clearHandler = new ClearHandler();
  34. /* 设置事件业务处理器---消费者 介绍几种类型
  35. //定义消费者执行模式(在这里一个消费者也就是一个线程,消费者执行模式也就是线程的执行模式)
  36. // disruptor.handleEventsWith(msg1, msg2, msg3, msg4); //统一消费:一个消息会被所有消费者消费
  37. // disruptor.handleEventsWithWorkerPool(msg1, msg2); //分组消费:一个消息只能被一个消费者消费,多消费者轮询处理
  38. // disruptor.handleEventsWith(msg1, msg3).then(msg2); //顺序消费:1、3先并行处理,然后2再处理
  39. // disruptor.handleEventsWith(msg1, msg3); //多支线顺序消费:消费者1和消费者3一个支线,消费者2和消费者4一个支线,消费者3和消费者4消费完毕后,消费者5再进行消费
  40. // disruptor.handleEventsWith(msg2, msg4);
  41. // disruptor.after(msg3, msg4).handleEventsWith(msg5);
  42. */
  43. disruptor.handleEventsWithWorkerPool(cackHandlers)
  44. .thenHandleEventsWithWorkerPool(workHandlers).then(clearHandler);
  45. disruptor.setDefaultExceptionHandler(MessageEventExceptionHandler.INSTANCE);
  46. return disruptor;
  47. }
  48. /**
  49. * 多生产者模式, 默认等待策略为阻塞策略
  50. *
  51. * @return
  52. */
  53. public Disruptor<DebugEvent> createDisruptor(DebugHandler debugHandler) {
  54. int bufferSize = 1024 * 64;
  55. Disruptor<DebugEvent> disruptor =
  56. new Disruptor<>(DebugEvent::new, bufferSize, ThreadPoolHelper.threadFactory("DebugEvent"));
  57. disruptor.handleEventsWith(debugHandler);
  58. disruptor.setDefaultExceptionHandler(DebugEventExceptionHandler.INSTANCE);
  59. return disruptor;
  60. }
  61. /**
  62. * 多生产者模式, 默认等待策略为阻塞策略
  63. *
  64. * @return
  65. */
  66. public Disruptor<AuthEvent> createDisruptor(AuthHandler authHandler) {
  67. int bufferSize = 1024 * 64;
  68. Disruptor<AuthEvent> disruptor =
  69. new Disruptor<>(AuthEvent::new, bufferSize, ThreadPoolHelper.threadFactory("AuthEvent"));
  70. disruptor.handleEventsWith(authHandler);
  71. disruptor.setDefaultExceptionHandler(AuthEventExceptionHandler.INSTANCE);
  72. return disruptor;
  73. }
  74. }

6、DisruptorFactory线程工厂工具类

  1. package com.huwei.hotel.collector.contacter.infrastructure.helper;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.commons.lang3.concurrent.BasicThreadFactory;
  4. import java.util.concurrent.ArrayBlockingQueue;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.ThreadFactory;
  7. import java.util.concurrent.ThreadPoolExecutor;
  8. import java.util.concurrent.TimeUnit;
  9. /**
  10. * 线程池
  11. *
  12. * @author ljy
  13. * @date 2023/3/06
  14. **/
  15. @Slf4j
  16. public class ThreadPoolHelper {
  17. /**
  18. * 线程工厂
  19. *
  20. * @param preName
  21. * @return
  22. */
  23. public static ThreadFactory threadFactory(String preName) {
  24. BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
  25. .namingPattern(preName + "-Disruptor-%d")
  26. .daemon(true)
  27. .priority(Thread.NORM_PRIORITY)
  28. .uncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  29. @Override
  30. public void uncaughtException(Thread t, Throwable e) {
  31. log.error(String.format("创建线程(%s)异常", t.getName()), e);
  32. }
  33. })
  34. .build();
  35. return threadFactory;
  36. }
  37. /**
  38. * 线程池
  39. *
  40. * @param nThreads
  41. * @param preName
  42. * @return
  43. */
  44. public static ExecutorService executorService(int nThreads, String preName) {
  45. ThreadPoolExecutor executor = new ThreadPoolExecutor(nThreads,
  46. nThreads,
  47. 60L,
  48. TimeUnit.SECONDS,
  49. new ArrayBlockingQueue<>(nThreads),
  50. threadFactory(preName));
  51. return executor;
  52. }
  53. }

7、消费者

  1. package com.huwei.hotel.collector.contacter.interfaces.auth.event;
  2. import com.huwei.hotel.collector.contacter.interfaces.userlog.service.UserLogService;
  3. import com.huwei.hotel.common.enums.MqttAuthTypeEnum;
  4. import com.lmax.disruptor.EventHandler;
  5. import com.lmax.disruptor.WorkHandler;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Component;
  9. /**
  10. * 有待改进, 可以按照Disruptor的批量读取处理
  11. *
  12. * @author ljy
  13. * @date 2023/3/06 14:24
  14. **/
  15. @Slf4j
  16. @Component
  17. public class AuthHandler implements WorkHandler<AuthEvent>, EventHandler<AuthEvent> {
  18. @Autowired
  19. UserLogService userLogService;
  20. @Override
  21. public void onEvent(AuthEvent event) throws Exception {
  22. try {
  23. String authName = event.getAuthName();
  24. MqttAuthTypeEnum authType = event.getAuthType();
  25. String clientKey = event.getClientKey();
  26. String failureReason = event.getFailureReason();
  27. //记录认证结果
  28. userLogService.save(authName, authType, clientKey, failureReason);
  29. } finally {
  30. event.clear();
  31. }
  32. }
  33. @Override
  34. public void onEvent(AuthEvent event, long sequence, boolean endOfBatch) throws Exception {
  35. this.onEvent(event); // TODO: 可以进行批量处理
  36. }
  37. }

8、调用

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/255018
推荐阅读
相关标签
  

闽ICP备14008679号