Skip to content

Commit

Permalink
Fix partition index error in close callback (apache#7282)
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower authored Jun 17, 2020
1 parent df3668b commit 72285f2
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 15 deletions.
14 changes: 7 additions & 7 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
partitionedConsumerCreatedPromise_.setFailed(result);
// unsubscribed all of the successfully subscribed partitioned consumers
closeAsync(nullCallbackForCleanup);
LOG_DEBUG("Unable to create Consumer for partition - " << partitionIndex << " Error - " << result);
LOG_ERROR("Unable to create Consumer for partition - " << partitionIndex << " Error - " << result);
return;
}

Expand Down Expand Up @@ -351,17 +351,17 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
return;
}
setState(Closed);
int consumerIndex = 0;
unsigned int consumerAlreadyClosed = 0;
// close successfully subscribed consumers
// Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased
// when `state_` is Ready
for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
ConsumerImplPtr consumer = *i;
for (auto& consumer : consumers_) {
if (!consumer->isClosed()) {
consumer->closeAsync(std::bind(&PartitionedConsumerImpl::handleSinglePartitionConsumerClose,
shared_from_this(), std::placeholders::_1, consumerIndex,
callback));
auto self = shared_from_this();
const auto partition = consumer->getPartitionIndex();
consumer->closeAsync([this, self, partition, callback](Result result) {
handleSinglePartitionConsumerClose(result, partition, callback);
});
} else {
if (++consumerAlreadyClosed == consumers_.size()) {
// everything is closed already. so we are good.
Expand Down
16 changes: 8 additions & 8 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
lock.unlock();
closeAsync(closeCallback);
partitionedProducerCreatedPromise_.setFailed(result);
LOG_DEBUG("Unable to create Producer for partition - " << partitionIndex << " Error - " << result);
LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " << result);
return;
}

Expand Down Expand Up @@ -204,17 +204,17 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
setState(Closing);

int producerIndex = 0;
unsigned int producerAlreadyClosed = 0;

// Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
// when `state_` is Ready
for (ProducerList::const_iterator i = producers_.begin(); i != producers_.end(); i++) {
ProducerImplPtr prod = *i;
if (!prod->isClosed()) {
prod->closeAsync(std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerClose,
shared_from_this(), std::placeholders::_1, producerIndex,
closeCallback));
for (auto& producer : producers_) {
if (!producer->isClosed()) {
auto self = shared_from_this();
const auto partition = static_cast<unsigned int>(producer->partition());
producer->closeAsync([this, self, partition, closeCallback](Result result) {
handleSinglePartitionProducerClose(result, partition, closeCallback);
});
} else {
producerAlreadyClosed++;
}
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class ProducerImpl : public HandlerBase,

uint64_t getProducerId() const;

int32_t partition() const noexcept { return partition_; }

virtual void start();

virtual void shutdown();
Expand Down

0 comments on commit 72285f2

Please sign in to comment.