cpp: fix race condition caused by consumer seek and close (apache#7819)
## Motivation
A user ran a loop of create-exclusive-consumer, seek, consume, and close with the C++ client, and sometimes hit “consumer busy” errors, which means the broker-side consumer was still alive when the new consumer was created.

Here are the suspicious logs.
```
INFO:Client(88)Subscribing on Topic :public/default/jeff-test-21-partition-0Tue Aug 11 11:38:38 2020
INFO:HandlerBase(53)[persistent://public/default/jeff-test-21-partition-0, 0_itom_di_scheduler_bat_prod_jeff-test-21, 1612] Getting connection from poolTue Aug 11 11:38:38 2020
INFO:ConsumerImpl(175)[persistent://public/default/jeff-test-21-partition-0, 0_itom_di_scheduler_bat_prod_jeff-test-21, 1612] Created consumer on broker [10.88.109.77:44648 -> 10.88.109.71:31051] Tue Aug 11 11:38:38 2020
INFO:HandlerBase(130)[persistent://public/default/jeff-test-21-partition-0, 0_itom_di_scheduler_bat_prod_jeff-test-21, 1612] Schedule reconnection in 0.1 sTue Aug 11 11:38:38 2020
INFO:ConsumerImpl(1047)[persistent://public/default/jeff-test-21-partition-0, 0_itom_di_scheduler_bat_prod_jeff-test-21, 1612] Seek successfullyTue Aug 11 11:38:38 2020
INFO:HandlerBase(53)[persistent://public/default/jeff-test-21-partition-0, 0_itom_di_scheduler_bat_prod_jeff-test-21, 1612] Getting connection from poolTue Aug 11 11:38:38 2020
INFO:ConsumerImpl(175)[persistent://public/default/jeff-test-21-partition-0, 0_itom_di_scheduler_bat_prod_jeff-test-21, 1612] Created consumer on broker [10.88.109.77:44648 -> 10.88.109.71:31051] Tue Aug 11 11:38:40 2020
INFO:HandlerBase(130)[persistent://public/default/jeff-test-21-partition-0, 0_itom_di_scheduler_bat_prod_jeff-test-21, 1612] Schedule reconnection in 0.1 sTue Aug 11 11:38:41 2020
INFO:HandlerBase(53)[persistent://public/default/jeff-test-21-partition-0, 0_itom_di_scheduler_bat_prod_jeff-test-21, 1612] Getting connection from poolTue Aug 11 11:38:41 2020
INFO:ConsumerImpl(848)[persistent://public/default/jeff-test-21-partition-0, 0_itom_di_scheduler_bat_prod_jeff-test-21, 1612] Closing consumer for topic persistent://public/default/jeff-test-21-partition-0Tue Aug 11 11:38:42 2020
INFO:ConsumerImpl(175)[persistent://public/default/jeff-test-21-partition-0, 0_itom_di_scheduler_bat_prod_jeff-test-21, 1612] Created consumer on broker [10.88.109.77:44646 -> 10.88.109.71:31051] Tue Aug 11 11:38:42 2020
INFO:ConsumerImpl(104)[persistent://public/default/jeff-test-21-partition-0, 0_itom_di_scheduler_bat_prod_jeff-test-21, 1612] Destroyed consumer which was not properly closedTue Aug 11 11:38:42 2020
```

-  There is a reconnection caused by the seek command.
-  The close operation happens at the same time as the seek.
-  The consumer is destroyed with the log `Destroyed consumer which was not properly closed`.

The race condition happens like this:
1. On the broker, the seek command triggers a consumer disconnect:
```
subscription.resetCursor
\
disconnectFuture = dispatcher.disconnectActiveConsumers(true);
\
disconnectAllConsumers(boolean isResetCursor)
consumerList.forEach(consumer -> consumer.disconnect(isResetCursor)); 
```

2. The client handles the resulting CLOSE_CONSUMER command by calling `disconnectConsumer`, which runs `connection_.reset();`

```
case BaseCommand::CLOSE_CONSUMER: {
    		consumer->disconnectConsumer();  LOG_INFO("Broker notification of Closed consumer: " << consumerId_);	   

\
void ConsumerImpl::disconnectConsumer() {
    LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
    Lock lock(mutex_);
    connection_.reset();   // <=== the connection handle is dropped here
    lock.unlock();
}
```
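The effect of step 2 can be illustrated in isolation. The following is a minimal sketch, assuming `connection_` behaves like a `std::weak_ptr` handle; `FakeConnection` and `FakeHandler` are hypothetical stand-ins, not Pulsar classes. It shows that after the handle is reset, locking it yields an empty pointer even though the underlying connection object is still alive elsewhere.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for the client connection; not the real Pulsar class.
struct FakeConnection {
    int id;
};

// Hypothetical stand-in for the handler that keeps a weak handle to its connection.
class FakeHandler {
public:
    void setCnx(const std::shared_ptr<FakeConnection>& cnx) { connection_ = cnx; }

    // Same effect as the reset in disconnectConsumer(): only the handle is dropped.
    void disconnect() { connection_.reset(); }

    // Same shape as getCnx().lock(): returns an empty pointer once the handle was reset.
    std::shared_ptr<FakeConnection> lockCnx() const { return connection_.lock(); }

private:
    std::weak_ptr<FakeConnection> connection_;
};

// Returns true when the handler lost its handle while the connection object itself survives.
bool handleLostButConnectionAlive() {
    auto pooled = std::make_shared<FakeConnection>(FakeConnection{1});
    FakeHandler handler;
    handler.setCnx(pooled);
    handler.disconnect();
    return handler.lockCnx() == nullptr && pooled.use_count() == 1;
}
```

Calling `assert(handleLostButConnectionAlive());` passes: the handler can no longer reach the connection, which is the precondition for the early return in step 3.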

3. If the consumer is closed before the connection is ready again, the client never sends the CloseConsumer command, so the consumer leaks on the broker.

```
void ConsumerImpl::closeAsync(ResultCallback callback) {
...
    LOG_INFO(getName() << "Closing consumer for topic " << topic_);
    state_ = Closing;
    ClientConnectionPtr cnx = getCnx().lock();    // <=== the seek operation caused cnx to be reset
    if (!cnx) {     // <=== takes this branch: state is set to Closed and the function returns without sending closeConsumer to the broker
        state_ = Closed;  
        lock.unlock();
        // If connection is gone, also the consumer is closed on the broker side
        if (callback) {
            callback(ResultOk);
        }
        return;
    }
    ... 
    Future<Result, ResponseData> future =
        cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);  // <=== never reached
   .... 
}
```
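The three steps can be replayed with a small single-threaded model. This is only a sketch under stated assumptions: `FakeBroker`, `FakeConnection`, `FakeConsumer`, and `attach` are hypothetical names, the broker is reduced to a set of consumer ids, and the interleaving (broker registers the re-subscribe before the client stores the connection) is inferred from the logs rather than taken from the real client code.

```cpp
#include <cassert>
#include <memory>
#include <set>

// Hypothetical broker model: remembers which consumer ids it considers alive.
struct FakeBroker {
    std::set<int> consumers;

    // An exclusive subscription rejects any second consumer ("consumer busy").
    bool subscribeExclusive(int consumerId) {
        if (!consumers.empty()) {
            return false;
        }
        consumers.insert(consumerId);
        return true;
    }

    void closeConsumer(int consumerId) { consumers.erase(consumerId); }
};

// Hypothetical connection that only points at the broker model.
struct FakeConnection {
    FakeBroker* broker;
};

enum State { Pending, Ready, Closing, Closed };

class FakeConsumer {
public:
    explicit FakeConsumer(int id) : id_(id), state_(Pending) {}

    // Client side stores the connection (assumed to happen on the subscribe ack).
    void attach(const std::shared_ptr<FakeConnection>& cnx) {
        connection_ = cnx;
        state_ = Ready;
    }

    // Same shape as the closeAsync excerpt: no connection means no CloseConsumer.
    void close() {
        state_ = Closing;
        std::shared_ptr<FakeConnection> cnx = connection_.lock();
        if (!cnx) {
            state_ = Closed;
            return;
        }
        cnx->broker->closeConsumer(id_);
        state_ = Closed;
    }

    State state() const { return state_; }

private:
    int id_;
    State state_;
    std::weak_ptr<FakeConnection> connection_;
};

// Close with a live handle: the broker forgets the consumer and accepts a new one.
bool closeWithConnectionReleasesBroker() {
    FakeBroker broker;
    auto cnx = std::make_shared<FakeConnection>(FakeConnection{&broker});
    FakeConsumer consumer(1);
    broker.subscribeExclusive(1);
    consumer.attach(cnx);
    consumer.close();
    return broker.subscribeExclusive(2);
}

// Close while the handle is empty: the broker keeps the consumer and rejects the next one.
bool closeWithoutConnectionLeaks() {
    FakeBroker broker;
    FakeConsumer consumer(1);
    broker.subscribeExclusive(1);  // broker registered the re-subscribe after seek
    consumer.close();              // client has no handle yet, so it returns early
    return consumer.state() == Closed && !broker.subscribeExclusive(2);
}
```

Both helpers return `true`: the client reports `Closed` in either case, but only the first path actually frees the exclusive subscription on the broker.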


## Modifications

When the consumer is destroyed while still in the `Ready` state, send another closeConsumer command to the broker if a connection is available.
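The idea behind the modification can be sketched with a simplified model. Everything here is hypothetical (`FakeBroker`, `FakeConnection`, `FakeConsumer`, and the assumption that a late subscribe ack flips the state back to `Ready` after the early-return close); it is not the client's real class layout, only an illustration of a destructor that performs a last-chance close.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <set>

// Hypothetical broker model: the set of consumer ids it still considers alive.
struct FakeBroker {
    std::set<int> consumers;
};

// Hypothetical connection; sendCloseConsumer stands in for the CloseConsumer command.
struct FakeConnection {
    FakeBroker* broker;
    void sendCloseConsumer(int consumerId) { broker->consumers.erase(consumerId); }
};

enum State { Pending, Ready, Closing, Closed };

class FakeConsumer {
public:
    explicit FakeConsumer(int id) : id_(id), state_(Pending) {}

    // Last-chance cleanup: a consumer destroyed while Ready was never closed on the broker.
    ~FakeConsumer() {
        if (state_ == Ready) {
            std::shared_ptr<FakeConnection> cnx = connection_.lock();
            if (cnx) {
                cnx->sendCloseConsumer(id_);
            }
        }
    }

    // Early-return close: without a connection nothing is sent to the broker.
    void close() {
        state_ = Closing;
        std::shared_ptr<FakeConnection> cnx = connection_.lock();
        if (cnx) {
            cnx->sendCloseConsumer(id_);
        }
        state_ = Closed;
    }

    // Assumed late subscribe ack: stores the connection and marks the consumer Ready.
    void handleCreateConsumer(const std::shared_ptr<FakeConnection>& cnx) {
        connection_ = cnx;
        state_ = Ready;
    }

private:
    int id_;
    State state_;
    std::weak_ptr<FakeConnection> connection_;
};

// Replays the race and returns how many consumers the broker still holds afterwards.
std::size_t consumersLeftAfterRace() {
    FakeBroker broker;
    auto cnx = std::make_shared<FakeConnection>(FakeConnection{&broker});
    broker.consumers.insert(1612);  // broker accepted the re-subscribe after seek
    {
        FakeConsumer consumer(1612);
        consumer.close();                    // no connection yet, nothing sent
        consumer.handleCreateConsumer(cnx);  // late ack makes the consumer Ready again
    }                                        // destructor sends the missing close
    return broker.consumers.size();
}
```

In this model `consumersLeftAfterRace()` returns `0`, since the destructor sends the close that `close()` skipped. Without the `Ready` branch in the destructor the broker would still hold consumer 1612.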


* fix race condition caused by consumer seek and close

* fix format
jiazhai authored Aug 21, 2020
1 parent 54dd24c commit dcc84c9
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -125,7 +125,20 @@ ConsumerImpl::~ConsumerImpl() {
    LOG_DEBUG(getName() << "~ConsumerImpl");
    incomingMessages_.clear();
    if (state_ == Ready) {
        // this could happen at least in this condition:
        //      consumer seek, caused reconnection, if consumer close happened before connection ready,
        //      then consumer will not send closeConsumer to Broker side, and caused a leak of consumer in
        //      broker.
        LOG_WARN(getName() << "Destroyed consumer which was not properly closed");

        ClientConnectionPtr cnx = getCnx().lock();
        ClientImplPtr client = client_.lock();
        int requestId = client->newRequestId();
        if (cnx) {
            cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
            cnx->removeConsumer(consumerId_);
            LOG_INFO(getName() << "Closed consumer for race condition: " << consumerId_);
        }
    }
}
