forked from asavine/CompFinance
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ConcurrentQueue.h
101 lines (80 loc) · 1.65 KB
/
ConcurrentQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#pragma once
// Concurrent queue of chapter 3,
// Used in the thread pool
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
using namespace std;
// FIFO queue whose every operation takes an internal mutex, so it can be
// shared between producer and consumer threads.
//
// Consumers may either poll with tryPop() or block in pop() until an
// element arrives. interrupt() wakes every blocked consumer and makes
// pop() return false until resetInterrupt() is called.
template <class T>
class ConcurrentQueue
{
    std::queue<T> myQueue;
    // mutable so that const members such as empty() can still lock
    mutable std::mutex myMutex;
    std::condition_variable myCV;
    // Guarded by myMutex, like myQueue
    bool myInterrupt = false;

public:
    ConcurrentQueue() = default;

    // Raises the interrupt flag and notifies all waiters before the
    // members are destroyed
    ~ConcurrentQueue() { interrupt(); }

    // True when the queue holds no element at the time of the call
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(myMutex);
        return myQueue.empty();
    } // Unlock

    // Non-blocking pop: moves the front element into t and returns true,
    // or returns false and leaves t untouched when the queue is empty
    bool tryPop(T& t)
    {
        std::lock_guard<std::mutex> lk(myMutex);
        if (myQueue.empty()) return false;

        // Combine front/pop under a single lock
        t = std::move(myQueue.front());
        myQueue.pop();
        return true;
    } // Unlock

    // Appends t to the back of the queue and wakes one waiting consumer.
    // Pass t by value, or move with push(move(t))
    void push(T t)
    {
        {
            std::lock_guard<std::mutex> lk(myMutex);
            myQueue.push(std::move(t));
        } // Unlock before notification

        myCV.notify_one();
    }

    // Blocking pop: waits while the queue is empty and not interrupted.
    // Returns true with the front element moved into t, or false (t left
    // untouched) when the queue is interrupted. An interrupted queue
    // returns false even if elements remain.
    bool pop(T& t)
    {
        std::unique_lock<std::mutex> lk(myMutex);

        // The lock is released while waiting and re-acquired on wake-up;
        // the predicate is re-checked after every wake-up
        myCV.wait(lk, [this] { return myInterrupt || !myQueue.empty(); });

        if (myInterrupt) return false;

        // Combine front/pop under a single lock
        t = std::move(myQueue.front());
        myQueue.pop();
        return true;
    } // Unlock

    // Sets the interrupt flag and wakes all threads blocked in pop()
    void interrupt()
    {
        {
            std::lock_guard<std::mutex> lk(myMutex);
            myInterrupt = true;
        } // Unlock before notification

        myCV.notify_all();
    }

    // Clears the interrupt flag so that pop() blocks and delivers again
    void resetInterrupt()
    {
        // The flag is read under myMutex in pop(), so it must be written
        // under the same mutex to avoid a data race
        std::lock_guard<std::mutex> lk(myMutex);
        myInterrupt = false;
    }

    // Removes all elements from the queue
    void clear()
    {
        // Declared before the lock so it is destroyed after the lock is
        // released: the old elements are destructed outside the critical
        // section
        std::queue<T> discarded;

        // Other threads may be pushing or popping, hence the lock
        std::lock_guard<std::mutex> lk(myMutex);
        std::swap(myQueue, discarded);
    }
};