Skip to content

Commit

Permalink
[C++][Python] Add connection timeout configuration (apache#11029)
Browse files Browse the repository at this point in the history
Fixes apache#10747 

### Motivation

This PR catches up with apache#2852 and adds connection timeout configuration to the C++ and Python clients.

### Modifications

- Add a `PeriodicTask` class to execute tasks periodically and the related unit tests: `PeriodicTaskTest`.
- Use `PeriodicTask` to register a timer before connecting to broker asynchronously, if the connection was not established when the timer is triggered, close the socket so that `handleTcpConnected` can be triggered immediately with a failure.
- Add connection timeout (in milliseconds) to both C++ and Python clients.
- Add `ClientTest.testConnectTimeout` (C++) and `test_connect_timeout` (Python) to verify that the connection timeout works.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:
- PeriodicTaskTest
- ClientTest.testConnectTimeout
- test_connect_timeout
  • Loading branch information
BewareMyPower authored Jun 25, 2021
1 parent cadf59d commit 6062d2f
Show file tree
Hide file tree
Showing 14 changed files with 322 additions and 4 deletions.
16 changes: 16 additions & 0 deletions pulsar-client-cpp/include/pulsar/ClientConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,22 @@ class PULSAR_PUBLIC ClientConfiguration {
*/
unsigned int getPartitionsUpdateInterval() const;

/**
* Set the maximum time to wait for a connection to a broker to be established. If that time passes
* before the connection is established, the connection attempt is dropped.
*
* Default: 10000
*
* NOTE: the value is used as the period of the connect-timeout timer; a negative value appears to
* disable the timeout, because that timer is never scheduled for a negative period.
*
* @param timeoutMs the duration in milliseconds
* @return the client configuration self reference, so that setters can be chained
*/
ClientConfiguration& setConnectionTimeout(int timeoutMs);

/**
* The getter associated with setConnectionTimeout().
*
* @return the configured connection timeout in milliseconds
*/
int getConnectionTimeout() const;

friend class ClientImpl;
friend class PulsarWrapper;

Expand Down
8 changes: 8 additions & 0 deletions pulsar-client-cpp/lib/ClientConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,12 @@ ClientConfiguration& ClientConfiguration::setListenerName(const std::string& lis
}

const std::string& ClientConfiguration::getListenerName() const { return impl_->listenerName; }

// Store the connection timeout (in milliseconds) in the implementation object.
// No validation is applied to timeoutMs here; the value is stored as given.
// Returns *this so that calls can be chained with the other setters.
ClientConfiguration& ClientConfiguration::setConnectionTimeout(int timeoutMs) {
impl_->connectionTimeoutMs = timeoutMs;
return *this;
}

// Return the connection timeout (in milliseconds) currently stored in the implementation object.
int ClientConfiguration::getConnectionTimeout() const { return impl_->connectionTimeoutMs; }

} // namespace pulsar
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ClientConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct ClientConfigurationImpl {
bool validateHostName{false};
unsigned int partitionsUpdateInterval{60}; // 1 minute
std::string listenerName;
int connectionTimeoutMs{10000}; // 10 seconds

std::unique_ptr<LoggerFactory> takeLogger() { return std::move(loggerFactory); }
};
Expand Down
28 changes: 27 additions & 1 deletion pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
error_(boost::system::error_code()),
incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
incomingCmd_(),
connectTimeoutTask_(std::make_shared<PeriodicTask>(executor_->getIOService(),
clientConfiguration.getConnectionTimeout())),
pendingWriteBuffers_(),
pendingWriteOperations_(0),
outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
Expand Down Expand Up @@ -374,6 +376,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
}
state_ = TcpConnected;
connectTimeoutTask_->stop();
socket_->set_option(tcp::no_delay(true));

