Skip to content

Commit

Permalink
CPP Client - Accessing connection_ pointer under a lock (apache#555)
Browse files Browse the repository at this point in the history
  • Loading branch information
jai1 authored and merlimat committed Jul 10, 2017
1 parent 9a8098f commit 365230c
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 3 deletions.
4 changes: 3 additions & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
firstTime = false;
}
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
connection_ = cnx;
{
Lock lock(mutex_);
connection_ = cnx;
incomingMessages_.clear();
cnx->registerConsumer(consumerId_, shared_from_this());
state_ = Ready;
Expand Down Expand Up @@ -581,7 +581,9 @@ void ConsumerImpl::doAcknowledge(const BatchMessageId& messageId, proto::Command

void ConsumerImpl::disconnectConsumer() {
LOG_DEBUG("Broker notification of Closed consumer: " << consumerId_);
Lock lock(mutex_);
connection_.reset();
lock.unlock();
scheduleReconnection(shared_from_this());
}

Expand Down
4 changes: 3 additions & 1 deletion pulsar-client-cpp/lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ void HandlerBase::start() {
}

void HandlerBase::grabCnx() {
Lock lock(mutex_);
if (connection_.lock()) {
lock.unlock();
LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
return;
}

lock.unlock();
LOG_INFO(getName() << "Getting connection from pool");
ClientImplPtr client = client_.lock();
Future<Result, ClientConnectionWeakPtr> future = client->getConnection(topic_);
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -548,9 +548,10 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId) {

void ProducerImpl::disconnectProducer() {
LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
Lock lock(mutex_);
connection_.reset();
lock.unlock();
scheduleReconnection(shared_from_this());

}

const std::string& ProducerImpl::getName() const{
Expand Down

0 comments on commit 365230c

Please sign in to comment.