赞
踩
在某些场景下(物料入库成功或者物料装箱成功)需要给特定的人发送消息(可以是短信、邮件或者系统内的消息)。
场景多个、消息类型多个
如在入库中可能存在【一段时间】内存在很多人操作,这里假设操作库房的人数有100人,他们可能1分钟之内都会操作入库操作。这意味着1分钟会有100次请求访问,100次消息发送请求。
如果场景增多(目前15个),这里认为最高峰,假设 1分钟内执行15个场景,每个场景 50人操作:这就有 大概 15*50=750条消息请求需要发送。
每个场景设计的业务逻辑很复杂,IO操作频繁。所以我们需要思考如何能最小程度的进行发送消息,而不去大量占用 业务逻辑时间呢。
这里有人就会问,那发消息直接使用异步不就行了吗?但是需求还要求能够同步获取到 发送消息的结果。如果使用异步,我们选择那种异步框架?对异步线程如何处理?这都是需要思考的地方。
我们最终也还是选择了异步执行,同步阻塞获取结果的方式来实现的发送消息。对主业务逻辑影响较小,而且也提供了完全异步的方式。异步我们一开始想过使用 AQS来进行实现,多个生产消息的线程进行对AQS填充,可以设置多个线程进行消费消息。而我在了解到Disruptor以及读了log4j异步写消息的部分源码之后最终决定使用 Disruptor 来实现对消息的分发。
在说具体的实现之前,我们必须简单了解一下Disruptor主要的组件是什么。
这里有很多知识点,比如 CAS、缓存行(伪共享)、内存屏障、队列、锁等。有兴趣可以自行搜索下,这里不过多介绍。
用户指南
Ring Buffer:Ring Buffer 通常被认为是 Disruptor 的主要方面。但是,从 3.0 开始,RingBuffer 只负责存储和更新Event通过 Disruptor 的数据。对于一些高级用例,它甚至可以完全由用户替换。
Sequence:Disruptor 使用Sequences 作为识别特定组件在哪里的一种手段。每个消费者(事件处理器)Sequence都像Disruptor本身一样维护一个。大多数并发代码依赖于这些序列值的移动,因此Sequence支持许多当前的特性AtomicLong。事实上,两者之间唯一真正的区别是Sequence包含额外的功能来防止Sequences和其他值之间的错误共享。
Sequencer:Sequencer 是 Disruptor 的真正核心。该接口的两种实现(单生产者、多生产者)实现了所有并发算法,以在生产者和消费者之间快速、正确地传递数据。
Sequence Barrier:Sequencer 生成一个 Sequence Barrier,其中包含对Sequence从Sequencer 发布的 main 和Sequence任何依赖消费者的 s 的引用。它包含确定是否有任何事件可供消费者处理的逻辑。
Wait Strategy:等待策略决定了消费者将如何等待生产者将事件放入 Disruptor。有关可选无锁的部分提供了更多详细信息。
Event:从生产者传递给消费者的数据单位。事件没有特定的代码表示,因为它完全由用户定义。
Event Processor:用于处理来自 Disruptor 的事件的主事件循环,并拥有消费者序列的所有权。有一个称为BatchEventProcessor 的表示,它包含事件循环的有效实现,并将回调到 EventHandler 接口的使用提供的实现。
Event Handler:由用户实现的接口,代表 Disruptor 的消费者。
Producer:这是调用 Disruptor 入队Event的用户代码。
Log4j2基于Disruptor异步日志优化(部分源码学习)
这里查看的源码版本是:log4j-core-2.13.3;下面重点关注异步方式的两个类
public class AsyncLogger extends Logger implements EventTranslatorVararg<RingBufferLogEvent> { //发送日志 private final TranslatorType varargTranslatorType = new TranslatorType() { @Override void log(String fqcn, StackTraceElement location, Level level, Marker marker, Message message, Throwable thrown) { logWithVarargTranslator(fqcn, location, level, marker, message, thrown); } @Override void log(String fqcn, Level level, Marker marker, Message message, Throwable thrown) { //发送日志 logWithVarargTranslator(fqcn, level, marker, message, thrown); } }; private void logWithVarargTranslator(final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) { //获取 disruptor final Disruptor<RingBufferLogEvent> disruptor = loggerDisruptor.getDisruptor(); if (disruptor == null) { LOGGER.error("Ignoring log event after Log4j has been shut down."); return; } // if the Message instance is reused, there is no point in freezing its message here if (!isReused(message)) { InternalAsyncUtil.makeMessageImmutable(message); } StackTraceElement location = null; //不阻塞,尝试往 ringbuffer中 添加任务 if (!disruptor.getRingBuffer().tryPublishEvent(this, this, // asyncLogger: 0 (location = calcLocationIfRequested(fqcn)), // location: 1 fqcn, // 2 level, // 3 marker, // 4 message, // 5 thrown)) { // 6 // 满了之后,执行这个方法 handleRingBufferFull(location, fqcn, level, marker, message, thrown); } } private void handleRingBufferFull(final StackTraceElement location, final String fqcn, final Level level, final Marker marker, final Message msg, final Throwable thrown) { if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031 //如果递归 则用当前线程执行 AsyncQueueFullMessageUtil.logWarningToStatusLogger(); logMessageInCurrentThread(fqcn, level, marker, msg, thrown); return; } final EventRoute eventRoute = loggerDisruptor.getEventRoute(level); switch (eventRoute) { case ENQUEUE: // 队列 查看 【AsyncLoggerDisruptor】 loggerDisruptor.enqueueLogMessageWhenQueueFull(this, this, // asyncLogger: 0 location, // location: 1 fqcn, // 2 level, // 3 marker, // 4 msg, // 5 thrown); // 6 break; case SYNCHRONOUS: // 同步:用当前线程执行 logMessageInCurrentThread(fqcn, level, marker, msg, thrown); break; case DISCARD: break; default: throw new IllegalStateException("Unknown EventRoute " + eventRoute); } } }
AsyncLoggerDisruptor 日志事件容器
public synchronized void start() { if (disruptor != null) { return; } //获取 长度,4KB 或者 256KB ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize"); //等待策略 final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy"); final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLogger[" + contextName + "]", true, Thread.NORM_PRIORITY) { @Override public Thread newThread(final Runnable r) { final Thread result = super.newThread(r); backgroundThreadId = result.getId(); return result; } }; //队列满了之后的 策略 : 同步 队列 丢弃 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create(); disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, threadFactory, ProducerType.MULTI, waitStrategy); final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler(); disruptor.setDefaultExceptionHandler(errorHandler); //*** 处理任务线程,只用了一个线程 来进行消费日志事件。 final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()}; disruptor.handleEventsWith(handlers); disruptor.start(); LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, useThreadLocalTranslator ? "threadlocal" : "vararg"); super.start(); } void enqueueLogMessageWhenQueueFull( final EventTranslatorVararg<RingBufferLogEvent> translator, final AsyncLogger asyncLogger, final StackTraceElement location, final String fqcn, final Level level, final Marker marker, final Message msg, final Throwable thrown) { try { if (synchronizeEnqueueWhenQueueFull()) { //默认策略: 同步发送任务 synchronized (queueFullEnqueueLock) { disruptor.getRingBuffer().publishEvent(translator, asyncLogger, // asyncLogger: 0 location, // location: 1 fqcn, // 2 level, // 3 marker, // 4 msg, // 5 thrown); // 6 } } else { disruptor.getRingBuffer().publishEvent(translator, asyncLogger, // asyncLogger: 0 location, // location: 1 fqcn, // 2 level, // 3 marker, // 4 msg, // 5 thrown); // 6 } } catch (final NullPointerException npe) { // LOG4J2-639: catch NPE if disruptor field was set to null in stop() logWarningOnNpeFromDisruptorPublish(level, fqcn, msg, thrown); } } // 是否需要 获取锁 同步发送 private boolean synchronizeEnqueueWhenQueueFull() { return DisruptorUtil.ASYNC_LOGGER_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL // 是否是 将消息消费的后台线程 && backgroundThreadId != Thread.currentThread().getId(); }
RingBufferLogEventHandler 日志处理器
public class RingBufferLogEventHandler implements SequenceReportingEventHandler<RingBufferLogEvent>, LifecycleAware { private static final int NOTIFY_PROGRESS_THRESHOLD = 50; private Sequence sequenceCallback; private int counter; private long threadId = -1; @Override public void setSequenceCallback(final Sequence sequenceCallback) { this.sequenceCallback = sequenceCallback; } @Override public void onEvent(final RingBufferLogEvent event, final long sequence, final boolean endOfBatch) throws Exception { try { // 这个才是 正在执行 日志的方法,会交给 apperder 来执行。 //请查看 【AsyncAppender】 event.execute(endOfBatch); } finally { //帮助 GC,有可能 ringbuffer 来不及清空。 event.clear(); notifyCallback(sequence); } } private void notifyCallback(long sequence) { if (++counter > NOTIFY_PROGRESS_THRESHOLD) { sequenceCallback.set(sequence); counter = 0; } } /** * Returns the thread ID of the background consumer thread, or {@code -1} if the background thread has not started * yet. * @return the thread ID of the background consumer thread, or {@code -1} */ public long getThreadId() { return threadId; } @Override public void onStart() { threadId = Thread.currentThread().getId(); } @Override public void onShutdown() { } }
private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs, final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config, final boolean includeLocation, final BlockingQueueFactory<LogEvent> blockingQueueFactory, final Property[] properties) { super(name, filter, null, ignoreExceptions, properties); //事件队列 this.queue = blockingQueueFactory.create(queueSize); this.queueSize = queueSize; this.blocking = blocking; this.shutdownTimeout = shutdownTimeout; this.config = config; this.appenderRefs = appenderRefs; this.errorRef = errorRef; this.includeLocation = includeLocation; } @Override public void start() { final Map<String, Appender> map = config.getAppenders(); final List<AppenderControl> appenders = new ArrayList<>(); for (final AppenderRef appenderRef : appenderRefs) { final Appender appender = map.get(appenderRef.getRef()); if (appender != null) { appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter())); } else {} } if (appenders.size() > 0) { //消费 队列中的 任务。 thread = new AsyncThread(appenders, queue); thread.setName("AsyncAppender-" + getName()); } asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create(); thread.start(); super.start(); } //发送消息 会将消息 保存到 队列中 @Override public void append(final LogEvent logEvent) { if (!isStarted()) { throw new IllegalStateException("AsyncAppender " + getName() + " is not active"); } final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation); InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); //尝试添加到队列中 if (!transfer(memento)) { //队列满了 并且又 阻塞策略 if (blocking) { if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031 AsyncQueueFullMessageUtil.logWarningToStatusLogger(); logMessageInCurrentThread(logEvent); } else { final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel()); //执行具体策略 : 队列 同步 舍弃 route.logMessage(this, memento); } } else { logToErrorAppenderIfNecessary(false, memento); } } } private boolean transfer(final LogEvent memento) { return queue instanceof TransferQueue ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento) : queue.offer(memento); } // 异步线程,消费消息 private class AsyncThread extends Log4jThread { private volatile boolean shutdown = false; private final List<AppenderControl> appenders; private final BlockingQueue<LogEvent> queue; public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) { super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement()); this.appenders = appenders; this.queue = queue; setDaemon(true); } @Override public void run() { while (!shutdown) { LogEvent event; try { //获取消息 event = queue.take(); if (event == SHUTDOWN_LOG_EVENT) { shutdown = true; continue; } } catch (final InterruptedException ex) { break; // LOG4J2-830 } event.setEndOfBatch(queue.isEmpty()); //具体的输出方法,输出到到 文件 还是 控制台 看xml 配置 final boolean success = callAppenders(event); if (!success && errorAppender != null) { try { errorAppender.callAppender(event); } catch (final Exception ex) { // Silently accept the error. } } } } }
我们接口也仿造 log4j2 来创建消息分发消费的 框架。
这里我们直接看代码。还有demo演示。
这里我们直接看代码。还有demo演示。
//客户端如何发送消息 //接收人 MessageReceiver messageReceiver = new MessageReceiver.Builder() .commonIds(new HashSet<>(Arrays.asList("user1","user2"))) .userIds(new HashSet<>(Arrays.asList("user3","user4"))) .emails(new HashSet<>(ImmutableList.of("XXX@ZZ.com"))) .phones(new HashSet<>(ImmutableList.of("15802222222269")))) .build(); //不使用模板,自定义内容 SendContent sendContent = new SendContent.Builder().sms("这是一条自定义短信内容").build(); //变量替换 //您的备件归还单@&b variable1@(SR单号@&b variable2@)审核驳回,含@&b variable3@,驳回原因:@&b variable4@。 List<String> variables = Arrays.asList("归还单号", "申请单号","物料编码", "驳回原因"); ContentVariable contentVariable = new ContentVariable.Builder() .sendVarSMS(variables) .sendVarEmail(variables) .build(); MessageContext messageContext = new MessageContext("BJ.BH","发送人",messageReceiver,sendContent ,contentVariable); //发送---> 不关注结果 service.sendMessageAsync(messageContext); //关注结果,提供获取结果时才阻塞,这一步不会阻塞 SendMessageResponse.AsyncResult asyncResult = service.sendMessage(messageContext); //这里才阻塞 SendMessageResponse result = asyncResult.getResult();
本文主要介绍 Disruptor 在该需求下的使用,以及简单介绍了Log4j2对Disruptor的使用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。