Skip to content

Commit

Permalink
[C++] Fix Consumer send redeliverMessages repeatedly (apache#9072)
Browse files Browse the repository at this point in the history
Fixes apache#9028

### Motivation
Both PartitionedConsumerImpl and ConsumerImpl have member variable unAckedMessageTrackerPtr_ (class UnAckedMessageTrackerEnabled), and PartitionedConsumerImpl is composed of ConsumerImpl. If the acknowledgement times out, they will send redeliverMessages repeatedly, MultiTopicsConsumerImpl has same problem.

### Modifications

- add `hasParent_` field to whether there is a parent in ConsumerImpl
- add unit test verify the redelivery request won't be sent repeatedly
- add GTest header `gtest/gtest_prod.h` to access private members in unit tests
- fix typo
  • Loading branch information
saosir authored Jan 6, 2021
1 parent 984bf83 commit 9894b99
Show file tree
Hide file tree
Showing 15 changed files with 310 additions and 25 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>logs/**</exclude>
<exclude>**/circe/**</exclude>
<exclude>pulsar-broker/src/test/resources/authentication/basic/.htpasswd</exclude>
<exclude>pulsar-client-cpp/include/gtest/gtest_prod.h</exclude>
<exclude>pulsar-client-cpp/lib/checksum/int_types.h</exclude>
<exclude>pulsar-client-cpp/lib/checksum/gf2.hpp</exclude>
<exclude>pulsar-client-cpp/lib/checksum/crc32c_sse42.cc</exclude>
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ lib*.so*
.settings/
.pydevproject
.idea/
.vs/
*.cbp
*.ninja*
.clangd/
Expand Down
60 changes: 60 additions & 0 deletions pulsar-client-cpp/include/gtest/gtest_prod.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2006, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//
// Google C++ Testing and Mocking Framework definitions useful in production code.
// GOOGLETEST_CM0003 DO NOT DELETE

#ifndef GTEST_INCLUDE_GTEST_GTEST_PROD_H_
#define GTEST_INCLUDE_GTEST_GTEST_PROD_H_

// When you need to test the private or protected members of a class,
// use the FRIEND_TEST macro to declare your tests as friends of the
// class. For example:
//
// class MyClass {
// private:
// void PrivateMethod();
// FRIEND_TEST(MyClassTest, PrivateMethodWorks);
// };
//
// class MyClassTest : public testing::Test {
// // ...
// };
//
// TEST_F(MyClassTest, PrivateMethodWorks) {
// // Can call MyClass::PrivateMethod() here.
// }
//
// Note: The test class must be in the same namespace as the class being tested.
// For example, putting MyClassTest in an anonymous namespace will not work.

#define FRIEND_TEST(test_case_name, test_name) friend class test_case_name##_##test_name##_Test

#endif // GTEST_INCLUDE_GTEST_GTEST_PROD_H_
15 changes: 11 additions & 4 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ DECLARE_LOG_OBJECT()
ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
const std::string& subscriptionName, const ConsumerConfiguration& conf,
const ExecutorServicePtr listenerExecutor /* = NULL by default */,
bool hasParent /* = false by default */,
const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
: HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))),
Expand All @@ -46,6 +47,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
subscription_(subscriptionName),
originalSubscriptionName_(subscriptionName),
messageListener_(config_.getMessageListener()),
hasParent_(hasParent),
consumerTopicType_(consumerTopicType),
subscriptionMode_(subscriptionMode),
startMessageId_(startMessageId),
Expand Down Expand Up @@ -563,7 +565,6 @@ void ConsumerImpl::internalListener() {
// This will only happen when the connection got reset and we cleared the queue
return;
}
unAckedMessageTrackerPtr_->add(msg.getMessageId());
try {
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
Expand Down Expand Up @@ -638,7 +639,6 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
lock.unlock();
messageProcessed(msg);
unAckedMessageTrackerPtr_->add(msg.getMessageId());
callback(ResultOk, msg);
} else {
pendingReceives_.push(callback);
Expand Down Expand Up @@ -672,7 +672,6 @@ Result ConsumerImpl::receiveHelper(Message& msg) {

incomingMessages_.pop(msg);
messageProcessed(msg);
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
}

Expand Down Expand Up @@ -702,7 +701,6 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {

if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) {
messageProcessed(msg);
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
return ResultTimeout;
Expand All @@ -720,6 +718,7 @@ void ConsumerImpl::messageProcessed(Message& msg) {
}

increaseAvailablePermits(currentCnx);
trackMessage(msg);
}

/**
Expand Down Expand Up @@ -1232,4 +1231,12 @@ void ConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
negativeAcksTracker_.setEnabledForTesting(enabled);
}

void ConsumerImpl::trackMessage(const Message& msg) {
if (hasParent_) {
unAckedMessageTrackerPtr_->remove(msg.getMessageId());
} else {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
}
}

} /* namespace pulsar */
9 changes: 7 additions & 2 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ConsumerImpl : public ConsumerImplBase,
public:
ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration&,
const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(),
const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(), bool hasParent = false,
const ConsumerTopicType consumerTopicType = NonPartitioned,
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
Optional<MessageId> startMessageId = Optional<MessageId>::empty());
Expand Down Expand Up @@ -166,6 +166,7 @@ class ConsumerImpl : public ConsumerImplBase,
void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
void failPendingReceiveCallback();
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
void trackMessage(const Message& msg);

