Skip to content

Commit

Permalink
[C++] Fix cpp client do AcknowledgeCumulative not clean up previous m…
Browse files Browse the repository at this point in the history
…essage (apache#8606)

### Motivation
pulsar-client-cpp Consumer do AcknowledgeCumulative just clean up `msgId`, not <= `msgId` in  `UnAckedMessageTrackerEnabled::removeMessagesTill`

### Modifications

- When do AcknowledgeCumulative from application, earse <= `msgId` in UnAckedMessageTrackerEnabled, avoid redeliver unnecessary unacknowledged messages to Broker
- add unit test for `UnAckedMessageTrackerEnabled`
  • Loading branch information
saosir authored Dec 30, 2020
1 parent 92fc224 commit e75de48
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 13 deletions.
24 changes: 12 additions & 12 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(m);
if (exist != messageIdPartitionMap.end()) {
removed = exist->second.erase(m);
messageIdPartitionMap.erase(exist);
}
return removed;
}
Expand All @@ -124,28 +125,27 @@ long UnAckedMessageTrackerEnabled::size() {

void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
std::lock_guard<std::mutex> acquire(lock_);
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end();) {
MessageId msgIdInMap = it->first;
if (msgIdInMap < msgId) {
std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(msgId);
if (exist != messageIdPartitionMap.end()) {
exist->second.erase(msgId);
}
if (msgIdInMap <= msgId) {
it->second.erase(msgIdInMap);
messageIdPartitionMap.erase(it++);
} else {
it++;
}
}
}

// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message.
void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) {
std::lock_guard<std::mutex> acquire(lock_);
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end();) {
MessageId msgIdInMap = it->first;
if (msgIdInMap.getTopicName().compare(topic) == 0) {
std::map<MessageId, std::set<MessageId>&>::iterator exist =
messageIdPartitionMap.find(msgIdInMap);
if (exist != messageIdPartitionMap.end()) {
exist->second.erase(msgIdInMap);
}
it->second.erase(msgIdInMap);
messageIdPartitionMap.erase(it++);
} else {
it++;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {

void clear();

private:
protected:
void timeoutHandlerHelper();
bool isEmpty();
long size();
Expand Down
205 changes: 205 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3828,3 +3828,208 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
}

class UnAckedMessageTrackerEnabledMock : public UnAckedMessageTrackerEnabled {
public:
UnAckedMessageTrackerEnabledMock(long timeoutMs, const ClientImplPtr client, ConsumerImplBase &consumer)
: UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer) {}
const long getUnAckedMessagesTimeoutMs() { return this->timeoutMs_; }
const long getTickDurationInMs() { return this->tickDurationInMs_; }
bool isEmpty() { return UnAckedMessageTrackerEnabled::isEmpty(); }
long size() { return UnAckedMessageTrackerEnabled::size(); }
}; // class UnAckedMessageTrackerEnabledMock

TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) {
ConsumerConfiguration configConsumer;
ASSERT_EQ(configConsumer.getUnAckedMessagesTimeoutMs(), 0);
ASSERT_EQ(configConsumer.getTickDurationInMs(), 1000);

UnAckedMessageTrackerDisabled tracker;
Message msg;
ASSERT_FALSE(tracker.add(msg.getMessageId()));
ASSERT_FALSE(tracker.remove(msg.getMessageId()));
}

TEST(BasicEndToEndTest, testUnAckedMessageTrackerDisabled) {
constexpr auto numMsg = 10;
const std::string topicName =
"testUnAckedMessageTrackerDisabledIndividualAck" + std::to_string(time(nullptr));
const std::string subName = "sub-un-acked-msg-disabled-ind-ack";

// Setup client, producer and consumer.
Client client(lookupUrl);

Producer producer;
ProducerConfiguration configProducer;
configProducer.setBatchingEnabled(false);
ASSERT_EQ(ResultOk, client.createProducer(topicName, configProducer, producer));

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

UnAckedMessageTrackerDisabled tracker;
for (auto count = 0; count < numMsg; ++count) {
Message msg;

ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
ASSERT_FALSE(tracker.add(msg.getMessageId()));

consumer.acknowledge(msg.getMessageId());
ASSERT_FALSE(tracker.remove(msg.getMessageId()));
}
consumer.close();

std::this_thread::sleep_for(std::chrono::seconds(1));
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " << msg.getDataAsString();
consumer.close();
client.close();
}

TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledIndividualAck) {
constexpr auto numMsg = 10;
constexpr auto unAckedMessagesTimeoutMs = 1000;
const std::string topicName =
"testUnAckedMessageTrackerEnabledIndividualAck" + std::to_string(time(nullptr));
const std::string subName = "sub-un-acked-msg-enabled-ind-ack";

// Setup client, producer and consumer.
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}

auto tracker0 = std::make_shared<UnAckedMessageTrackerEnabledMock>(unAckedMessagesTimeoutMs,
clientImplPtr, consumerImpl0);
ASSERT_EQ(tracker0->getUnAckedMessagesTimeoutMs(), unAckedMessagesTimeoutMs);
ASSERT_EQ(tracker0->getTickDurationInMs(), unAckedMessagesTimeoutMs);

for (auto idx = 0; idx < numMsg; ++idx) {
ASSERT_TRUE(tracker0->add(recvMsgId[idx]));
}
ASSERT_EQ(numMsg, tracker0->size());
ASSERT_FALSE(tracker0->isEmpty());

std::this_thread::sleep_for(std::chrono::seconds(4));
ASSERT_EQ(0, tracker0->size());
ASSERT_TRUE(tracker0->isEmpty());
consumer.close();

ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl1 = PulsarFriend::getConsumerImpl(consumer);
std::set<MessageId> restMsgId(recvMsgId.begin(), recvMsgId.end());
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
ASSERT_EQ(restMsgId.count(msg.getMessageId()), 1);
ASSERT_EQ(ResultOk, consumer.acknowledge(msg));
}

auto tracker1 = std::make_shared<UnAckedMessageTrackerEnabledMock>(unAckedMessagesTimeoutMs,
clientImplPtr, consumerImpl1);
for (auto idx = 0; idx < numMsg; ++idx) {
ASSERT_TRUE(tracker1->add(recvMsgId[idx]));
ASSERT_TRUE(tracker1->remove(recvMsgId[idx]));
}
ASSERT_EQ(0, tracker1->size());
ASSERT_TRUE(tracker1->isEmpty());
consumer.close();

std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
consumer.close();
client.close();
}

TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
constexpr auto numMsg = 10;
constexpr auto unAckedMessagesTimeoutMs = 1000;
const std::string topicName =
"testUnAckedMessageTrackerEnabledCumulativeAck" + std::to_string(time(nullptr));
const std::string subName = "sub-un-acked-msg-enabled-cum-ack";

// Setup client, producer and consumer.
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}
auto tracker = std::make_shared<UnAckedMessageTrackerEnabledMock>(unAckedMessagesTimeoutMs, clientImplPtr,
consumerImpl0);
for (auto idx = 0; idx < numMsg; ++idx) {
ASSERT_TRUE(tracker->add(recvMsgId[idx]));
}
ASSERT_EQ(numMsg, tracker->size());
ASSERT_FALSE(tracker->isEmpty());

std::sort(recvMsgId.begin(), recvMsgId.end());

auto targetMsgId = recvMsgId[numMsg / 2];
ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(targetMsgId));
tracker->removeMessagesTill(targetMsgId);
ASSERT_EQ(numMsg - (numMsg / 2 + 1), tracker->size());
ASSERT_FALSE(tracker->isEmpty());

std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_EQ(0, tracker->size());
ASSERT_TRUE(tracker->isEmpty());
consumer.close();

std::this_thread::sleep_for(std::chrono::seconds(1));
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
}
Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
consumer.close();
client.close();
}

0 comments on commit e75de48

Please sign in to comment.