-
Notifications
You must be signed in to change notification settings - Fork 62
/
Copy pathfuturequeue.h
156 lines (138 loc) · 5.09 KB
/
futurequeue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2016-Present Couchbase, Inc.
*
* Use of this software is governed by the Business Source License included
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
* in that file, in accordance with the Business Source License, use of this
* software will be governed by the Apache License, Version 2.0, included in
* the file licenses/APL2.txt.
*/
/*
* The FutureQueue provides a std::priority_queue style interface
* onto a queue of ExTask objects that are sorted by each task's wakeTime.
* The lowest wakeTime (soonest) will be the top() task.
*
* FutureQueue provides methods that allow a task's wakeTime to be mutated
* whilst maintaining the priority ordering.
*/
#pragma once
#include <algorithm>
#include <chrono>
#include <deque>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <utility>
#include "globaltask.h"
template <class C = std::deque<ExTask>, class Compare = CompareByDueDate>
class FutureQueue {
public:
void push(ExTask task) {
std::lock_guard<std::mutex> lock(queueMutex);
queue.push(task);
}
void pop() {
std::lock_guard<std::mutex> lock(queueMutex);
queue.pop();
}
ExTask top() {
std::lock_guard<std::mutex> lock(queueMutex);
return queue.top();
}
size_t size() {
std::lock_guard<std::mutex> lock(queueMutex);
return queue.size();
}
bool empty() {
std::lock_guard<std::mutex> lock(queueMutex);
return queue.empty();
}
/*
* Update the wakeTime of task and ensure the heap property is
* maintained.
* @returns true if 'task' is in the FutureQueue.
*/
bool updateWaketime(const ExTask& task,
cb::time::steady_clock::time_point newTime) {
std::lock_guard<std::mutex> lock(queueMutex);
task->updateWaketime(newTime);
// After modifiying the task's wakeTime, rebuild the heap
return queue.heapify(task);
}
/*
* snooze the task (by altering its wakeTime) and ensure the
* heap property is maintained.
* @returns true if 'task' is in the FutureQueue.
*/
bool snooze(const ExTask& task, const double secs) {
std::lock_guard<std::mutex> lock(queueMutex);
task->snooze(secs);
// After modifiying the task's wakeTime, rebuild the heap
return queue.heapify(task);
}
/**
* Checks that the invariants of the future queue are valid.
* If not then throws std::logic_error.
*/
void assertInvariants() {
return queue.verifyHeapProperty();
}
protected:
/*
* HeapifiableQueue exposes a method to maintain the heap ordering
* of the underlying queue.
*
* This class is deliberately hidden inside FutureQueue so that any
* extensions made to priority_queue can't be accessed without work.
* I.e. correct locking and any need to 'heapify'.
*/
class HeapifiableQueue : public std::priority_queue<ExTask, C, Compare> {
public:
/*
* Ensure the heap property is maintained
* @returns true if 'task' is in the queue and heapify() did something.
*/
bool heapify(const ExTask& task) {
// if the task exists, rebuild
if (exists(task)) {
if (this->c.back()->getId() == task->getId()) {
std::push_heap(this->c.begin(), this->c.end(), this->comp);
} else {
std::make_heap(this->c.begin(), this->c.end(), this->comp);
}
return true;
} else {
return false;
}
}
void verifyHeapProperty() {
auto heap_end = std::is_heap_until(
this->c.begin(), this->c.end(), this->comp);
if (heap_end != this->c.end()) {
std::string msg;
msg += "FutureQueue::verifyHeapProperty() - heap invariant "
"broken. First non-heap is task:" +
(*heap_end)->getDescription() + " wake:" +
std::to_string(
to_ns_since_epoch((*heap_end)->getWaketime())
.count()) +
"\nAll items:\n";
for (auto& task : this->c) {
msg += "\t task:" + task->getDescription() + " wake:" +
std::to_string(to_ns_since_epoch(task->getWaketime())
.count()) +
"\n";
}
throw std::logic_error(msg);
}
}
protected:
bool exists(const ExTask& task) {
return std::find_if(this->c.begin(),
this->c.end(),
[&task](const ExTask& qTask) {
return task->getId() == qTask->getId();
}) != this->c.end();
}
} queue;
// All access to queue must be done with the queueMutex
std::mutex queueMutex;
};