Optional<MessageId> clearReceiveQueue();

Expand All @@ -175,6 +176,7 @@ class ConsumerImpl : public ConsumerImplBase,
std::string originalSubscriptionName_;
MessageListener messageListener_;
ExecutorServicePtr listenerExecutor_;
bool hasParent_;
ConsumerTopicType consumerTopicType_;

Commands::SubscriptionMode subscriptionMode_;
Expand All @@ -192,7 +194,7 @@ class ConsumerImpl : public ConsumerImplBase,
bool messageListenerRunning_;
std::mutex messageListenerMutex_;
CompressionCodecProvider compressionCodecProvider_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
NegativeAcksTracker negativeAcksTracker_;
Expand All @@ -218,6 +220,9 @@ class ConsumerImpl : public ConsumerImplBase,
// these two declared friend to access setNegativeAcknowledgeEnabledForTesting
friend class MultiTopicsConsumerImpl;
friend class PartitionedConsumerImpl;

FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
};

} /* namespace pulsar */
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
if (numPartitions == 0) {
// We don't have to add partition-n suffix
consumer = std::make_shared<ConsumerImpl>(client_, topicName->toString(), subscriptionName_, config,
internalListenerExecutor, NonPartitioned);
internalListenerExecutor, true, NonPartitioned);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
Expand All @@ -193,7 +193,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
for (int i = 0; i < numPartitions; i++) {
std::string topicPartitionName = topicName->getTopicPartitionName(i);
consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
internalListenerExecutor, Partitioned);
internalListenerExecutor, true, Partitioned);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
Expand Down
6 changes: 5 additions & 1 deletion pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#ifndef PULSAR_MULTI_TOPICS_CONSUMER_HEADER
#define PULSAR_MULTI_TOPICS_CONSUMER_HEADER
#include "gtest/gtest_prod.h"
#include "ConsumerImpl.h"
#include "ClientImpl.h"
#include "BlockingQueue.h"
Expand Down Expand Up @@ -103,7 +104,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
ExecutorServicePtr listenerExecutor_;
MessageListener messageListener_;
Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
const std::vector<std::string>& topics_;
std::queue<ReceiveCallback> pendingReceives_;

Expand Down Expand Up @@ -137,7 +138,10 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,

private:
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);

FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
};

typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
} // namespace pulsar
#endif // PULSAR_MULTI_TOPICS_CONSUMER_HEADER
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ ConsumerImplPtr PartitionedConsumerImpl::newInternalConsumer(unsigned int partit

std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
auto consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
internalListenerExecutor_, Partitioned);
internalListenerExecutor_, true, Partitioned);

