当前位置:   article > 正文

Disruptor的应用-->发送消息_disruptor 推送数据

disruptor 推送数据


写的比较粗糙。

需求背景

在某些场景下(物料入库成功或者物料装箱成功)需要给特定的人发送消息(可以是短信、邮件或者系统内的消息)。

需求分析

场景多个、消息类型多个
如在入库中可能存在【一段时间】内存在很多人操作,这里假设操作库房的人数有100人,他们可能1分钟之内都会操作入库操作。这意味着1分钟会有100次请求访问,100次消息发送请求。
如果场景增多(目前15个),这里认为最高峰,假设 1分钟内执行15个场景,每个场景 50人操作:这就有 大概 15*50=750条消息请求需要发送。

思考

每个场景设计的业务逻辑很复杂,IO操作频繁。所以我们需要思考如何能最小程度的进行发送消息,而不去大量占用 业务逻辑时间呢。
这里有人就会问,那发消息直接使用异步不就行了吗?但是需求还要求能够同步获取到 发送消息的结果。如果使用异步,我们选择那种异步框架?对异步线程如何处理?这都是需要思考的地方。

技术选型

我们最终也还是选择了异步执行,同步阻塞获取结果的方式来实现的发送消息。对主业务逻辑影响较小,而且也提供了完全异步的方式。异步我们一开始想过使用 AQS来进行实现,多个生产消息的线程进行对AQS填充,可以设置多个线程进行消费消息。而我在了解到Disruptor以及读了log4j异步写消息的部分源码之后最终决定使用 Disruptor 来实现对消息的分发。

消息服务实现

在说具体的实现之前,我们必须简单了解一下Disruptor主要的组件是什么。

Disruptor 组件

这里有很多知识点,比如 CAS、缓存行(伪共享)、内存屏障、队列、锁等。有兴趣可以自行搜索下,这里不过多介绍。
用户指南

  • Ring Buffer:Ring Buffer 通常被认为是 Disruptor 的主要方面。但是,从 3.0 开始,RingBuffer 只负责存储和更新Event通过 Disruptor 的数据。对于一些高级用例,它甚至可以完全由用户替换。

  • Sequence:Disruptor 使用Sequences 作为识别特定组件在哪里的一种手段。每个消费者(事件处理器)Sequence都像Disruptor本身一样维护一个。大多数并发代码依赖于这些序列值的移动,因此Sequence支持许多当前的特性AtomicLong。事实上,两者之间唯一真正的区别是Sequence包含额外的功能来防止Sequences和其他值之间的错误共享。

  • Sequencer:Sequencer 是 Disruptor 的真正核心。该接口的两种实现(单生产者、多生产者)实现了所有并发算法,以在生产者和消费者之间快速、正确地传递数据。

  • Sequence Barrier:Sequencer 生成一个 Sequence Barrier,其中包含对Sequence从Sequencer 发布的 main 和Sequence任何依赖消费者的 s 的引用。它包含确定是否有任何事件可供消费者处理的逻辑。

  • Wait Strategy:等待策略决定了消费者将如何等待生产者将事件放入 Disruptor。有关可选无锁的部分提供了更多详细信息。

  • Event:从生产者传递给消费者的数据单位。事件没有特定的代码表示,因为它完全由用户定义。

  • Event Processor:用于处理来自 Disruptor 的事件的主事件循环,并拥有消费者序列的所有权。有一个称为BatchEventProcessor 的表示,它包含事件循环的有效实现,并将回调到 EventHandler 接口的使用提供的实现。

  • Event Handler:由用户实现的接口,代表 Disruptor 的消费者。

  • Producer:这是调用 Disruptor 入队Event的用户代码。

log4j2 的使用

Log4j2基于Disruptor异步日志优化(部分源码学习)
这里查看的源码版本是:log4j-core-2.13.3;下面重点关注异步方式的两个类

  1. 日志接收发送
    AsyncLogger 发送日志
public class AsyncLogger extends Logger implements EventTranslatorVararg<RingBufferLogEvent> {
	
