赞
踩
依旧以muduo
为例。
使用timerfd
可以使用与socketfd
类型相同的方式在IO复用中使用。
使用timerfd_create()
创建一个timerfd
,接着使用timerfd_settime()
设置定时器的到期时间。
我们只需要注册timerfd
的可读事件,当定时器超时时,timerfd
变成可读,调用其设置的可读的回调函数。
来看下TimerQueue.h
class TimerQueue : noncopyable { public: explicit TimerQueue(EventLoop* loop); ~TimerQueue(); /// /// Schedules the callback to be run at given time, /// repeats if @c interval > 0.0. /// /// Must be thread safe. Usually be called from other threads. TimerId addTimer(TimerCallback cb, Timestamp when, double interval); void cancel(TimerId timerId); private: // FIXME: use unique_ptr<Timer> instead of raw pointers. // This requires heterogeneous comparison lookup (N3465) from C++14 // so that we can find an T* in a set<unique_ptr<T>>. typedef std::pair<Timestamp, Timer*> Entry; typedef std::set<Entry> TimerList; typedef std::pair<Timer*, int64_t> ActiveTimer; typedef std::set<ActiveTimer> ActiveTimerSet; void addTimerInLoop(Timer* timer); void cancelInLoop(TimerId timerId); // called when timerfd alarms void handleRead(); // move out all expired timers std::vector<Entry> getExpired(Timestamp now); void reset(const std::vector<Entry>& expired, Timestamp now); bool insert(Timer* timer); EventLoop* loop_; const int timerfd_; Channel timerfdChannel_; // Timer list sorted by expiration TimerList timers_; // for cancel() ActiveTimerSet activeTimers_; bool callingExpiredTimers_; /* atomic */ ActiveTimerSet cancelingTimers_; };
Timer
类表示一个具体的定时任务,TimerId
类标识某个定时任务,主要用来删除定时任务。
使用timers_
作为任务队列来组织定时任务,TimerList
即为std::set<Entry>
, Entry
即为std::pair<Timestamp, Timer*>
。这样设计的目的是为了使用std::set
,同时又可以保存超时事件相同的定时任务。其实可以直接使用std::multimap<Timestamp, Timer*>
,感觉这样会更方便。
成员timerfd_
由timer_create
创建,成员timerfdChannel_
用来监听timerfd_
的可读事件,它的回调函数为TimerQueue::read()
。
TimerQueue::TimerQueue(EventLoop* loop)
: loop_(loop),
timerfd_(createTimerfd()),
timerfdChannel_(loop, timerfd_),
timers_(),
callingExpiredTimers_(false)
{
timerfdChannel_.setReadCallback(
std::bind(&TimerQueue::handleRead, this));
// we are always reading the timerfd, we disarm it with timerfd_settime.
timerfdChannel_.enableReading();
}
使用接口TimerQueue::TimerId addTimer(TimerCallback cb, Timestamp when, double interval)
,当然EventLoop
有更为明显的接口EventLoop::RunAt()
、EventLoop::RunAfter()
、EventLoop::RunEvery()
,它们的实现都是调用TimerQueue::TimerId addTimer()`。
添加的任务的超时时间如果比之前最小的超时时间还小,需要重新设置定时器的超时时间,调用resetTimerfd()
即可。
void resetTimerfd(int timerfd, Timestamp expiration)
{
// wake up loop by timerfd_settime()
struct itimerspec newValue;
struct itimerspec oldValue;
memZero(&newValue, sizeof newValue);
memZero(&oldValue, sizeof oldValue);
newValue.it_value = howMuchTimeFromNow(expiration);
int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
if (ret)
{
LOG_SYSERR << "timerfd_settime()";
}
}
重新设置超时时间,不需要先取消原来的,再设置新的超时时间。直接调用timerfd_settime()
即可。
这里有一个小的知识点需要注意,如果需要取消原来设置的定时任务,调用timerfd_settime()
时,需要将传入的newValue都设置为0,
相当于调用memZero(&newValue, sizeof newValue)
,然后再调用timerfd_settime()
即可。
当定时器超时时,timerfd_
变成可读,void TimerQueue::handleRead()
将会被调用,来看下其实现:
void TimerQueue::handleRead() { loop_->assertInLoopThread(); Timestamp now(Timestamp::now()); readTimerfd(timerfd_, now); std::vector<Entry> expired = getExpired(now); callingExpiredTimers_ = true; cancelingTimers_.clear(); // safe to callback outside critical section for (const Entry& it : expired) { it.second->run(); } callingExpiredTimers_ = false; reset(expired, now); }
首先需要调用void readTimerfd(int timerfd, Timestamp now)
读取timerfd_
中的数据(必须取到64位的数据),否则之后将因为依旧可读再次调用readTimerfd(level triggered)
。
void readTimerfd(int timerfd, Timestamp now)
{
uint64_t howmany;
ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
if (n != sizeof howmany)
{
LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
}
}
调用getExpired()
获取超时的定时任务,返回时就已经将这些超时任务从任务队列[timers_
]中删除,如果有任务时循环任务,将会在
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
重新添加到任务队列中。
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now) { assert(timers_.size() == activeTimers_.size()); std::vector<Entry> expired; Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); TimerList::iterator end = timers_.lower_bound(sentry); assert(end == timers_.end() || now < end->first); std::copy(timers_.begin(), end, back_inserter(expired)); timers_.erase(timers_.begin(), end); // 删除超时的任务 for (const Entry& it : expired) { ActiveTimer timer(it.second, it.second->sequence()); size_t n = activeTimers_.erase(timer); assert(n == 1); (void)n; } assert(timers_.size() == activeTimers_.size()); return expired; }
最后一步调用TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
,对于这些已经超时的任务,如果任务为循环任务,需要重新将其加入到任务队列中。处理完之后,如果任务队列中仍有任务,取得任务中最近的超时时间作为定时器的超时时间重启定时器。
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now) { Timestamp nextExpire; for (const Entry& it : expired) { ActiveTimer timer(it.second, it.second->sequence()); if (it.second->repeat() && cancelingTimers_.find(timer) == cancelingTimers_.end()) { it.second->restart(now); insert(it.second); } else { // FIXME move to a free list delete it.second; // FIXME: no delete please } } if (!timers_.empty()) { nextExpire = timers_.begin()->second->expiration(); } if (nextExpire.valid()) { resetTimerfd(timerfd_, nextExpire); } }
使用接口void TimerQueue::cancel(TimerId timerId)
,它会转调void TimerQueue::cancelInLoop(TimerId timerId)
,其实现如下:
void TimerQueue::cancelInLoop(TimerId timerId) { loop_->assertInLoopThread(); assert(timers_.size() == activeTimers_.size()); ActiveTimer timer(timerId.timer_, timerId.sequence_); ActiveTimerSet::iterator it = activeTimers_.find(timer); if (it != activeTimers_.end()) { size_t n = timers_.erase(Entry(it->first->expiration(), it->first)); assert(n == 1); (void)n; delete it->first; // FIXME: no delete please activeTimers_.erase(it); } else if (callingExpiredTimers_) { cancelingTimers_.insert(timer); } assert(timers_.size() == activeTimers_.size()); }
其实就是直接将该定时任务删除。
以前有个疑问,为什么删除时不处理待删除任务就是超时时间最短的定时任务这种特殊情况。
其实是不需要的,因为当以带产出任务的超时时间超时时,会去取已经超时的任务。而该任务已经从任务队列中删除,所以将不会取到该超时任务,前面已经说过,取消任务会调用timerfd_settime()
,多了一个系统调用;不取消也没有什么影响。所以就采取了<不处理方式>。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。