Skip to content

Commit

Permalink
Change state_ to closed when resultOk is returned (apache#5446)
Browse files Browse the repository at this point in the history
  • Loading branch information
hrsakai authored and aahmed-se committed Oct 23, 2019
1 parent e73fc00 commit 492d92b
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
6 changes: 5 additions & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -837,8 +837,12 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
return;
}

LOG_INFO(getName() << "Closing consumer for topic " << topic_);
state_ = Closing;

ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
state_ = Closed;
lock.unlock();
// If connection is gone, also the consumer is closed on the broker side
if (callback) {
Expand All @@ -847,9 +851,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
return;
}

LOG_INFO(getName() << "Closing consumer for topic " << topic_);
ClientImplPtr client = client_.lock();
if (!client) {
state_ = Closed;
lock.unlock();
// Client was already destroyed
if (callback) {
Expand Down
6 changes: 5 additions & 1 deletion pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {

ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
state_ = Closed;
lock.unlock();
if (callback) {
callback(ResultOk);
Expand All @@ -502,16 +503,19 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
// Detach the producer from the connection to avoid sending any other
// message from the producer
connection_.reset();
lock.unlock();

ClientImplPtr client = client_.lock();
if (!client) {
state_ = Closed;
lock.unlock();
// Client was already destroyed
if (callback) {
callback(ResultOk);
}
return;
}

lock.unlock();
int requestId = client->newRequestId();
Future<Result, ResponseData> future =
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
Expand Down

0 comments on commit 492d92b

Please sign in to comment.