	//发送日志
	private final TranslatorType varargTranslatorType = new TranslatorType() {
        @Override
        void log(String fqcn, StackTraceElement location, Level level, Marker marker, Message message,
            Throwable thrown) {
            logWithVarargTranslator(fqcn, location, level, marker, message, thrown);
        }

        @Override
        void log(String fqcn, Level level, Marker marker, Message message, Throwable thrown) {
           
            //发送日志
            logWithVarargTranslator(fqcn, level, marker, message, thrown);
        }
    };

	private void logWithVarargTranslator(final String fqcn, final Level level, final Marker marker,
            final Message message, final Throwable thrown) {
         //获取  disruptor
        final Disruptor<RingBufferLogEvent> disruptor = loggerDisruptor.getDisruptor();
        if (disruptor == null) {
            LOGGER.error("Ignoring log event after Log4j has been shut down.");
            return;
        }
        // if the Message instance is reused, there is no point in freezing its message here
        if (!isReused(message)) {
            InternalAsyncUtil.makeMessageImmutable(message);
        }
        StackTraceElement location = null;
        //不阻塞,尝试往  ringbuffer中 添加任务
        if (!disruptor.getRingBuffer().tryPublishEvent(this,
                this, // asyncLogger: 0
                (location = calcLocationIfRequested(fqcn)), // location: 1
                fqcn, // 2
                level, // 3
                marker, // 4
                message, // 5
                thrown)) { // 6
                // 满了之后,执行这个方法
            handleRingBufferFull(location, fqcn, level, marker, message, thrown);
        }
    }
    
	private void handleRingBufferFull(final StackTraceElement location,
                                      final String fqcn,
                                      final Level level,
                                      final Marker marker,
                                      final Message msg,
                                      final Throwable thrown) {
        if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
            //如果递归 则用当前线程执行
            AsyncQueueFullMessageUtil.logWarningToStatusLogger();
            logMessageInCurrentThread(fqcn, level, marker, msg, thrown);
            return;
        }
        final EventRoute eventRoute = loggerDisruptor.getEventRoute(level);
        switch (eventRoute) {
            case ENQUEUE:
                // 队列 查看 【AsyncLoggerDisruptor】 
                loggerDisruptor.enqueueLogMessageWhenQueueFull(this,
                        this, // asyncLogger: 0
                        location, // location: 1
                        fqcn, // 2
                        level, // 3
                        marker, // 4
                        msg, // 5
                        thrown); // 6
                break;
            case SYNCHRONOUS:
                //  同步:用当前线程执行
                logMessageInCurrentThread(fqcn, level, marker, msg, thrown);
                break;
            case DISCARD:
                break;
            default:
                throw new IllegalStateException("Unknown EventRoute " + eventRoute);
        }
    }
	
	
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83

