赞
踩
1. 简介
TimingWheel是kafka时间轮的实现,内部包含了⼀个TimerTaskList数组,每个数组包含了⼀些链表组成的TimerTaskEntry事件,每个TimerTaskList表示时间轮的某⼀格,这⼀格的时间跨度为tickMs,同⼀个TimerTaskList中的事件都是相差在⼀个tickMs跨度内的,整个时间轮的时间跨度为interval = tickMs * wheelSize,该时间轮能处理的时间范围在cuurentTime到currentTime + interval之间的事件。
当添加⼀个时间他的超时时间⼤于整个时间轮的跨度时, expiration >= currentTime + interval,则会将该事件向上级传递,上级的tickMs是下级的interval,传递直到某⼀个时间轮满⾜expiration < currentTime + interval,
然后计算对应位于哪⼀格,然后将事件放进去,重新设置超时时间,然后放进jdk延迟队列
else if (expiration < currentTime + interval) {
|
|
// Put in its own bucket |
|
val virtualId = expiration / tickMs |
|
val bucket = buckets((virtualId % wheelSize.toLong).toInt) |
|
bucket.add(timerTaskEntry) |
|
// Set the bucket expiration time |
|
if (bucket.setExpiration(virtualId * tickMs)) {
|
|
// The bucket needs to be enqueued because it was an expired bucket |
|
// We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced |
|
// and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle |
|
// will pass in the same value and hence return false, thus the bucket with the same expiration will not |
|
// be enqueued multiple times. |
|
queue.offer(bucket) |
|
} |
SystemTimer会取出queue中的TimerTaskList,根据expiration将currentTime往前推进,然后把⾥⾯所有的事件重新放进时间轮中,因为ct推进了,所以有些事件会在第0格,表示到期了,直接返回。
else if (expiration < currentTime + tickMs) {
然后将任务提交到java线程池中处理。
服务端在处理客户端的请求,针对不同的请求,可能不会⽴即返回响应结果给客户端。在处理这类请求时,服务端会为这类请求创建延迟操作对象放⼊延迟缓存队列中。延迟缓存的数据结构类似MAP,延迟操作对象从延迟缓存队列中完成并移除有两种⽅式:
2. 延时操作接口
DelayedOperation
接⼝表示延迟的操作对象。此接⼝的实现类包括延迟加⼊,延迟⼼跳,延迟⽣产,延迟拉取。延迟接⼝相关的⽅法:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。