-
Notifications
You must be signed in to change notification settings - Fork 215
/
Copy pathroundrobin_poll_test.cpp
141 lines (120 loc) · 4.57 KB
/
roundrobin_poll_test.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iterator>
#include <memory>
#include <mutex>
#include <set>
#include <stdexcept>
#include <thread>
#include <vector>
#include <catch.hpp>
#include "cppkafka/cppkafka.h"
#include "test_utils.h"
using std::vector;
using std::move;
using std::string;
using std::exception;
using std::thread;
using std::set;
using std::mutex;
using std::tie;
using std::condition_variable;
using std::lock_guard;
using std::unique_lock;
using std::unique_ptr;
using std::make_move_iterator;
using std::chrono::seconds;
using std::chrono::milliseconds;
using std::chrono::system_clock;
using namespace cppkafka;
#define ENABLE_STRICT_RR_ORDER 0
//==================================================================================
// Helper functions
//==================================================================================
// Builds the producer configuration for this test: the broker list is taken
// from KAFKA_TEST_INSTANCE and "max.in.flight" is set to 1.
static Configuration make_producer_config() {
    return {
        { "metadata.broker.list", KAFKA_TEST_INSTANCE },
        { "max.in.flight", 1 }
    };
}
// Builds the consumer configuration for this test: the broker list is taken
// from KAFKA_TEST_INSTANCE, auto commit is disabled, and the consumer joins
// the given group. When no group id is supplied, make_consumer_group_id()
// provides one.
static Configuration make_consumer_config(const string& group_id = make_consumer_group_id()) {
    return {
        { "metadata.broker.list", KAFKA_TEST_INSTANCE },
        { "enable.auto.commit", false },
        { "group.id", group_id },
    };
}
#if ENABLE_STRICT_RR_ORDER
// Builds the partition sequence 0, 1, ..., KAFKA_NUM_PARTITIONS-1, 0, 1, ...
// used as the expected polling order. The returned vector holds
// total_messages + 1 entries.
static vector<int> make_roundrobin_partition_vector(int total_messages) {
    vector<int> expected_partitions;
    const int entry_count = total_messages + 1;
    for (int position = 0; position < entry_count; ++position) {
        // The sequence wraps back to partition 0 after the last partition
        expected_partitions.push_back(position % KAFKA_NUM_PARTITIONS);
    }
    return expected_partitions;
}
#endif
//========================================================================
// TESTS
//========================================================================
// Checks that a consumer polling through RoundRobinPollStrategy receives every
// message produced to each partition of KAFKA_TOPICS[0] with the expected
// payload. When ENABLE_STRICT_RR_ORDER is enabled, it also checks the order in
// which partitions are polled, and that polling still delivers all messages
// after the strategy has been deleted.
TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
    const int messages_per_partition = 3;
    const int total_messages = KAFKA_NUM_PARTITIONS * messages_per_partition;
    // Converted once so the size checks below compare unsigned with unsigned
    const std::size_t expected_count = static_cast<std::size_t>(total_messages);

    // Create a consumer and subscribe to the topic
    PollStrategyAdapter consumer(make_consumer_config());
    consumer.subscribe({ KAFKA_TOPICS[0] });
    consumer.add_polling_strategy(unique_ptr<PollInterface>(new RoundRobinPollStrategy(consumer)));

    PollConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);

    // Produce messages so that the consumer runner can stop
    BufferedProducer<string> producer(make_producer_config());
    string payload = "RoundRobin";

    // Push messages_per_partition messages into each partition
    for (int i = 0; i < total_messages; ++i) {
        producer.sync_produce(MessageBuilder(KAFKA_TOPICS[0])
                                  .partition(i % KAFKA_NUM_PARTITIONS)
                                  .payload(payload));
    }
    producer.flush();
    runner.try_join();

    // Check that we have all messages
    const auto& messages = runner.get_messages();
    REQUIRE(messages.size() == expected_count);

#if ENABLE_STRICT_RR_ORDER
    // Check that we have one message from each partition in desired order.
    // The expected sequence is built longer than total_messages because the
    // lookup below is offset by the first polled partition.
    const vector<int> partition_order = make_roundrobin_partition_vector(total_messages+KAFKA_NUM_PARTITIONS);
    int partition_idx = 0;
    for (int i = 0; i < total_messages; ++i) {
        if (i == 0) {
            // find first polled partition index
            partition_idx = messages[i].get_partition();
        }
        CHECK(messages[i].get_partition() == partition_order[i+partition_idx]);
        REQUIRE(static_cast<string>(messages[i].get_payload()) == payload);
    }

    //============ resume original poll strategy =============//
    // Validate that once the round robin strategy is deleted, normal poll works as before
    consumer.delete_polling_strategy();

    ConsumerRunner serial_runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);

    payload = "SerialPolling";
    // Push messages_per_partition messages into each partition
    for (int i = 0; i < total_messages; ++i) {
        producer.sync_produce(MessageBuilder(KAFKA_TOPICS[0]).partition(i%KAFKA_NUM_PARTITIONS).payload(payload));
    }
    producer.flush();
    serial_runner.try_join();

    // Check that we have all messages
    const auto& serial_messages = serial_runner.get_messages();
    REQUIRE(serial_messages.size() == expected_count);

    for (const auto& message : serial_messages) {
        REQUIRE(static_cast<string>(message.get_payload()) == payload);
    }
#else
    // Simple payload check
    for (const auto& message : messages) {
        REQUIRE(static_cast<string>(message.get_payload()) == payload);
    }
#endif
}