const auto shared_this = const_cast<PartitionedConsumerImpl*>(this)->shared_from_this();
consumer->getConsumerCreatedFuture().addListener(std::bind(
Expand Down
7 changes: 6 additions & 1 deletion pulsar-client-cpp/lib/PartitionedConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#ifndef PULSAR_PARTITIONED_CONSUMER_HEADER
#define PULSAR_PARTITIONED_CONSUMER_HEADER
#include "gtest/gtest_prod.h"
#include "ConsumerImpl.h"
#include "ClientImpl.h"
#include <vector>
Expand Down Expand Up @@ -118,11 +119,15 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
void failPendingReceiveCallback();
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
Promise<Result, ConsumerImplBaseWeakPtr> partitionedConsumerCreatedPromise_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
std::queue<ReceiveCallback> pendingReceives_;
void runPartitionUpdateTask();
void getPartitionMetadata();
void handleGetPartitions(const Result result, const LookupDataResultPtr& lookupDataResult);

friend class PulsarFriend;

FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
};
typedef std::weak_ptr<PartitionedConsumerImpl> PartitionedConsumerImplWeakPtr;
typedef std::shared_ptr<PartitionedConsumerImpl> PartitionedConsumerImplPtr;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ReaderImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void ReaderImpl::start(const MessageId& startMessageId) {
}

consumer_ = std::make_shared<ConsumerImpl>(
client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), NonPartitioned,
client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), false, NonPartitioned,
Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId));
consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
consumer_->getConsumerCreatedFuture().addListener(std::bind(&ReaderImpl::handleConsumerCreated,
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#ifndef LIB_UNACKEDMESSAGETRACKERENABLED_H_
#define LIB_UNACKEDMESSAGETRACKERENABLED_H_
#include "gtest/gtest_prod.h"
#include "lib/UnAckedMessageTrackerInterface.h"

#include <mutex>
Expand Down Expand Up @@ -48,6 +49,9 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
ClientImplPtr client_;
long timeoutMs_;
long tickDurationInMs_;

FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
};
} // namespace pulsar

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ class UnAckedMessageTrackerInterface {
virtual void removeTopicMessage(const std::string& topic) = 0;
};

typedef std::unique_ptr<UnAckedMessageTrackerInterface> UnAckedMessageTrackerScopedPtr;
using UnAckedMessageTrackerPtr = std::shared_ptr<UnAckedMessageTrackerInterface>;
} // namespace pulsar
#endif /* LIB_UNACKEDMESSAGETRACKERINTERFACE_H_ */
5 changes: 3 additions & 2 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3825,7 +3825,7 @@ class UnAckedMessageTrackerEnabledMock : public UnAckedMessageTrackerEnabled {
long size() { return UnAckedMessageTrackerEnabled::size(); }
}; // class UnAckedMessageTrackerEnabledMock

TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) {
TEST(BasicEndToEndTest, testUnAckedMessageTrackerDefaultBehavior) {
ConsumerConfiguration configConsumer;
ASSERT_EQ(configConsumer.getUnAckedMessagesTimeoutMs(), 0);
ASSERT_EQ(configConsumer.getTickDurationInMs(), 1000);
Expand Down Expand Up @@ -4002,7 +4002,7 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
ASSERT_EQ(numMsg - (numMsg / 2 + 1), tracker->size());
ASSERT_FALSE(tracker->isEmpty());

std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(4));
ASSERT_EQ(0, tracker->size());
ASSERT_TRUE(tracker->isEmpty());
consumer.close();
Expand All @@ -4012,6 +4012,7 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
ASSERT_EQ(ResultOk, consumer.acknowledge(msg.getMessageId()));
}
Message msg;
auto ret = consumer.receive(msg, 1000);
Expand Down
Loading

0 comments on commit 9894b99

Please sign in to comment.