From 244724759112389da6e887310ffdd4036f8dbafc Mon Sep 17 00:00:00 2001 From: zgzhanghao <474711079@qq.com> Date: Sat, 23 Mar 2019 10:45:38 +0800 Subject: [PATCH] add timer --- src/redis/timer.cc | 326 +++++++++++++++++++++++++++++++++++++++++++++ src/redis/timer.h | 154 +++++++++++++++++++++ 2 files changed, 480 insertions(+) create mode 100644 src/redis/timer.cc create mode 100644 src/redis/timer.h diff --git a/src/redis/timer.cc b/src/redis/timer.cc new file mode 100644 index 00000000..23fe98dc --- /dev/null +++ b/src/redis/timer.cc @@ -0,0 +1,326 @@ +#include "timer.h" +#include "eventloop.h" + +std::atomic Timer::numCreated = 0; + +Timer::Timer(TimerCallback &&cb, + TimeStamp &&expiration, bool repeat, double interval) + : repeat(repeat), + interval(interval), + expiration(std::move(expiration)), + callback(std::move(cb)), + sequence(++numCreated) { + +} + +Timer::~Timer() { + +} + +int64_t Timer::getSequence() { + return sequence; +} + +TimeStamp &Timer::getExpiration() { + return expiration; +} + +int64_t Timer::getWhen() { + return expiration.getMicroSecondsSinceEpoch(); +} + +bool Timer::getRepeat() { + return repeat; +} + +void Timer::setSequence(int64_t seq) { + sequence = seq; +} + +double Timer::getInterval() { + return interval; +} + +void Timer::run() { + assert(callback != nullptr); + callback(); +} + +void Timer::restart(const TimeStamp &now) { + if (repeat) { + expiration = std::move(addTime(now, interval)); + } else { + expiration = std::move(TimeStamp::invalid()); + } +} + +std::string TimeStamp::toString() const { + char buf[32] = {0}; + int64_t seconds = microSecondsSinceEpoch / kMicroSecondsPerSecond; + int64_t microseconds = microSecondsSinceEpoch % kMicroSecondsPerSecond; + snprintf(buf, sizeof(buf) - 1, "%" + PRId64 + ".%06" + PRId64 + "", seconds, microseconds); + return buf; +} + +std::string TimeStamp::toFormattedString(bool showMicroseconds) const { + char buf[32] = {0}; + time_t seconds = static_cast(microSecondsSinceEpoch / kMicroSecondsPerSecond); + struct tm tm; + time_t now = time(0); + tm = *(localtime(&now)); + if (showMicroseconds) { + int microseconds = static_cast(microSecondsSinceEpoch % kMicroSecondsPerSecond); + snprintf(buf, sizeof(buf), "%4d%02d%02d %02d:%02d:%02d.%06d", + tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, + tm.tm_hour, tm.tm_min, tm.tm_sec, + microseconds); + } else { + snprintf(buf, sizeof(buf), "%4d%02d%02d %02d:%02d:%02d", + tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, + tm.tm_hour, tm.tm_min, tm.tm_sec); + } + return buf; +} + +#ifdef __linux__ +int64_t createTimerfd() +{ + int64_t timerfd = ::timerfd_create(CLOCK_MONOTONIC, + TFD_NONBLOCK | TFD_CLOEXEC); + if (timerfd < 0) + { + assert(false); + } + + return timerfd; +} +#endif + +int64_t howMuchTimeFrom(const TimeStamp &when) { + int64_t microseconds = when.getMicroSecondsSinceEpoch() + - TimeStamp::now().getMicroSecondsSinceEpoch(); + if (microseconds < 1000) { + microseconds = 1000; + } + return static_cast(microseconds / 1000); +} + + +int64_t TimerQueue::getTimeout() const { + loop->assertInLoopThread(); + if (timers.empty()) { + return 1000; + } else { + return howMuchTimeFrom(timers.begin()->second->getExpiration()); + } +} + +#ifdef __linux__ +struct timespec howMuchTimeFromNow(const TimeStamp &when) +{ + int64_t microseconds = when.getMicroSecondsSinceEpoch() + - TimeStamp::now().getMicroSecondsSinceEpoch(); + if (microseconds < 100) + { + microseconds = 100; + } + + struct timespec ts; + ts.tv_sec = static_cast(microseconds / TimeStamp::kMicroSecondsPerSecond); + ts.tv_nsec = static_cast((microseconds % TimeStamp::kMicroSecondsPerSecond) * 1000); + return ts; +} + +void resetTimerfd(int64_t timerfd, const TimeStamp &expiration) +{ + struct itimerspec newValue; + struct itimerspec oldValue; + bzero(&newValue, sizeof newValue); + bzero(&oldValue, sizeof oldValue); + newValue.it_value = howMuchTimeFromNow(expiration); + int64_t net = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); + if (net < 0) + { + assert(false); + } +} + +void readTimerfd(int64_t timerfd, const TimeStamp &now) +{ + uint64_t howmany; + ssize_t n = ::read(timerfd, &howmany, sizeof howmany); + if (n != sizeof howmany) + { + assert(false); + } +} +#endif + +TimerQueue::TimerQueue(EventLoop *loop) + : loop(loop), +#ifdef __linux__ +timerfd(createTimerfd()), +timerfdChannel(loop, timerfd), +#endif + callingExpiredTimers(false) { +#ifdef __linux__ + timerfdChannel.setReadCallback(std::bind(&TimerQueue::handleRead, this)); + timerfdChannel.enableReading(); +#endif +} + +TimerQueue::~TimerQueue() { +#ifdef __linux__ + timerfdChannel.disableAll(); + timerfdChannel.remove(); + ::close(timerfd); +#endif +} + +TimerPtr TimerQueue::addTimer(double when, bool repeat, TimerCallback &&cb) { + TimeStamp time(addTime(TimeStamp::now(), when)); + TimerPtr timer(new Timer(std::move(cb), std::move(time), repeat, when)); + loop->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer)); + return timer; +} + +TimerPtr TimerQueue::addTimer(TimeStamp &&stamp, double when, bool repeat, TimerCallback &&cb) { + TimerPtr timer(new Timer(std::move(cb), std::move(stamp), repeat, when)); + loop->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer)); + return timer; +} + +void TimerQueue::cancelTimer(const TimerPtr &timer) { + loop->runInLoop(std::bind(&TimerQueue::cancelInloop, this, timer)); +} + +void TimerQueue::cancelInloop(const TimerPtr &timer) { + loop->assertInLoopThread(); + assert(timers.size() == activeTimers.size()); + + auto it = activeTimers.find(timer->getSequence()); + if (it != activeTimers.end()) { + auto iter = timers.find(timer->getWhen()); + while (iter != timers.end()) { + if (timer->getSequence() == iter->second->getSequence()) { + timers.erase(iter); + break; + } else { + ++iter; + } + } + activeTimers.erase(it); + } else if (callingExpiredTimers) { + cancelingTimers.insert(std::make_pair(timer->getSequence(), timer)); + } + assert(timers.size() == activeTimers.size()); +} + +void TimerQueue::addTimerInLoop(const TimerPtr &timer) { + loop->assertInLoopThread(); + bool earliestChanged = insert(timer); + if (earliestChanged) { +#ifdef __linux__ + resetTimerfd(timerfd, timer->getExpiration()); +#endif + } +} + +TimerPtr TimerQueue::getTimerBegin() { + if (timers.empty()) { + return nullptr; + } + return timers.begin()->second; +} + +void TimerQueue::handleRead() { + loop->assertInLoopThread(); + assert(timers.size() == activeTimers.size()); + TimeStamp now(TimeStamp::now()); + +#ifdef __linux__ + readTimerfd(timerfd, now); +#endif + getExpired(now); + + callingExpiredTimers = true; + cancelingTimers.clear(); + // safe to callback outside critical section + + for (auto &it : expired) { + it.second->run(); + } + + callingExpiredTimers = false; + reset(now); +} + +bool TimerQueue::insert(const TimerPtr &timer) { + loop->assertInLoopThread(); + assert(timers.size() == activeTimers.size()); + + bool earliestChanged = false; + int64_t microseconds = timer->getExpiration().getMicroSecondsSinceEpoch(); + auto it = timers.begin(); + if (it == timers.end() || microseconds < it->first) { + earliestChanged = true; + } + + timers.insert(std::make_pair(microseconds, timer)); + activeTimers.insert(std::make_pair(timer->getSequence(), timer)); + + if (timers.size() != activeTimers.size()) { + assert(false); + } + assert(timers.size() == activeTimers.size()); + return earliestChanged; +} + +void TimerQueue::reset(const TimeStamp &now) { + TimeStamp nextExpire; + for (auto &it : expired) { + if (it.second->getRepeat() && + cancelingTimers.find(it.second->getSequence()) == cancelingTimers.end()) { + it.second->restart(now); + insert(it.second); + } else { + + } + } + + expired.clear(); + if (!timers.empty()) { + nextExpire = timers.begin()->second->getExpiration(); + } + + if (nextExpire.valid()) { +#ifdef __linux__ + resetTimerfd(timerfd, nextExpire); +#endif + } +} + +size_t TimerQueue::getTimerSize() { + loop->assertInLoopThread(); + assert(timers.size() == activeTimers.size()); + return timers.size(); +} + +void TimerQueue::getExpired(const TimeStamp &now) { + assert(timers.size() == activeTimers.size()); + auto end = timers.lower_bound(now.getMicroSecondsSinceEpoch()); + assert(end == timers.end() || now.getMicroSecondsSinceEpoch() <= end->first); + expired.insert(timers.begin(), end); + timers.erase(timers.begin(), end); + + for (auto &it : expired) { + size_t n = activeTimers.erase(it.second->getSequence()); + assert(n == 1); + (void) n; + } + assert(timers.size() == activeTimers.size()); +} diff --git a/src/redis/timer.h b/src/redis/timer.h new file mode 100644 index 00000000..3507a772 --- /dev/null +++ b/src/redis/timer.h @@ -0,0 +1,154 @@ +#pragma once + +#include "all.h" +#include "channel.h" +#include "callback.h" + +class EventLoop; + +class TimeStamp { +public: + TimeStamp() + : microSecondsSinceEpoch(0) { + + } + + explicit TimeStamp(int64_t microSecondsSinceEpochArg) + : microSecondsSinceEpoch(microSecondsSinceEpochArg) { + + } + + int64_t getMicroSecondsSinceEpoch() const { + return microSecondsSinceEpoch; + } + + time_t secondsSinceEpoch() const { + return static_cast(microSecondsSinceEpoch / kMicroSecondsPerSecond); + } + + bool valid() const { return microSecondsSinceEpoch > 0; } + + std::string toFormattedString(bool showMicroseconds = true) const; + + static TimeStamp now() { + auto timeNow = std::chrono::system_clock::now(); + auto microseconds = std::chrono::duration_cast(timeNow.time_since_epoch()); + return TimeStamp(microseconds.count()); + } + + std::string toString() const; + + static TimeStamp invalid() { return TimeStamp(); } + + static const int32_t kMicroSecondsPerSecond = 1000 * 1000; + +private: + int64_t microSecondsSinceEpoch; +}; + +inline bool operator<(const TimeStamp &lhs, const TimeStamp &rhs) { + return lhs.getMicroSecondsSinceEpoch() < rhs.getMicroSecondsSinceEpoch(); +} + +inline bool operator==(const TimeStamp &lhs, const TimeStamp &rhs) { + return lhs.getMicroSecondsSinceEpoch() == rhs.getMicroSecondsSinceEpoch(); +} + +inline TimeStamp addTime(const TimeStamp ×tamp, double seconds) { + int64_t delta = static_cast(seconds * TimeStamp::kMicroSecondsPerSecond); + return TimeStamp(timestamp.getMicroSecondsSinceEpoch() + delta); +} + +inline double timeDifference(const TimeStamp &high, const TimeStamp &low) { + int64_t diff = high.getMicroSecondsSinceEpoch() - low.getMicroSecondsSinceEpoch(); + return static_cast(diff) / TimeStamp::kMicroSecondsPerSecond; +} + +class Timer { +public: + Timer(TimerCallback &&cb, TimeStamp &&expiration, + bool repeat, double interval); + + ~Timer(); + + void run(); + + int64_t getSequence(); + + int64_t getWhen(); + + TimeStamp &getExpiration(); + + bool getRepeat(); + + void setSequence(int64_t seq); + + void restart(const TimeStamp &now); + + double getInterval(); + +private: + Timer(const Timer &); + + void operator=(const Timer &); + + bool repeat; + double interval; + int64_t sequence; + TimeStamp expiration; + TimerCallback callback; + static std::atomic numCreated; +}; + +class TimerQueue { +public: + TimerQueue(EventLoop *loop); + + ~TimerQueue(); + + void cancelTimer(const TimerPtr &timer); + + void handleRead(); + + TimerPtr addTimer(double when, bool repeat, TimerCallback &&cb); + + TimerPtr addTimer(TimeStamp &&stamp, double when, bool repeat, TimerCallback &&cb); + + TimerPtr getTimerBegin(); + + int64_t getTimeout() const; + + size_t getTimerSize(); + +private: + TimerQueue(const TimerQueue &); + + void operator=(const TimerQueue &); + + EventLoop *loop; + int32_t timerfd; +#ifdef __linux__ + Channel timerfdChannel; +#endif + + void cancelInloop(const TimerPtr &timer); + + void addTimerInLoop(const TimerPtr &timer); + + void getExpired(const TimeStamp &now); + + void reset(const TimeStamp &now); + + bool insert(const TimerPtr &timer); + + typedef std::multimap TimerList; + typedef std::map ActiveTimer; + + ActiveTimer activeTimers; + ActiveTimer cancelingTimers; + TimerList expired; + TimerList timers; + bool callingExpiredTimers; +}; + +