AsyncLoggerDisruptor 日志事件容器

	public synchronized void start() {
	        if (disruptor != null) {
	            return;
	        }
	        //获取 长度,4KB  或者 256KB
	        ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
	        //等待策略
	        final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");
	       
	        final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLogger[" + contextName + "]", true, Thread.NORM_PRIORITY) {
	            @Override
	            public Thread newThread(final Runnable r) {
	                final Thread result = super.newThread(r);
	                backgroundThreadId = result.getId();
	                return result;
	            }
	        };
	        //队列满了之后的 策略 : 同步 队列  丢弃
	        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
	
	        disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, threadFactory, ProducerType.MULTI,
	                waitStrategy);
	
	        final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
	        disruptor.setDefaultExceptionHandler(errorHandler);
			//*** 处理任务线程,只用了一个线程 来进行消费日志事件。
	        final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
	        disruptor.handleEventsWith(handlers);
	
	        disruptor.start();
	
	        LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, useThreadLocalTranslator ? "threadlocal"
	                : "vararg");
	        super.start();
	 }
	 
	
	void enqueueLogMessageWhenQueueFull(
            final EventTranslatorVararg<RingBufferLogEvent> translator,
            final AsyncLogger asyncLogger,
            final StackTraceElement location,
            final String fqcn,
            final Level level,
            final Marker marker,
            final Message msg,
            final Throwable thrown) {
        try {
             
            if (synchronizeEnqueueWhenQueueFull()) {
                //默认策略: 同步发送任务
                synchronized (queueFullEnqueueLock) {
                    disruptor.getRingBuffer().publishEvent(translator,
                            asyncLogger, // asyncLogger: 0
                            location, // location: 1
                            fqcn, // 2
                            level, // 3
                            marker, // 4
                            msg, // 5
                            thrown); // 6
                }
            } else {
                disruptor.getRingBuffer().publishEvent(translator,
                        asyncLogger, // asyncLogger: 0
                        location, // location: 1
                        fqcn, // 2
                        level, // 3
                        marker, // 4
                        msg, // 5
                        thrown); // 6
            }
        } catch (final NullPointerException npe) {
            // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
            logWarningOnNpeFromDisruptorPublish(level, fqcn, msg, thrown);
        }
    }
		
   // 是否需要 获取锁 同步发送
    private boolean synchronizeEnqueueWhenQueueFull() {
        return DisruptorUtil.ASYNC_LOGGER_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL
                // 是否是 将消息消费的后台线程
                && backgroundThreadId != Thread.currentThread().getId();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

RingBufferLogEventHandler 日志处理器

public class RingBufferLogEventHandler implements
        SequenceReportingEventHandler<RingBufferLogEvent>, LifecycleAware {

    private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
    private Sequence sequenceCallback;
    private int counter;
    private long threadId = -1;

    @Override
    public void setSequenceCallback(final Sequence sequenceCallback) {
        this.sequenceCallback = sequenceCallback;
    }

    @Override
    public void onEvent(final RingBufferLogEvent event, final long sequence,
            final boolean endOfBatch) throws Exception {
        try {
           // 这个才是 正在执行 日志的方法,会交给 apperder 来执行。
           //请查看 【AsyncAppender】
            event.execute(endOfBatch);
        }
        finally {
           //帮助 GC,有可能 ringbuffer 来不及清空。
            event.clear();
            notifyCallback(sequence);
        }
    }

    private void notifyCallback(long sequence) {
        if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
            sequenceCallback.set(sequence);
            counter = 0;
        }
    }

    /**
     * Returns the thread ID of the background consumer thread, or {@code -1} if the background thread has not started
     * yet.
     * @return the thread ID of the background consumer thread, or {@code -1}
     */
    public long getThreadId() {
        return threadId;
    }

    @Override
    public void onStart() {
        threadId = Thread.currentThread().getId();
    }

    @Override
    public void onShutdown() {
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  1. 日志消费
    AsyncAppender 异步消费,使用了 DisruptorBlockingQueue

	private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
            final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions,
            final long shutdownTimeout, final Configuration config, final boolean includeLocation,
            final BlockingQueueFactory<LogEvent> blockingQueueFactory, final Property[] properties) {
        super(name, filter, null, ignoreExceptions, properties);
        //事件队列
        this.queue = blockingQueueFactory.create(queueSize);
        this.queueSize = queueSize;
        this.blocking = blocking;
        this.shutdownTimeout = shutdownTimeout;
        this.config = config;
        this.appenderRefs = appenderRefs;
        this.errorRef = errorRef;
        this.includeLocation = includeLocation;
    }


	@Override
    public void start() {
        final Map<String, Appender> map = config.getAppenders();
        final List<AppenderControl> appenders = new ArrayList<>();
        for (final AppenderRef appenderRef : appenderRefs) {
            final Appender appender = map.get(appenderRef.getRef());
            if (appender != null) {
                appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
            } else {}
        }
        
        if (appenders.size() > 0) {
            //消费  队列中的  任务。
            thread = new AsyncThread(appenders, queue);
            thread.setName("AsyncAppender-" + getName());
        } 
        asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
        thread.start();
        super.start();
    }

   //发送消息 会将消息 保存到 队列中
	@Override
    public void append(final LogEvent logEvent) {
        if (!isStarted()) {
            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
        }
        final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
        InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
        //尝试添加到队列中
        if (!transfer(memento)) {
            //队列满了 并且又 阻塞策略
            if (blocking) {
                if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
                 
                    AsyncQueueFullMessageUtil.logWarningToStatusLogger();
                    logMessageInCurrentThread(logEvent);
                } else {
                   
                    final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
                    //执行具体策略 : 队列  同步  舍弃
                    route.logMessage(this, memento);
                }
            } else {
                logToErrorAppenderIfNecessary(false, memento);
            }
        }
    }

    private boolean transfer(final LogEvent memento) {
        return queue instanceof TransferQueue
            ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
            : queue.offer(memento);
    }
	
   // 异步线程,消费消息
	private class AsyncThread extends Log4jThread {

        private volatile boolean shutdown = false;
        private final List<AppenderControl> appenders;
        private final BlockingQueue<LogEvent> queue;

        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
            super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
            this.appenders = appenders;
            this.queue = queue;
            setDaemon(true);
        }

        @Override
        public void run() {
            while (!shutdown) {
                LogEvent event;
                try {
                   //获取消息
                    event = queue.take();
                    if (event == SHUTDOWN_LOG_EVENT) {
                        shutdown = true;
                        continue;
                    }
                } catch (final InterruptedException ex) {
                    break; // LOG4J2-830
                }
                event.setEndOfBatch(queue.isEmpty());
                //具体的输出方法,输出到到 文件 还是 控制台 看xml 配置
                final boolean success = callAppenders(event);
                if (!success && errorAppender != null) {
                    try {
                        errorAppender.callAppender(event);
                    } catch (final Exception ex) {
                        // Silently accept the error.
                    }
                }
            }
           
        }
	

	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117