socket_->set_option(tcp::socket::keep_alive(true));
Expand Down Expand Up @@ -414,7 +417,13 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
}
} else if (endpointIterator != tcp::resolver::iterator()) {
// The connection failed. Try the next endpoint in the list.
socket_->close();
boost::system::error_code err;
socket_->close(err); // ignore the error of close
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
connectTimeoutTask_->stop();
connectTimeoutTask_->start();
tcp::endpoint endpoint = *endpointIterator;
socket_->async_connect(endpoint, std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
std::placeholders::_1, ++endpointIterator));
Expand Down Expand Up @@ -500,6 +509,18 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
return;
}

auto self = shared_from_this();
connectTimeoutTask_->setCallback([this, self](const PeriodicTask::ErrorCode& ec) {
if (state_ != TcpConnected) {
LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs()
<< " ms, close the socket");
PeriodicTask::ErrorCode ignoredError;
socket_->close(ignoredError);
}
connectTimeoutTask_->stop();
});

connectTimeoutTask_->start();
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() //
<< " to " << endpointIterator->endpoint());
Expand Down Expand Up @@ -1412,6 +1433,9 @@ void ClientConnection::close() {
state_ = Disconnected;
boost::system::error_code err;
socket_->close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}

if (tlsSocket_) {
tlsSocket_->lowest_layer().close();
Expand Down Expand Up @@ -1442,6 +1466,8 @@ void ClientConnection::close() {
consumerStatsRequestTimer_.reset();
}

connectTimeoutTask_->stop();

lock.unlock();
LOG_INFO(cnxString_ << "Connection closed");

Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <pulsar/Client.h>
#include <set>
#include <lib/BrokerConsumerStatsImpl.h>
#include "lib/PeriodicTask.h"

using namespace pulsar;

Expand Down Expand Up @@ -283,6 +284,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
proto::BaseCommand incomingCmd_;

Promise<Result, ClientConnectionWeakPtr> connectPromise_;
std::shared_ptr<PeriodicTask> connectTimeoutTask_;

typedef std::map<long, PendingRequestData> PendingRequestsMap;
PendingRequestsMap pendingRequests_;
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,9 @@ void ClientImpl::closeAsync(CloseCallback callback) {
state_ = Closing;
lock.unlock();

LOG_INFO("Closing Pulsar client");
SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + consumers.size());
LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size()
<< " consumers");

