-
Notifications
You must be signed in to change notification settings - Fork 215
/
Copy pathtest_utils_impl.h
102 lines (92 loc) · 3.14 KB
/
test_utils_impl.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#include <mutex>
#include <chrono>
#include <condition_variable>
#include "cppkafka/utils/consumer_dispatcher.h"
using std::vector;
using std::move;
using std::thread;
using std::mutex;
using std::lock_guard;
using std::unique_lock;
using std::condition_variable;
using std::chrono::system_clock;
using std::chrono::milliseconds;
using std::chrono::seconds;
using cppkafka::Consumer;
using cppkafka::BasicConsumerDispatcher;
using cppkafka::Message;
using cppkafka::TopicPartition;
//==================================================================================
// BasicConsumerRunner
//==================================================================================
// Starts a worker thread that drives a BasicConsumerDispatcher over `consumer`
// and blocks the calling thread until the worker reports that it has "booted".
//
// Worker behaviour:
//  * The consumer timeout is set to 500ms.
//  * Messages are appended to messages_ only once `partitions` EOF events have
//    been counted; messages dispatched before that are dropped.
//  * After every dispatcher event the dispatcher is stopped if
//      - expected > 0 and messages_.size() == expected, or
//      - expected == 0 and at least `partitions` EOFs were counted, or
//      - 20 seconds have elapsed since the worker started.
//
// The constructor returns as soon as the EOF count reaches `partitions`
// (immediately when `partitions` is 0), or once the dispatcher has stopped
// without ever reaching that count.
//
// Parameters:
//  consumer   - consumer to dispatch on; used to initialise consumer_.
//  expected   - number of messages to collect before stopping; 0 means "stop
//               once the EOF count is reached".
//  partitions - number of EOF events to count before messages are collected.
//
// NOTE(review): messages_ is appended to from the worker thread with no lock
// visible here; presumably callers only read it after try_join() - confirm.
template <typename ConsumerType>
BasicConsumerRunner<ConsumerType>::BasicConsumerRunner(ConsumerType& consumer,
                                                       size_t expected,
                                                       size_t partitions)
: consumer_(consumer) {
    // Hand-shake state shared with the worker. These live on this constructor's
    // stack, so the worker must not touch them once it has signalled: after the
    // signal the constructor is free to return and destroy them.
    bool booted = false;
    mutex mtx;
    condition_variable cond;
    thread_ = thread([&, expected, partitions]() {
        consumer_.set_timeout(milliseconds(500));
        size_t number_eofs = 0;
        // Worker-local record of whether the boot signal was already sent. It is
        // what keeps the worker from accessing booted/mtx/cond a second time.
        bool boot_signalled = false;
        // Monotonic clock: the 20s deadline is an elapsed-time measurement and
        // must not be affected by wall-clock adjustments.
        const auto start = std::chrono::steady_clock::now();

        // Wakes the constructor at most once. notify_one() is issued while the
        // mutex is held so the waiting constructor cannot re-acquire the lock,
        // return and destroy `cond` before the notification has completed.
        const auto signal_booted = [&]() {
            if (boot_signalled) {
                return;
            }
            boot_signalled = true;
            lock_guard<mutex> guard(mtx);
            booted = true;
            cond.notify_one();
        };

        // With zero partitions there is no EOF to wait for: the EOF callback
        // below would never signal, so report readiness right away.
        if (number_eofs == partitions) {
            signal_booted();
        }

        BasicConsumerDispatcher<ConsumerType> dispatcher(consumer_);
        dispatcher.run(
            // Message callback
            [&](Message msg) {
                // Only keep messages received after all EOFs were counted
                if (number_eofs == partitions) {
                    messages_.push_back(move(msg));
                }
            },
            // EOF callback
            [&](typename BasicConsumerDispatcher<ConsumerType>::EndOfFile,
                const TopicPartition&) {
                if (number_eofs != partitions) {
                    number_eofs++;
                    if (number_eofs == partitions) {
                        signal_booted();
                    }
                }
            },
            // Every time there's any event callback
            [&](typename BasicConsumerDispatcher<ConsumerType>::Event) {
                if (expected > 0 && messages_.size() == expected) {
                    dispatcher.stop();
                }
                if (expected == 0 && number_eofs >= partitions) {
                    dispatcher.stop();
                }
                if (std::chrono::steady_clock::now() - start >= seconds(20)) {
                    dispatcher.stop();
                }
            }
        );
        // dispatcher has stopped: if the EOF count was never reached (e.g. the
        // 20s deadline hit first) release the constructor anyway. This is a
        // no-op when the signal was already sent.
        signal_booted();
    });
    unique_lock<mutex> lock(mtx);
    cond.wait(lock, [&booted]() { return booted; });
}
// Destructor: delegates to try_join(), which joins the worker thread when it
// is still joinable.
template <typename ConsumerType>
BasicConsumerRunner<ConsumerType>::~BasicConsumerRunner() {
    this->try_join();
}
// Returns a read-only reference to messages_, the container the message
// callback appends to. No copy is made.
template <typename ConsumerType>
const std::vector<Message>& BasicConsumerRunner<ConsumerType>::get_messages() const {
    return this->messages_;
}
// Joins thread_ if it is joinable; otherwise does nothing, so repeated calls
// are harmless.
template <typename ConsumerType>
void BasicConsumerRunner<ConsumerType>::try_join() {
    if (!thread_.joinable()) {
        return;
    }
    thread_.join();
}