赞
踩
SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。
本文为第九篇,介绍SOFARegistry自动调节间隔周期性任务的实现。
蚂蚁金服这里的业务需求主要是:
阿里采用了:
我们在设计延时/周期性任务时就可以参考TimedSupervisorTask的实现
Scheduler类中就是这个方案的体现。
首先,我们需要看看 Scheduler的代码。
public class Scheduler { private final ScheduledExecutorService scheduler; public final ExecutorService versionCheckExecutor; private final ThreadPoolExecutor expireCheckExecutor; @Autowired private AcceptorStore localAcceptorStore; public Scheduler() { scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler")); expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck")); versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory( "SyncDataScheduler-versionChangeCheck")); } public void startScheduler() { scheduler.schedule( new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3, TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()), 30, TimeUnit.SECONDS); versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck()); } public void stopScheduler() { if (scheduler != null && !scheduler.isShutdown()) { scheduler.shutdown(); } if (versionCheckExecutor != null && !versionCheckExecutor.isShutdown()) { versionCheckExecutor.shutdown(); } } }
接下来我们就逐一分析下其实现或者说是设计选择。
阿里这里采用ExecutorService实现了无限循环任务,不定期完成业务。
Executor:一个JAVA接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,用来执行一个实现了Runnable接口的类。
ExecutorService:是一个比Executor使用更广泛的子类接口。
其提供了生命周期管理的方法,返回 Future 对象,以及可跟踪一个或多个异步任务执行状况返回Future的方法;
当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。
这里ExecutorService虽然其不能提供周期性功能,但是localAcceptorStore.changeDataCheck
本身就是一个while (true) loop,其可以依靠DelayQueue来完成类似周期功能。
versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory( "SyncDataScheduler-versionChangeCheck")); versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck()); public void changeDataCheck() { while (true) { try { DelayItem<Acceptor> delayItem = delayQueue.take(); Acceptor acceptor = delayItem.getItem(); removeCache(acceptor); // compare and remove } catch (InterruptedException e) { break; } catch (Throwable e) { LOGGER.error(e.getMessage(), e); } } }
阿里这里采用了 ScheduledExecutorService 实现了周期性任务。
ScheduledExecutorService是一种线程池,ScheduledExecutorService在ExecutorService提供的功能之上再增加了延迟和定期执行任务的功能。
其schedule方法创建具有各种延迟的任务,并返回可用于取消或检查执行的任务对象。
寻常的Timer的内部只有一个线程,如果有多个任务的话就会顺序执行,这样我们的延迟时间和循环时间就会出现问题,而且异常未检查会中止线程。
ScheduledExecutorService是线程池,并且线程池对异常做了处理,使得任务之间不会有影响。在对延迟任务和循环任务要求严格的时候,就需要考虑使用ScheduledExecutorService了。
ThreadPoolExecutor的完整构造方法的签名如下
ThreadPoolExecutor
(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler)12
其中,workQueue参数介绍如下:
workQueue任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
这里采用了两种Queue。
expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));
versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory(
"SyncDataScheduler-versionChangeCheck"));
LinkedBlockingQueue是一种阻塞队列。
LinkedBlockingQueue内部由单链表实现了BlockingQueue接口,只能从head取元素,从tail添加元素。
LinkedBlockingQueue内部分别使用了takeLock 和 putLock 对并发进行控制,也就是说LinkedBlockingQueue是读写分离的,添加和删除操作并不是互斥操作,可以并行进行,这样也就可以大大提高吞吐量。
LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,默认为Integer.MAX_VALUE
,也就是无界队列。如果存在添加速度大于删除速度时候,有可能会内存溢出,所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
另外,LinkedBlockingQueue对每一个lock锁都提供了一个Condition用来挂起和唤醒其他线程。
不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间。
你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。
数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。
SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
TimedSupervisorTask 是一个自动调节间隔的周期性任务。这里基本是借鉴了Eureka的同名实现,但是SOFA这里去除了“部分异常处理逻辑”。
从整体上看,TimedSupervisorTask是固定间隔的周期性任务,一旦遇到超时就会将下一个周期的间隔时间调大,如果连续超时,那么每次间隔时间都会增大一倍,一直到达外部参数设定的上限为止,一旦新任务不再超时,间隔时间又会自动恢复为初始值,另外还有CAS来控制多线程同步。
主要逻辑如下:
Math.min(maxDelay, currentDelay x 2)
得到任务延时时间 x 2 和 最大延时时间的最小值,然后改变任务的延时时间timeoutMillis;scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS)
:执行完任务后,会再次调用schedule方法,在指定的时间之后执行一次相同的任务,这个间隔时间和最近一次任务是否超时有关,如果超时了就间隔时间就会变大;其实现如下:
public class TimedSupervisorTask extends TimerTask { private final ScheduledExecutorService scheduler; private final ThreadPoolExecutor executor; private final long timeoutMillis; private final Runnable task; private String name; private final AtomicLong delay; private final long maxDelay; public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) { this.name = name; this.scheduler = scheduler; this.executor = executor; this.timeoutMillis = timeUnit.toMillis(timeout); this.task = task; this.delay = new AtomicLong(timeoutMillis); this.maxDelay = timeoutMillis * expBackOffBound; } @Override public void run() { Future future = null; try { //使用Future,可以设定子线程的超时时间,这样当前线程就不用无限等待了 future = executor.submit(task); //指定等待子线程的最长时间 // block until done or timeout future.get(timeoutMillis, TimeUnit.MILLISECONDS); // 每次执行任务成功都会将delay重置 delay.set(timeoutMillis); } catch (TimeoutException e) { long currentDelay = delay.get(); // 如果出现异常,则将时间*2,然后取 定时时间 和 最长定时时间 中最小的为下次任务执行的延时时间 long newDelay = Math.min(maxDelay, currentDelay * 2); // 设置为最新的值,考虑到多线程,所以用了CAS delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { // 线程池的阻塞队列中放满了待处理任务,触发了拒绝策略 LOGGER.error("{} task supervisor rejected the task: {}", name, task, e); } catch (Throwable e) { // 出现未知的异常 LOGGER.error("{} task supervisor threw an exception", name, e); } finally { //这里任务要么执行完毕,要么发生异常,都用cancel方法来清理任务; if (future != null) { future.cancel(true); } //这里就是周期性任务的原因:只要没有停止调度器,就再创建一次性任务,执行时间时dealy的值, //假设外部调用时传入的超时时间为30秒(构造方法的入参timeout),最大间隔时间为50秒(构造方法的入参expBackOffBound) //如果最近一次任务没有超时,那么就在30秒后开始新任务, //如果最近一次任务超时了,那么就在50秒后开始新任务(异常处理中有个乘以二的操作,乘以二后的60秒超过了最大间隔50秒) scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } }
Eureka系列(六) TimedSupervisorTask类解析
Eureka的TimedSupervisorTask类(自动调节间隔的周期性任务)
java线程池ThreadPoolExecutor类使用详解
Java线程池ThreadPoolExecutor实现原理剖析
深入理解Java线程池:ThreadPoolExecutor
Java中线程池ThreadPoolExecutor原理探究
ScheduledExecutorService 和 Timer 的区别
Java并发包中的同步队列SynchronousQueue实现原理
ThreadPoolExecutor线程池解析与BlockingQueue的三种实现
【细谈Java并发】谈谈LinkedBlockingQueue
★★★★★★关于生活和技术的思考★★★★★★
微信公众账号:罗西的思考
如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,可以扫描下面二维码(或者长按识别二维码)关注个人公众号)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。