for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
ProducerImplBasePtr producer = it->lock();
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ExecutorService.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class PULSAR_PUBLIC ExecutorService : private boost::noncopyable {
void postWork(std::function<void(void)> task);
void close();

// Give access to the io_service held by this executor (the object io_service_ points to), e.g. so that
// timers can be created on it. The returned reference is not owning.
boost::asio::io_service &getIOService() { return *io_service_; }

private:
/*
* only called once and within lock so no need to worry about thread-safety
Expand Down
60 changes: 60 additions & 0 deletions pulsar-client-cpp/lib/PeriodicTask.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "lib/PeriodicTask.h"
#include <boost/date_time/posix_time/posix_time.hpp>

namespace pulsar {

void PeriodicTask::start() {
if (state_ != Pending) {
return;
}
state_ = Ready;
if (periodMs_ >= 0) {
auto self = shared_from_this();
timer_.expires_from_now(boost::posix_time::millisec(periodMs_));
timer_.async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
}
}

void PeriodicTask::stop() {
State state = Ready;
if (!state_.compare_exchange_strong(state, Closing)) {
return;
}
timer_.cancel();
state_ = Pending;
}

void PeriodicTask::handleTimeout(const ErrorCode& ec) {
if (state_ != Ready) {
return;
}

callback_(ec);

// state_ may be changed in handleTimeout, so we check state_ again
if (state_ == Ready) {
auto self = shared_from_this();
timer_.expires_from_now(boost::posix_time::millisec(periodMs_));
timer_.async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
}
}

} // namespace pulsar
76 changes: 76 additions & 0 deletions pulsar-client-cpp/lib/PeriodicTask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>

#include <boost/asio.hpp>

namespace pulsar {

/**
 * A task that is executed periodically.
 *
 * After `start()` is called, the callback registered with `setCallback()` is triggered periodically,
 * with an interval of the `periodMs` given to the constructor. After `stop()` is called, the timer is
 * cancelled and the callback will not be called again unless `start()` is called again.
 *
 * If you don't want to execute the task infinitely, you can call `stop()` from within the callback.
 *
 * NOTE:
 *  - If `periodMs` is negative, the callback will never be called.
 *  - Instances must be owned by a `std::shared_ptr` (e.g. created with `std::make_shared`), because
 *    `start()` uses `shared_from_this()` to keep the task alive while a wait is pending.
 *  - Only `state_` is atomic; no synchronization is visible for the stored callback, so register it
 *    with `setCallback()` before calling `start()`.
 */
class PeriodicTask : public std::enable_shared_from_this<PeriodicTask> {
   public:
    using ErrorCode = boost::system::error_code;
    using CallbackType = std::function<void(const ErrorCode&)>;

    enum State : std::uint8_t
    {
        Pending,  // idle: not started yet, or stopped
        Ready,    // started: start() has been called and stop() has not
        Closing   // transient state while stop() cancels the timer
    };

    /**
     * @param ioService the io_service on which the timer is created
     * @param periodMs the interval between two executions in milliseconds (negative: never execute)
     */
    PeriodicTask(boost::asio::io_service& ioService, int periodMs) : timer_(ioService), periodMs_(periodMs) {}

    void start();

    void stop();

    /**
     * Register the function to run each time the timer expires.
     *
     * An empty `callback` is replaced by a no-op, so that the timer handler never invokes an empty
     * std::function. The new value is installed with swap(), which is noexcept, so this method
     * honours its own noexcept specification (a copy assignment could throw and terminate).
     *
     * @param callback the function to call with the error code reported by the timer
     */
    void setCallback(CallbackType callback) noexcept {
        if (!callback) {
            callback = trivialCallback;  // storing a plain function pointer does not throw
        }
        callback_.swap(callback);
    }

    State getState() const noexcept { return state_; }
    int getPeriodMs() const noexcept { return periodMs_; }

   private:
    std::atomic<State> state_{Pending};
    boost::asio::deadline_timer timer_;
    const int periodMs_;
    CallbackType callback_{trivialCallback};

    // Completion handler of the timer wait
    void handleTimeout(const ErrorCode& ec);

    // Default callback: does nothing
    static void trivialCallback(const ErrorCode&) {}
};

} // namespace pulsar
7 changes: 6 additions & 1 deletion pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,8 @@ def __init__(self, service_url,
tls_trust_certs_file_path=None,
tls_allow_insecure_connection=False,
tls_validate_hostname=False,
logger=None
logger=None,
connection_timeout_ms=10000,
):
"""
Create a new Pulsar client instance.
Expand Down Expand Up @@ -409,10 +410,13 @@ def __init__(self, service_url,
the endpoint.
* `logger`:
Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
* `connection_timeout_ms`:
Set timeout in milliseconds on TCP connections.
"""
_check_type(str, service_url, 'service_url')
_check_type_or_none(Authentication, authentication, 'authentication')
_check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
_check_type(int, connection_timeout_ms, 'connection_timeout_ms')
_check_type(int, io_threads, 'io_threads')
_check_type(int, message_listener_threads, 'message_listener_threads')
_check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
Expand All @@ -427,6 +431,7 @@ def __init__(self, service_url,
if authentication:
conf.authentication(authentication.auth)
conf.operation_timeout_seconds(operation_timeout_seconds)
conf.connection_timeout(connection_timeout_ms)
conf.io_threads(io_threads)
conf.message_listener_threads(message_listener_threads)
conf.concurrent_lookup_requests(concurrent_lookup_requests)
Expand Down
18 changes: 17 additions & 1 deletion pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \
CryptoKeyReader

from _pulsar import ProducerConfiguration, ConsumerConfiguration
from _pulsar import ProducerConfiguration, ConsumerConfiguration, ConnectError

from schema_test import *

Expand Down Expand Up @@ -1148,6 +1148,22 @@ def test_negative_acks(self):
consumer.receive(100)
client.close()

def test_connect_timeout(self):
    # The broker address is expected to be unreachable, so creating a producer can only
    # end with a connect error once the 1 second connection timeout has elapsed.
    timeout_client = pulsar.Client(
        service_url='pulsar://192.0.2.1:1234',
        connection_timeout_ms=1000,  # 1 second
    )
    started_at = time.time()
    try:
        timeout_client.create_producer('test_connect_timeout')
    except ConnectError as expected:
        print('expected error: {} when create producer'.format(expected))
    else:
        self.fail('create_producer should not succeed')
    elapsed = time.time() - started_at
    # The failure must be reported after the timeout, but not much later than that
    self.assertGreater(elapsed, 1.0)
    self.assertLess(elapsed, 1.5)  # 1.5 seconds is long enough
    timeout_client.close()

def _check_value_error(self, fun):
with self.assertRaises(ValueError):
fun()
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ void export_config() {
.def("authentication", &ClientConfiguration_setAuthentication, return_self<>())
.def("operation_timeout_seconds", &ClientConfiguration::getOperationTimeoutSeconds)
.def("operation_timeout_seconds", &ClientConfiguration::setOperationTimeoutSeconds, return_self<>())
.def("connection_timeout", &ClientConfiguration::getConnectionTimeout)
.def("connection_timeout", &ClientConfiguration::setConnectionTimeout, return_self<>())
.def("io_threads", &ClientConfiguration::getIOThreads)
.def("io_threads", &ClientConfiguration::setIOThreads, return_self<>())
.def("message_listener_threads", &ClientConfiguration::getMessageListenerThreads)
Expand Down
28 changes: 28 additions & 0 deletions pulsar-client-cpp/tests/ClientTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#include <gtest/gtest.h>

#include <future>
#include <pulsar/Client.h>
#include "../lib/checksum/ChecksumProvider.h"

Expand Down Expand Up @@ -86,3 +87,30 @@ TEST(ClientTest, testServerConnectError) {
ASSERT_EQ(ResultConnectError, client.createReader(topic, MessageId::earliest(), readerConf, reader));
client.close();
}

// Verifies that a client with a 1 second connection timeout fails to create a producer within 1.5
// seconds, while a client with the default timeout is still waiting at that point.
TEST(ClientTest, testConnectTimeout) {
    // 192.0.2.0/24 is assigned for documentation, so connecting to it should be a dead end
    const std::string blackHoleBroker = "pulsar://192.0.2.1:1234";
    const std::string topic = "test-connect-timeout";

    Client clientLow(blackHoleBroker, ClientConfiguration().setConnectionTimeout(1000));
    Client clientDefault(blackHoleBroker);

    // Start both attempts back to back; each one reports its result through a promise
    std::promise<Result> promiseLow;
    auto futureLow = promiseLow.get_future();
    clientLow.createProducerAsync(topic,
                                  [&promiseLow](Result result, Producer) { promiseLow.set_value(result); });

    std::promise<Result> promiseDefault;
    auto futureDefault = promiseDefault.get_future();
    clientDefault.createProducerAsync(
        topic, [&promiseDefault](Result result, Producer) { promiseDefault.set_value(result); });

    // The short timeout must have produced a connect error by now
    const auto statusLow = futureLow.wait_for(std::chrono::milliseconds(1500));
    ASSERT_EQ(statusLow, std::future_status::ready);
    ASSERT_EQ(futureLow.get(), ResultConnectError);

    // The default timeout has not elapsed yet, so no result is available
    const auto statusDefault = futureDefault.wait_for(std::chrono::milliseconds(10));
    ASSERT_EQ(statusDefault, std::future_status::timeout);

    clientLow.close();
    clientDefault.close();
}
Loading

0 comments on commit 6062d2f

Please sign in to comment.