Skip to content

Commit

Permalink
[C++] Handle exception in creating socket when fd limit is reached (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Mar 9, 2022
1 parent a21dc0e commit babae8e
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
serverProtocolVersion_(ProtocolVersion_MIN),
executor_(executor),
resolver_(executor_->createTcpResolver()),
socket_(executor_->createSocket()),
#if BOOST_VERSION >= 107000
strand_(boost::asio::make_strand(executor_->getIOService().get_executor())),
#elif BOOST_VERSION >= 106600
Expand All @@ -173,12 +172,20 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
physicalAddress_(physicalAddress),
cnxString_("[<none> -> " + physicalAddress + "] "),
incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
connectTimeoutTask_(std::make_shared<PeriodicTask>(executor_->getIOService(),
clientConfiguration.getConnectionTimeout())),
outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()) {

try {
socket_ = executor_->createSocket();
connectTimeoutTask_ = std::make_shared<PeriodicTask>(executor_->getIOService(),
clientConfiguration.getConnectionTimeout());
consumerStatsRequestTimer_ = executor_->createDeadlineTimer();
} catch (const boost::system::system_error& e) {
LOG_ERROR("Failed to initialize connection: " << e.what());
close();
return;
}

LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout());
if (clientConfiguration.isUseTls()) {
#if BOOST_VERSION >= 105400
Expand Down Expand Up @@ -1505,9 +1512,11 @@ void ClientConnection::close(Result result) {
}
state_ = Disconnected;
boost::system::error_code err;
socket_->close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
if (socket_) {
socket_->close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
}

if (tlsSocket_) {
Expand Down Expand Up @@ -1542,7 +1551,10 @@ void ClientConnection::close(Result result) {
consumerStatsRequestTimer_.reset();
}

connectTimeoutTask_->stop();
if (connectTimeoutTask_) {
connectTimeoutTask_->stop();
connectTimeoutTask_.reset();
}

lock.unlock();
LOG_INFO(cnxString_ << "Connection closed");
Expand Down

0 comments on commit babae8e

Please sign in to comment.