设计图

消息服务设计图

接口定义

我们接口也仿造 log4j2 来创建消息分发消费的 框架。
这里我们直接看代码。还有demo演示。


实现

这里我们直接看代码。还有demo演示。


客户端发送消息示例

//客户端如何发送消息
//接收人
MessageReceiver messageReceiver = new MessageReceiver.Builder()
    .commonIds(new HashSet<>(Arrays.asList("user1","user2")))
    .userIds(new HashSet<>(Arrays.asList("user3","user4")))
	.emails(new HashSet<>(ImmutableList.of("XXX@ZZ.com")))
	.phones(new HashSet<>(ImmutableList.of("15802222222269"))))
	.build();

//不使用模板,自定义内容
SendContent sendContent = new SendContent.Builder().sms("这是一条自定义短信内容").build();

//变量替换
//您的备件归还单@&b variable1@(SR单号@&b variable2@)审核驳回,含@&b variable3@,驳回原因:@&b variable4@。
List<String> variables = Arrays.asList("归还单号", "申请单号","物料编码", "驳回原因");
				ContentVariable contentVariable = new ContentVariable.Builder()
						.sendVarSMS(variables)
						.sendVarEmail(variables)
						.build();
MessageContext messageContext = new 
MessageContext("BJ.BH","发送人",messageReceiver,sendContent ,contentVariable);
//发送---> 不关注结果
service.sendMessageAsync(messageContext);
//关注结果,提供获取结果时才阻塞,这一步不会阻塞
SendMessageResponse.AsyncResult asyncResult = service.sendMessage(messageContext);
//这里才阻塞
SendMessageResponse result = asyncResult.getResult();

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

总结

本文主要介绍 Disruptor 在该需求下的使用,以及简单介绍了Log4j2对Disruptor的使用。


springcloud 脚手架(wx-cloud)

  • spring boot 2.3.3.RELEASE
  • nacos
  • geteway
  • oauth2
  • cache(spring+caffeine)
  • mybaits

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/254972
推荐阅读
相关标签
  

闽ICP备14008679号