Skip to content

Commit 5769688

Browse files
committed
support cancelling timers.
1 parent 1c14e45 commit 5769688

7 files changed

+101
-15
lines changed

ChangeLog

+7
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
2011-09-18 Shuo Chen <[email protected]>
2+
3+
* EventLoop now supports cancelling timer.
4+
* Add two examples of asio chat server, demo copy-on-write
5+
in mulrithreaded program.
6+
* Version 0.2.9
7+
18
2011-09-04 Shuo Chen <[email protected]>
29

310
* Refactored RPC implementation of version 1 and 2,

muduo/net/EventLoop.cc

+5
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,11 @@ TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
172172
return timerQueue_->addTimer(cb, time, interval);
173173
}
174174

175+
void EventLoop::cancel(TimerId timerId)
176+
{
177+
return timerQueue_->cancel(timerId);
178+
}
179+
175180
void EventLoop::updateChannel(Channel* channel)
176181
{
177182
assert(channel->ownerLoop() == this);

muduo/net/EventLoop.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class EventLoop : boost::noncopyable
8989
/// Cancels the timer.
9090
/// Safe to call from other threads.
9191
///
92-
// void cancel(TimerId timerId);
92+
void cancel(TimerId timerId);
9393

9494
// internal usage
9595
void wakeup();

muduo/net/TimerId.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,17 @@ class TimerId : public muduo::copyable
2727
{
2828
public:
2929
TimerId(Timer* timer, int64_t seq)
30-
: value_(timer),
30+
: timer_(timer),
3131
seq_(seq)
3232
{
3333
}
3434

3535
// default copy-ctor, dtor and assignment are okay
3636

37+
friend class TimerQueue;
38+
3739
private:
38-
Timer* value_;
40+
Timer* timer_;
3941
int64_t seq_;
4042
};
4143

muduo/net/TimerQueue.cc

+64-10
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ TimerQueue::TimerQueue(EventLoop* loop)
9090
: loop_(loop),
9191
timerfd_(createTimerfd()),
9292
timerfdChannel_(loop, timerfd_),
93-
timers_()
93+
timers_(),
94+
callingExpiredTimers_(false)
9495
{
9596
timerfdChannel_.setReadCallback(
9697
boost::bind(&TimerQueue::handleRead, this));
@@ -119,6 +120,12 @@ TimerId TimerQueue::addTimer(const TimerCallback& cb,
119120
return TimerId(timer, timer->sequence());
120121
}
121122

123+
void TimerQueue::cancel(TimerId timerId)
124+
{
125+
loop_->runInLoop(
126+
boost::bind(&TimerQueue::cancelInLoop, this, timerId));
127+
}
128+
122129
void TimerQueue::scheduleInLoop(Timer* timer)
123130
{
124131
loop_->assertInLoopThread();
@@ -130,6 +137,26 @@ void TimerQueue::scheduleInLoop(Timer* timer)
130137
}
131138
}
132139

140+
void TimerQueue::cancelInLoop(TimerId timerId)
141+
{
142+
loop_->assertInLoopThread();
143+
assert(timers_.size() == activeTimers_.size());
144+
ActiveTimer timer(timerId.timer_, timerId.seq_);
145+
ActiveTimerSet::iterator it = activeTimers_.find(timer);
146+
if (it != activeTimers_.end())
147+
{
148+
size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
149+
assert(n == 1); (void)n;
150+
delete it->first; // FIXME: no delete please
151+
activeTimers_.erase(it);
152+
}
153+
else if (callingExpiredTimers_)
154+
{
155+
cancelingTimers_.insert(timer);
156+
}
157+
assert(timers_.size() == activeTimers_.size());
158+
}
159+
133160
void TimerQueue::handleRead()
134161
{
135162
loop_->assertInLoopThread();
@@ -138,25 +165,38 @@ void TimerQueue::handleRead()
138165

139166
std::vector<Entry> expired = getExpired(now);
140167

168+
callingExpiredTimers_ = true;
169+
cancelingTimers_.clear();
141170
// safe to callback outside critical section
142171
for (std::vector<Entry>::iterator it = expired.begin();
143172
it != expired.end(); ++it)
144173
{
145174
it->second->run();
146175
}
176+
callingExpiredTimers_ = false;
147177

148178
reset(expired, now);
149179
}
150180

151181
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
152182
{
183+
assert(timers_.size() == activeTimers_.size());
153184
std::vector<Entry> expired;
154-
Entry sentry = std::make_pair(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
155-
TimerList::iterator it = timers_.lower_bound(sentry);
156-
assert(it == timers_.end() || now < it->first);
157-
std::copy(timers_.begin(), it, back_inserter(expired));
158-
timers_.erase(timers_.begin(), it);
185+
Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
186+
TimerList::iterator end = timers_.lower_bound(sentry);
187+
assert(end == timers_.end() || now < end->first);
188+
std::copy(timers_.begin(), end, back_inserter(expired));
189+
timers_.erase(timers_.begin(), end);
159190

191+
for (std::vector<Entry>::iterator it = expired.begin();
192+
it != expired.end(); ++it)
193+
{
194+
ActiveTimer timer(it->second, it->second->sequence());
195+
size_t n = activeTimers_.erase(timer);
196+
assert(n == 1); (void)n;
197+
}
198+
199+
assert(timers_.size() == activeTimers_.size());
160200
return expired;
161201
}
162202

@@ -167,15 +207,17 @@ void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
167207
for (std::vector<Entry>::const_iterator it = expired.begin();
168208
it != expired.end(); ++it)
169209
{
170-
if (it->second->repeat())
210+
ActiveTimer timer(it->second, it->second->sequence());
211+
if (it->second->repeat()
212+
&& cancelingTimers_.find(timer) == cancelingTimers_.end())
171213
{
172214
it->second->restart(now);
173215
insert(it->second);
174216
}
175217
else
176218
{
177219
// FIXME move to a free list
178-
delete it->second;
220+
delete it->second; // FIXME: no delete please
179221
}
180222
}
181223

@@ -192,15 +234,27 @@ void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
192234

193235
bool TimerQueue::insert(Timer* timer)
194236
{
237+
loop_->assertInLoopThread();
238+
assert(timers_.size() == activeTimers_.size());
195239
bool earliestChanged = false;
196240
Timestamp when = timer->expiration();
197241
TimerList::iterator it = timers_.begin();
198242
if (it == timers_.end() || when < it->first)
199243
{
200244
earliestChanged = true;
201245
}
202-
std::pair<TimerList::iterator, bool> result = timers_.insert(std::make_pair(when, timer));
203-
assert(result.second); (void)result;
246+
{
247+
std::pair<TimerList::iterator, bool> result
248+
= timers_.insert(Entry(when, timer));
249+
assert(result.second); (void)result;
250+
}
251+
{
252+
std::pair<ActiveTimerSet::iterator, bool> result
253+
= activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
254+
assert(result.second); (void)result;
255+
}
256+
257+
assert(timers_.size() == activeTimers_.size());
204258
return earliestChanged;
205259
}
206260

muduo/net/TimerQueue.h

+9-1
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,18 @@ class TimerQueue : boost::noncopyable
4949
Timestamp when,
5050
double interval);
5151

52-
// void cancel(TimerId timerId);
52+
void cancel(TimerId timerId);
5353

5454
private:
5555

5656
// FIXME: use unique_ptr<Timer> instead of raw pointers.
5757
typedef std::pair<Timestamp, Timer*> Entry;
5858
typedef std::set<Entry> TimerList;
59+
typedef std::pair<Timer*, int64_t> ActiveTimer;
60+
typedef std::set<ActiveTimer> ActiveTimerSet;
5961

6062
void scheduleInLoop(Timer* timer);
63+
void cancelInLoop(TimerId timerId);
6164
// called when timerfd arms
6265
void handleRead();
6366
// move out all expired timers
@@ -71,6 +74,11 @@ class TimerQueue : boost::noncopyable
7174
Channel timerfdChannel_;
7275
// Timer list sorted by expiration
7376
TimerList timers_;
77+
78+
// for cancel()
79+
ActiveTimerSet activeTimers_;
80+
bool callingExpiredTimers_; /* atomic */
81+
ActiveTimerSet cancelingTimers_;
7482
};
7583

