赞
踩
一、并发框架disruptor介绍
1、概念:同一个jvm进程中线程间异步通信的框架
2、环形数组RingBuffer:disruptor的核心存储容器
2.1、环形数组中的元素采用覆盖方式,避免了jvm的GC
2.2、数组的大小为2的n次方,这样元素定位可以通过位运算效率会更高,其实和hashmap的index运算一样,不一样的是hashmap会扩容,而这个RingBuffer不扩容而去覆盖原来的数据
3、SequenceBarrier:
是起屏障作用的类,因为在往RingBuffer放的过程中,生产者和消费者的存取速度不一致会造成错误。这时用SequenceBarrier可以来限制过快的存或者取,来达到速度的一致,保证不出错。原理是每次消费者取的时候会把取到的数据的位置返给生产者,生产者通过这个位置来判断什么时候往RingBuffer中放数据
4、工作流程:
生产者往RingBuffer中放数据,disruptor把数据推给消费者
5、工作模式:
统一消费、分组消费、顺序消费、多支线顺序消费
详细介绍: https://blog.csdn.net/zhouzhenyong/article/details/81303011
- package com.huwei.hotel.collector.contacter.interfaces.auth.event;
-
-
- import com.huwei.hotel.common.enums.MqttAuthTypeEnum;
- import lombok.Data;
- import lombok.NoArgsConstructor;
- import org.springframework.http.HttpStatus;
-
- /**
- * @author ljy
- * @date 2023/3/06
- **/
- @Data
- @NoArgsConstructor
- public class AuthEvent {
- private String authName;
- private MqttAuthTypeEnum authType;
- private String clientKey;
- private String failureReason;
-
- void clear() {
- authName = null;
- authType = null;
- clientKey = null;
- failureReason = null;
- }
- }
- package com.huwei.hotel.collector.contacter.interfaces.auth.event;
-
- import com.lmax.disruptor.EventFactory;
-
- /**
- * 工厂方法
- * @author ljy
- * @date 2023/3/06
- */
- public class AuthEventFactory implements EventFactory<AuthEvent> {
-
- @Override
- public AuthEvent newInstance() {
- return new AuthEvent();
- }
-
- }
- package com.huwei.hotel.collector.contacter.interfaces.auth.event;
-
- import com.huwei.hotel.common.enums.MqttAuthTypeEnum;
- import com.lmax.disruptor.RingBuffer;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
-
- /**
- * @author ljy
- * @date 2023/3/06
- **/
- @Slf4j
- @Component
- public class AuthEventProducer {
- @Resource(name = "authEventRingBuffer")
- RingBuffer<AuthEvent> ringBuffer;
-
- public void publish(String authName, MqttAuthTypeEnum authType, String clientKey, String failureReason) {
- ringBuffer.publishEvent((event, sequence) -> {
- event.setAuthName(authName);
- event.setAuthType(authType);
- event.setClientKey(clientKey);
- event.setFailureReason(failureReason);
- });
- }
- }
- package com.huwei.hotel.collector.contacter.infrastructure.config;
-
- import com.huwei.hotel.collector.contacter.application.event.MessageEvent;
- import com.huwei.hotel.collector.contacter.infrastructure.disruptor.DisruptorFactory;
- import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthEvent;
- import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthHandler;
- import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugEvent;
- import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugHandler;
- import com.lmax.disruptor.RingBuffer;
- import com.lmax.disruptor.dsl.Disruptor;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import javax.annotation.PreDestroy;
-
- /**
- * @author ljy
- * @date 2023/3/06
- **/
- @Configuration
- public class RingBufferConfiguration {
- final DisruptorFactory disruptorFactory = new DisruptorFactory();
- final Disruptor<MessageEvent> messageEventDisruptor;
- final Disruptor<DebugEvent> debugEventDisruptor;
- final Disruptor<AuthEvent> authEventDisruptor;
- final DebugHandler debugHandler;
- final AuthHandler authHandler;
-
- RingBufferConfiguration(DebugHandler debugHandler, AuthHandler authHandler){
- this.debugHandler = debugHandler;
- this.authHandler = authHandler;
- messageEventDisruptor = disruptorFactory.createDisruptor();
- debugEventDisruptor = disruptorFactory.createDisruptor(this.debugHandler);
- authEventDisruptor = disruptorFactory.createDisruptor(this.authHandler);
- this.start();
- }
-
- private void start() {
- if(messageEventDisruptor != null){
- messageEventDisruptor.start();
- }
-
- if(debugEventDisruptor != null){
- debugEventDisruptor.start();
- }
-
- if(authEventDisruptor != null){
- authEventDisruptor.start();
- }
- }
-
- @PreDestroy
- public void doDestory(){
- if(messageEventDisruptor != null){
- messageEventDisruptor.shutdown();
- }
-
- if(debugEventDisruptor != null){
- debugEventDisruptor.shutdown();
- }
-
- if(authEventDisruptor != null){
- authEventDisruptor.shutdown();
- }
- }
-
- @Bean(name = "messageEventRingBuffer")
- public RingBuffer<MessageEvent> messageEventRingBuffer() {
- return messageEventDisruptor.getRingBuffer();
- }
-
- @Bean(name = "debugEventRingBuffer")
- public RingBuffer<DebugEvent> debugEventRingBuffer() {
- return debugEventDisruptor.getRingBuffer();
- }
-
- @Bean(name = "authEventRingBuffer")
- public RingBuffer<AuthEvent> authEventRingBuffer() {
- return authEventDisruptor.getRingBuffer();
- }
- }
- package com.huwei.hotel.collector.contacter.infrastructure.disruptor;
-
- import com.huwei.hotel.collector.contacter.application.event.*;
- import com.huwei.hotel.collector.contacter.infrastructure.helper.ThreadPoolHelper;
- import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthEvent;
- import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthEventExceptionHandler;
- import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthHandler;
- import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugEvent;
- import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugEventExceptionHandler;
- import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugHandler;
- import com.lmax.disruptor.dsl.Disruptor;
-
- /**
- * @author ljy
- * @date 2023/3/06
- **/
- public class DisruptorFactory {
- /**
- * 多生产者模式, 默认等待策略为阻塞策略
- *
- * @return
- */
- public Disruptor<MessageEvent> createDisruptor() {
- int bufferSize = 1024 * 64;
-
- Disruptor<MessageEvent> disruptor =
- new Disruptor<>(MessageEvent::new, bufferSize, ThreadPoolHelper.threadFactory("MessageEvent"));
-
- ResponseHandler[] cackHandlers = new ResponseHandler[5];
- for (int i = 0; i < cackHandlers.length; i++) {
- cackHandlers[i] = new ResponseHandler();
- }
-
- DataHandler[] workHandlers = new DataHandler[5];
- for (int i = 0; i < workHandlers.length; i++) {
- workHandlers[i] = new DataHandler();
- }
-
- ClearHandler clearHandler = new ClearHandler();
- /* 设置事件业务处理器---消费者 介绍几种类型
- //定义消费者执行模式(在这里一个消费者也就是一个线程,消费者执行模式也就是线程的执行模式)
- // disruptor.handleEventsWith(msg1, msg2, msg3, msg4); //统一消费:一个消息会被所有消费者消费
- // disruptor.handleEventsWithWorkerPool(msg1, msg2); //分组消费:一个消息只能被一个消费者消费,多消费者轮询处理
- // disruptor.handleEventsWith(msg1, msg3).then(msg2); //顺序消费:1、3先并行处理,然后2再处理
- // disruptor.handleEventsWith(msg1, msg3); //多支线顺序消费:消费者1和消费者3一个支线,消费者2和消费者4一个支线,消费者3和消费者4消费完毕后,消费者5再进行消费
- // disruptor.handleEventsWith(msg2, msg4);
- // disruptor.after(msg3, msg4).handleEventsWith(msg5);
- */
- disruptor.handleEventsWithWorkerPool(cackHandlers)
- .thenHandleEventsWithWorkerPool(workHandlers).then(clearHandler);
- disruptor.setDefaultExceptionHandler(MessageEventExceptionHandler.INSTANCE);
- return disruptor;
- }
-
- /**
- * 多生产者模式, 默认等待策略为阻塞策略
- *
- * @return
- */
- public Disruptor<DebugEvent> createDisruptor(DebugHandler debugHandler) {
- int bufferSize = 1024 * 64;
-
- Disruptor<DebugEvent> disruptor =
- new Disruptor<>(DebugEvent::new, bufferSize, ThreadPoolHelper.threadFactory("DebugEvent"));
-
- disruptor.handleEventsWith(debugHandler);
- disruptor.setDefaultExceptionHandler(DebugEventExceptionHandler.INSTANCE);
- return disruptor;
- }
-
- /**
- * 多生产者模式, 默认等待策略为阻塞策略
- *
- * @return
- */
- public Disruptor<AuthEvent> createDisruptor(AuthHandler authHandler) {
- int bufferSize = 1024 * 64;
-
- Disruptor<AuthEvent> disruptor =
- new Disruptor<>(AuthEvent::new, bufferSize, ThreadPoolHelper.threadFactory("AuthEvent"));
-
- disruptor.handleEventsWith(authHandler);
- disruptor.setDefaultExceptionHandler(AuthEventExceptionHandler.INSTANCE);
- return disruptor;
- }
- }
- package com.huwei.hotel.collector.contacter.infrastructure.helper;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
-
- /**
- * 线程池
- *
- * @author ljy
- * @date 2023/3/06
- **/
- @Slf4j
- public class ThreadPoolHelper {
- /**
- * 线程工厂
- *
- * @param preName
- * @return
- */
- public static ThreadFactory threadFactory(String preName) {
- BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
- .namingPattern(preName + "-Disruptor-%d")
- .daemon(true)
- .priority(Thread.NORM_PRIORITY)
- .uncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- log.error(String.format("创建线程(%s)异常", t.getName()), e);
- }
- })
- .build();
- return threadFactory;
- }
-
- /**
- * 线程池
- *
- * @param nThreads
- * @param preName
- * @return
- */
- public static ExecutorService executorService(int nThreads, String preName) {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(nThreads,
- nThreads,
- 60L,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(nThreads),
- threadFactory(preName));
- return executor;
- }
- }
- package com.huwei.hotel.collector.contacter.interfaces.auth.event;
-
- import com.huwei.hotel.collector.contacter.interfaces.userlog.service.UserLogService;
- import com.huwei.hotel.common.enums.MqttAuthTypeEnum;
- import com.lmax.disruptor.EventHandler;
- import com.lmax.disruptor.WorkHandler;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- /**
- * 有待改进, 可以按照Disruptor的批量读取处理
- *
- * @author ljy
- * @date 2023/3/06 14:24
- **/
- @Slf4j
- @Component
- public class AuthHandler implements WorkHandler<AuthEvent>, EventHandler<AuthEvent> {
- @Autowired
- UserLogService userLogService;
-
- @Override
- public void onEvent(AuthEvent event) throws Exception {
- try {
- String authName = event.getAuthName();
- MqttAuthTypeEnum authType = event.getAuthType();
- String clientKey = event.getClientKey();
- String failureReason = event.getFailureReason();
-
- //记录认证结果
- userLogService.save(authName, authType, clientKey, failureReason);
- } finally {
- event.clear();
- }
- }
-
- @Override
- public void onEvent(AuthEvent event, long sequence, boolean endOfBatch) throws Exception {
- this.onEvent(event); // TODO: 可以进行批量处理
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。