Skip to content

Commit

Permalink
[C++] Correct API subscribe*'s argument name (apache#9037)
Browse files Browse the repository at this point in the history
  • Loading branch information
saosir authored Dec 23, 2020
1 parent d3e39a7 commit 99926b4
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 37 deletions.
16 changes: 8 additions & 8 deletions pulsar-client-cpp/include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ class PULSAR_PUBLIC Client {
void createProducerAsync(const std::string& topic, ProducerConfiguration conf,
CreateProducerCallback callback);

Result subscribe(const std::string& topic, const std::string& consumerName, Consumer& consumer);
Result subscribe(const std::string& topic, const std::string& consumerName,
Result subscribe(const std::string& topic, const std::string& subscriptionName, Consumer& consumer);
Result subscribe(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, Consumer& consumer);

void subscribeAsync(const std::string& topic, const std::string& consumerName,
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
SubscribeCallback callback);
void subscribeAsync(const std::string& topic, const std::string& consumerName,
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback);

/**
Expand All @@ -115,14 +115,14 @@ class PULSAR_PUBLIC Client {
/**
* subscribe for multiple topics, which match given regexPattern, under the same namespace.
*/
Result subscribeWithRegex(const std::string& regexPattern, const std::string& consumerName,
Result subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName,
Consumer& consumer);
Result subscribeWithRegex(const std::string& regexPattern, const std::string& consumerName,
Result subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName,
const ConsumerConfiguration& conf, Consumer& consumer);

void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName,
void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
SubscribeCallback callback);
void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName,
void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback);