7684
}

muduo/net/tests/TimerQueue_unittest.cc

+11-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ void print(const char* msg)
2828
}
2929
}
3030

31+
void cancel(TimerId timer)
32+
{
33+
g_loop->cancel(timer);
34+
printf("cancelled at %s\n", Timestamp::now().toString().c_str());
35+
}
36+
3137
int main()
3238
{
3339
printTid();
@@ -41,8 +47,12 @@ int main()
4147
loop.runAfter(1.5, boost::bind(print, "once1.5"));
4248
loop.runAfter(2.5, boost::bind(print, "once2.5"));
4349
loop.runAfter(3.5, boost::bind(print, "once3.5"));
50+
TimerId t45 = loop.runAfter(4.5, boost::bind(print, "once4.5"));
51+
loop.runAfter(4.2, boost::bind(cancel, t45));
52+
loop.runAfter(4.8, boost::bind(cancel, t45));
4453
loop.runEvery(2, boost::bind(print, "every2"));
45-
loop.runEvery(3, boost::bind(print, "every3"));
54+
TimerId t3 = loop.runEvery(3, boost::bind(print, "every3"));
55+
loop.runAfter(9.001, boost::bind(cancel, t3));
4656

4757
loop.loop();
4858
print("main loop exits");

0 commit comments

Comments
 (0)