Skip to content

Commit

Permalink
[Issue 8787][C++] Add reader internal subscription name setter. (apac…
Browse files Browse the repository at this point in the history
…he#8823)

Master Issue: apache#8787 

### Motivation

Currently, the reader subscription name can only be generated internally randomly in the C++ client.
Java client part is at apache#8801 

### Modifications

Add a setter for the reader's internal subscription name.

### Verifying this change

This change is already covered by existing tests, such as *testSubscriptionNameSetting*, *testSetSubscriptionNameAndPrefix* and *testMultiSameSubscriptionNameReaderShouldFail*.
  • Loading branch information
RobertIndie authored Dec 8, 2020
1 parent 652dc1e commit 408f9e6
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 3 deletions.
7 changes: 7 additions & 0 deletions pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ class PULSAR_PUBLIC ReaderConfiguration {
void setReadCompacted(bool compacted);
bool isReadCompacted() const;

/**
* Set the internal subscription name.
* @param internal subscriptionName
*/
void setInternalSubscriptionName(std::string internalSubscriptionName);
const std::string& getInternalSubscriptionName() const;

private:
std::shared_ptr<ReaderConfigurationImpl> impl_;
};
Expand Down
9 changes: 9 additions & 0 deletions pulsar-client-cpp/lib/ReaderConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,13 @@ void ReaderConfiguration::setSubscriptionRolePrefix(const std::string& subscript
bool ReaderConfiguration::isReadCompacted() const { return impl_->readCompacted; }

void ReaderConfiguration::setReadCompacted(bool compacted) { impl_->readCompacted = compacted; }

void ReaderConfiguration::setInternalSubscriptionName(std::string internalSubscriptionName) {
impl_->internalSubscriptionName = internalSubscriptionName;
}

const std::string& ReaderConfiguration::getInternalSubscriptionName() const {
return impl_->internalSubscriptionName;
}

} // namespace pulsar
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ReaderConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct ReaderConfigurationImpl {
std::string readerName;
std::string subscriptionRolePrefix;
bool readCompacted;
std::string internalSubscriptionName;
ReaderConfigurationImpl()
: schemaInfo(),
hasReaderListener(false),
Expand Down
11 changes: 8 additions & 3 deletions pulsar-client-cpp/lib/ReaderImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,14 @@ void ReaderImpl::start(const MessageId& startMessageId) {
std::placeholders::_1, std::placeholders::_2));
}

std::string subscription = "reader-" + generateRandomName();
if (!readerConf_.getSubscriptionRolePrefix().empty()) {
subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
std::string subscription;
if (!readerConf_.getInternalSubscriptionName().empty()) {
subscription = readerConf_.getInternalSubscriptionName();
} else {
subscription = "reader-" + generateRandomName();
if (!readerConf_.getSubscriptionRolePrefix().empty()) {
subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
}
}

consumer_ = std::make_shared<ConsumerImpl>(
Expand Down
57 changes: 57 additions & 0 deletions pulsar-client-cpp/tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -506,3 +506,60 @@ TEST(ReaderTest, testPartitionIndex) {

client.close();
}

TEST(ReaderTest, testSubscriptionNameSetting) {
Client client(serviceUrl);

std::string topicName = "persistent://public/default/test-subscription-name-setting";
std::string subName = "test-sub";

ReaderConfiguration readerConf;
readerConf.setInternalSubscriptionName(subName);
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());

reader.close();
client.close();
}

TEST(ReaderTest, testSetSubscriptionNameAndPrefix) {
Client client(serviceUrl);

std::string topicName = "persistent://public/default/testSetSubscriptionNameAndPrefix";
std::string subName = "test-sub";

ReaderConfiguration readerConf;
readerConf.setInternalSubscriptionName(subName);
readerConf.setSubscriptionRolePrefix("my-prefix");
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());

reader.close();
client.close();
}

TEST(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
Client client(serviceUrl);

std::string topicName = "persistent://public/default/testMultiSameSubscriptionNameReaderShouldFail";
std::string subscriptionName = "test-sub";

ReaderConfiguration readerConf1;
readerConf1.setInternalSubscriptionName(subscriptionName);
Reader reader1;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf1, reader1));

ReaderConfiguration readerConf2;
readerConf2.setInternalSubscriptionName(subscriptionName);
Reader reader2;
ASSERT_EQ(ResultConsumerBusy,
client.createReader(topicName, MessageId::earliest(), readerConf2, reader2));

reader1.close();
reader2.close();
client.close();
}

0 comments on commit 408f9e6

Please sign in to comment.