/**
Expand Down
16 changes: 8 additions & 8 deletions pulsar-client-cpp/lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,28 @@ void Client::createProducerAsync(const std::string& topic, ProducerConfiguration
impl_->createProducerAsync(topic, conf, callback);
}

Result Client::subscribe(const std::string& topic, const std::string& consumerName, Consumer& consumer) {
return subscribe(topic, consumerName, ConsumerConfiguration(), consumer);
Result Client::subscribe(const std::string& topic, const std::string& subscriptionName, Consumer& consumer) {
return subscribe(topic, subscriptionName, ConsumerConfiguration(), consumer);
}

Result Client::subscribe(const std::string& topic, const std::string& consumerName,
Result Client::subscribe(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, Consumer& consumer) {
Promise<Result, Consumer> promise;
subscribeAsync(topic, consumerName, conf, WaitForCallbackValue<Consumer>(promise));
subscribeAsync(topic, subscriptionName, conf, WaitForCallbackValue<Consumer>(promise));
Future<Result, Consumer> future = promise.getFuture();

return future.get(consumer);
}

void Client::subscribeAsync(const std::string& topic, const std::string& consumerName,
void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
SubscribeCallback callback) {
subscribeAsync(topic, consumerName, ConsumerConfiguration(), callback);
subscribeAsync(topic, subscriptionName, ConsumerConfiguration(), callback);
}

void Client::subscribeAsync(const std::string& topic, const std::string& consumerName,
void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback) {
LOG_INFO("Subscribing on Topic :" << topic);
impl_->subscribeAsync(topic, consumerName, conf, callback);
impl_->subscribeAsync(topic, subscriptionName, conf, callback);
}

Result Client::subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
Expand Down
26 changes: 13 additions & 13 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
consumers_.push_back(reader->getConsumer());
}

void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName,
void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback) {
TopicNamePtr topicNamePtr = TopicName::get(regexPattern);

Expand All @@ -252,12 +252,12 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const

lookupServicePtr_->getTopicsOfNamespaceAsync(nsName).addListener(
std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, regexPattern, consumerName, conf, callback));
std::placeholders::_2, regexPattern, subscriptionName, conf, callback));
}

void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics,
const std::string& regexPattern,
const std::string& consumerName,
const std::string& subscriptionName,
const ConsumerConfiguration& conf,
SubscribeCallback callback) {
if (result == ResultOk) {
Expand All @@ -269,7 +269,7 @@ void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const Nam
PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern);

consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(
shared_from_this(), regexPattern, *matchTopics, consumerName, conf, lookupServicePtr_);
shared_from_this(), regexPattern, *matchTopics, subscriptionName, conf, lookupServicePtr_);

consumer->getConsumerCreatedFuture().addListener(
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
Expand All @@ -284,7 +284,7 @@ void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const Nam
}
}

void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const std::string& consumerName,
void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback) {
TopicNamePtr topicNamePtr;

Expand All @@ -309,7 +309,7 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st
}

ConsumerImplBasePtr consumer = std::make_shared<MultiTopicsConsumerImpl>(
shared_from_this(), topics, consumerName, topicNamePtr, conf, lookupServicePtr_);
shared_from_this(), topics, subscriptionName, topicNamePtr, conf, lookupServicePtr_);

consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
shared_from_this(), std::placeholders::_1,
Expand All @@ -319,7 +319,7 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st
consumer->start();
}

void ClientImpl::subscribeAsync(const std::string& topic, const std::string& consumerName,
void ClientImpl::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback) {
TopicNamePtr topicName;
{
Expand All @@ -343,11 +343,11 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& con

lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, consumerName, conf, callback));
std::placeholders::_2, topicName, subscriptionName, conf, callback));
}

void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr partitionMetadata,
TopicNamePtr topicName, const std::string& consumerName,
TopicNamePtr topicName, const std::string& subscriptionName,
ConsumerConfiguration conf, SubscribeCallback callback) {
if (result == ResultOk) {
// generate random name if not supplied by the customer.
Expand All @@ -361,11 +361,11 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
callback(ResultInvalidConfiguration, Consumer());
return;
}
consumer = std::make_shared<PartitionedConsumerImpl>(shared_from_this(), consumerName, topicName,
partitionMetadata->getPartitions(), conf);
consumer = std::make_shared<PartitionedConsumerImpl>(
shared_from_this(), subscriptionName, topicName, partitionMetadata->getPartitions(), conf);
} else {
auto consumerImpl =
std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(), consumerName, conf);
auto consumerImpl = std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
subscriptionName, conf);
consumerImpl->setPartitionIndex(topicName->getPartitionIndex());
consumer = consumerImpl;
}
Expand Down
6 changes: 3 additions & 3 deletions pulsar-client-cpp/lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
void createProducerAsync(const std::string& topic, ProducerConfiguration conf,
CreateProducerCallback callback);

void subscribeAsync(const std::string& topic, const std::string& consumerName,
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback);

void subscribeAsync(const std::vector<std::string>& topics, const std::string& consumerName,
void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback);

void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName,
void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback);

void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
Expand Down
8 changes: 4 additions & 4 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ namespace pulsar {
DECLARE_LOG_OBJECT()

ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
const std::string& subscription, const ConsumerConfiguration& conf,
const std::string& subscriptionName, const ConsumerConfiguration& conf,
const ExecutorServicePtr listenerExecutor /* = NULL by default */,
const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
: HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))),
waitingForZeroQueueSizeMessage(false),
config_(conf),
subscription_(subscription),
originalSubscriptionName_(subscription),
subscription_(subscriptionName),
originalSubscriptionName_(subscriptionName),
messageListener_(config_.getMessageListener()),
consumerTopicType_(consumerTopicType),
subscriptionMode_(subscriptionMode),
Expand All @@ -58,7 +58,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
partitionIndex_(-1),
consumerCreatedPromise_(),
messageListenerRunning_(true),
batchAcknowledgementTracker_(topic_, subscription, (long)consumerId_),
batchAcknowledgementTracker_(topic_, subscriptionName, (long)consumerId_),
brokerConsumerStats_(),
consumerStatsBasePtr_(),
negativeAcksTracker_(client, *this, conf),
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class ConsumerImpl : public ConsumerImplBase,
public HandlerBase,
public std::enable_shared_from_this<ConsumerImpl> {
public:
ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscription,
ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration&,
const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(),
const ConsumerTopicType consumerTopicType = NonPartitioned,
Expand Down

0 comments on commit 99926b4

Please sign in to comment.