Skip to content

Commit

Permalink
fix: ack timeout in pulsar cpp client when subscribing to regex topic (
Browse files Browse the repository at this point in the history
…apache#3879)

* fix bug involving ack timeout in  pulsar cpp client when subscribing to regex topic

* remove newline

* fix indent

* add test

* addressing comments

* fix formatting

* fix formatting
  • Loading branch information
jerrypeng authored and merlimat committed Mar 24, 2019
1 parent dfc0e0e commit 16e3b12
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
}
messages_.push(msg);
if (messageListener_) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
listenerExecutor_->postWork(
std::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
}
Expand Down
50 changes: 49 additions & 1 deletion pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@
#include <lib/PatternMultiTopicsConsumerImpl.h>
#include "lib/Future.h"
#include "lib/Utils.h"

#include <functional>
#include <thread>
#include <chrono>

DECLARE_LOG_OBJECT()

Expand Down Expand Up @@ -2940,3 +2941,50 @@ TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) {
// consumer C should be a different instance from A and B and should be with open state.
ASSERT_EQ(ResultOk, consumerC.close());
}

static long regexTestMessagesReceived = 0;

static void regexMessageListenerFunction(Consumer consumer, const Message &msg) {
regexTestMessagesReceived++;
}

TEST(BasicEndToEndTest, testRegexTopicsWithMessageListener) {
ClientConfiguration config;
Client client(lookupUrl);
long unAckedMessagesTimeoutMs = 10000;
std::string subsName = "testRegexTopicsWithMessageListener-sub";
std::string pattern = "persistent://public/default/testRegexTopicsWithMessageListenerTopic-.*";
ConsumerConfiguration consumerConf;
consumerConf.setConsumerType(ConsumerShared);
consumerConf.setMessageListener(
std::bind(regexMessageListenerFunction, std::placeholders::_1, std::placeholders::_2));
consumerConf.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);

Producer producer;
ProducerConfiguration producerConf;
Result result = client.createProducer(
"persistent://public/default/testRegexTopicsWithMessageListenerTopic-1", producerConf, producer);
ASSERT_EQ(ResultOk, result);

Consumer consumer;
result = client.subscribeWithRegex(pattern, subsName, consumerConf, consumer);
ASSERT_EQ(ResultOk, result);
ASSERT_EQ(consumer.getSubscriptionName(), subsName);

for (int i = 0; i < 10; i++) {
Message msg = MessageBuilder().setContent("test-" + std::to_string(i)).build();
producer.sendAsync(msg, nullptr);
}

producer.flush();
long timeWaited = 0;
while (true) {
// maximum wait time
ASSERT_LE(timeWaited, unAckedMessagesTimeoutMs * 3);
if (regexTestMessagesReceived >= 10 * 2) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
timeWaited += 500;
}
}

0 comments on commit 16e3b12

Please sign in to comment.