Skip to content

Commit

Permalink
Support Pulsar proxy from C++/Python client library (apache#1124)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Feb 1, 2018
1 parent fb72f72 commit 58d5727
Show file tree
Hide file tree
Showing 15 changed files with 110 additions and 34 deletions.
10 changes: 7 additions & 3 deletions pulsar-client-cpp/lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::lookupAsync(
}
std::string lookupName = dn->toString();
LookupDataResultPromisePtr promise = boost::make_shared<LookupDataResultPromise>();
Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_);
Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
future.addListener(boost::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, lookupName, false,
_1, _2, promise));
return promise->getFuture();
Expand All @@ -71,7 +71,7 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetada
return promise->getFuture();
}
std::string lookupName = dn->toString();
Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_);
Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
future.addListener(boost::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this,
lookupName, _1, _2, promise));
return promise->getFuture();
Expand Down Expand Up @@ -100,8 +100,12 @@ void BinaryProtoLookupService::handleLookup(const std::string& destinationName,
if (data->isRedirect()) {
LOG_DEBUG("Lookup request is for " << destinationName << " redirected to "
<< data->getBrokerUrl());

const std::string& logicalAddress = data->getBrokerUrl();
const std::string& physicalAddress =
data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress;
Future<Result, ClientConnectionWeakPtr> future =
cnxPool_.getConnectionAsync(data->getBrokerUrl());
cnxPool_.getConnectionAsync(logicalAddress, physicalAddress);
future.addListener(boost::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this,
destinationName, data->isAuthoritative(), _1, _2, promise));
} else {
Expand Down
25 changes: 17 additions & 8 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ static Result getResult(ServerError serverError) {
return ResultUnknownError;
}

ClientConnection::ClientConnection(const std::string& endpoint, ExecutorServicePtr executor,
ClientConnection::ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
ExecutorServicePtr executor,
const ClientConfiguration& clientConfiguration,
const AuthenticationPtr& authentication)
: state_(Pending),
Expand All @@ -114,8 +115,9 @@ ClientConnection::ClientConnection(const std::string& endpoint, ExecutorServiceP
executor_(executor),
resolver_(executor->createTcpResolver()),
socket_(executor->createSocket()),
address_(endpoint),
cnxString_("[<none> -> " + endpoint + "] "),
logicalAddress_(logicalAddress),
physicalAddress_(physicalAddress),
cnxString_("[<none> -> " + physicalAddress + "] "),
error_(boost::system::error_code()),
incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
incomingCmd_(),
Expand Down Expand Up @@ -267,7 +269,11 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] ";
cnxString_ = cnxStringStream.str();

LOG_INFO(cnxString_ << "Connected to broker");
if (logicalAddress_ == physicalAddress_) {
LOG_INFO(cnxString_ << "Connected to broker");
} else {
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
}
state_ = TcpConnected;
socket_->set_option(tcp::no_delay(true));

Expand All @@ -288,7 +294,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
if (!isTlsAllowInsecureConnection_) {
boost::system::error_code err;
Url service_url;
if (!Url::parse(address_, service_url)) {
if (!Url::parse(physicalAddress_, service_url)) {
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
close();
return;
Expand All @@ -315,7 +321,8 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
}

void ClientConnection::handleHandshake(const boost::system::error_code& err) {
SharedBuffer buffer = Commands::newConnect(authentication_);
bool connectingThroughProxy = logicalAddress_ != physicalAddress_;
SharedBuffer buffer = Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy);
// Send CONNECT command to broker
asyncWrite(buffer.const_asio_buffer(),
boost::bind(&ClientConnection::handleSentPulsarConnect, shared_from_this(),
Expand Down Expand Up @@ -343,7 +350,7 @@ void ClientConnection::handleSentPulsarConnect(const boost::system::error_code&
void ClientConnection::tcpConnectAsync() {
boost::system::error_code err;
Url service_url;
if (!Url::parse(address_, service_url)) {
if (!Url::parse(physicalAddress_, service_url)) {
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
close();
return;
Expand Down Expand Up @@ -788,6 +795,8 @@ void ClientConnection::handleIncomingCommand() {
lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
CommandLookupTopicResponse::Redirect);
lookupResultPtr->setShouldProxyThroughServiceUrl(
lookupTopicResponse.proxy_through_service_url());
lookupDataPromise->setValue(lookupResultPtr);
}

Expand Down Expand Up @@ -1178,7 +1187,7 @@ void ClientConnection::removeConsumer(int consumerId) {
consumers_.erase(consumerId);
}

const std::string& ClientConnection::brokerAddress() const { return address_; }
const std::string& ClientConnection::brokerAddress() const { return physicalAddress_; }

const std::string& ClientConnection::cnxString() const { return cnxString_; }

Expand Down
15 changes: 10 additions & 5 deletions pulsar-client-cpp/lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,14 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
typedef std::vector<ConnectionListener>::iterator ListenerIterator;

/*
* endpoint - url of the service, for ex. pulsar://localhost:6650
* connected - set when tcp connection is established
* logicalAddress - url of the service, for ex. pulsar://localhost:6650
* physicalAddress - the address to connect to, it could be different from the logical address if proxy
* comes into play connected - set when tcp connection is established
*
*/
ClientConnection(const std::string& endpoint, ExecutorServicePtr executor,
const ClientConfiguration& clientConfiguration, const AuthenticationPtr& authentication);
ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
ExecutorServicePtr executor, const ClientConfiguration& clientConfiguration,
const AuthenticationPtr& authentication);
~ClientConnection();

/*
Expand Down Expand Up @@ -226,10 +228,13 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
*/
SocketPtr socket_;
TlsSocketPtr tlsSocket_;

const std::string logicalAddress_;

/*
* stores address of the service, for ex. pulsar://localhost:6650
*/
const std::string address_;
const std::string physicalAddress_;

// Represent both endpoint of the tcp connection. eg: [client:1234 -> server:6650]
std::string cnxString_;
Expand Down
6 changes: 5 additions & 1 deletion pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,11 @@ void ClientImpl::handleLookup(Result result, LookupDataResultPtr data,
Promise<Result, ClientConnectionWeakPtr> promise) {
if (data) {
LOG_DEBUG("Getting connection to broker: " << data->getBrokerUrl());
Future<Result, ClientConnectionWeakPtr> future = pool_.getConnectionAsync(data->getBrokerUrl());
const std::string& logicalAddress = data->getBrokerUrl();
const std::string& physicalAddress =
data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress;
Future<Result, ClientConnectionWeakPtr> future =
pool_.getConnectionAsync(logicalAddress, physicalAddress);
future.addListener(boost::bind(&ClientImpl::handleNewConnection, this, _1, _2, promise));
} else {
promise.setFailed(result);
Expand Down
10 changes: 9 additions & 1 deletion pulsar-client-cpp/lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "pulsar/MessageBuilder.h"
#include "LogUtils.h"
#include "Utils.h"
#include "Url.h"
#include "checksum/ChecksumProvider.h"
#include <algorithm>
#include <boost/thread/mutex.hpp>
Expand Down Expand Up @@ -160,13 +161,20 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint
return composite;
}

SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication) {
SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
bool connectingThroughProxy) {
BaseCommand cmd;
cmd.set_type(BaseCommand::CONNECT);
CommandConnect* connect = cmd.mutable_connect();
connect->set_client_version(_PULSAR_VERSION_);
connect->set_auth_method_name(authentication->getAuthMethodName());
connect->set_protocol_version(ProtocolVersion_MAX);
if (connectingThroughProxy) {
Url logicalAddressUrl;
Url::parse(logicalAddress, logicalAddressUrl);
connect->set_proxy_to_broker_url(logicalAddressUrl.hostPort());
}

AuthenticationDataPtr authDataContent;
if (authentication->getAuthData(authDataContent) == ResultOk && authDataContent->hasDataFromCommand()) {
connect->set_auth_data(authDataContent->getCommandData());
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class Commands {
const static uint16_t magicCrc32c = 0x0e01;
const static int checksumSize = 4;

static SharedBuffer newConnect(const AuthenticationPtr& authentication);
static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
bool connectingThroughProxy);

static SharedBuffer newPartitionMetadataRequest(const std::string& topic, uint64_t requestId);

Expand Down
19 changes: 10 additions & 9 deletions pulsar-client-cpp/lib/ConnectionPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,37 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP
poolConnections_(poolConnections),
mutex_() {}

Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& endpoint) {
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
const std::string& logicalAddress, const std::string& physicalAddress) {
boost::unique_lock<boost::mutex> lock(mutex_);

if (poolConnections_) {
PoolMap::iterator cnxIt = pool_.find(endpoint);
PoolMap::iterator cnxIt = pool_.find(logicalAddress);
if (cnxIt != pool_.end()) {
ClientConnectionPtr cnx = cnxIt->second.lock();

if (cnx && !cnx->isClosed()) {
// Found a valid or pending connection in the pool
LOG_DEBUG("Got connection from pool for " << endpoint << " use_count: " //
LOG_DEBUG("Got connection from pool for " << logicalAddress << " use_count: " //
<< (cnx.use_count() - 1) << " @ " << cnx.get());
return cnx->getConnectFuture();
} else {
// Deleting stale connection
LOG_INFO("Deleting stale connection from pool for "
<< endpoint << " use_count: " << (cnx.use_count() - 1) << " @ " << cnx.get());
pool_.erase(endpoint);
<< logicalAddress << " use_count: " << (cnx.use_count() - 1) << " @ " << cnx.get());
pool_.erase(logicalAddress);
}
}
}

// No valid or pending connection found in the pool, creating a new one
ClientConnectionPtr cnx(
new ClientConnection(endpoint, executorProvider_->get(), clientConfiguration_, authentication_));
ClientConnectionPtr cnx(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(),
clientConfiguration_, authentication_));

LOG_INFO("Created connection for " << endpoint);
LOG_INFO("Created connection for " << logicalAddress);

Future<Result, ClientConnectionWeakPtr> future = cnx->getConnectFuture();
pool_.insert(std::make_pair(endpoint, cnx));
pool_.insert(std::make_pair(logicalAddress, cnx));

lock.unlock();

Expand Down
19 changes: 18 additions & 1 deletion pulsar-client-cpp/lib/ConnectionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,24 @@ class ConnectionPool {
ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider,
const AuthenticationPtr& authentication, bool poolConnections = true);

Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& endpoint);
/**
* Get a connection from the pool.
* <p>
* The connection can either be created or be coming from the pool itself.
* <p>
* When specifying multiple addresses, the logicalAddress is used as a tag for the broker,
* while the physicalAddress is where the connection is actually happening.
* <p>
* These two addresses can be different when the client is forced to connect through
* a proxy layer. Essentially, the pool is using the logical address as a way to
* decide whether to reuse a particular connection.
*
* @param logicalAddress the address to use as the broker tag
* @param physicalAddress the real address where the TCP connection should be made
* @return a future that will produce the ClientCnx object
*/
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
const std::string& physicalAddress);

private:
ClientConfiguration clientConfiguration_;
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json)
LookupDataResultPtr lookupDataResultPtr = boost::make_shared<LookupDataResult>();
lookupDataResultPtr->setBrokerUrl(brokerUrl);
lookupDataResultPtr->setBrokerUrlSsl(brokerUrlSsl);

LOG_INFO("parseLookupData = " << *lookupDataResultPtr);
return lookupDataResultPtr;
}
Expand Down
15 changes: 12 additions & 3 deletions pulsar-client-cpp/lib/LookupDataResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class LookupDataResult {
public:
void setBrokerUrl(const std::string& brokerUrl) { brokerUrl_ = brokerUrl; }
void setBrokerUrlSsl(const std::string& brokerUrlSsl) { brokerUrlSsl_ = brokerUrlSsl; }
std::string getBrokerUrl() { return brokerUrl_; }
std::string getBrokerUrlSsl() { return brokerUrlSsl_; }
const std::string& getBrokerUrl() const { return brokerUrl_; }
const std::string& getBrokerUrlSsl() const { return brokerUrlSsl_; }

bool isAuthoritative() const { return authoritative; }

Expand All @@ -47,19 +47,28 @@ class LookupDataResult {

void setRedirect(bool redirect) { this->redirect = redirect; }

bool shouldProxyThroughServiceUrl() const { return proxyThroughServiceUrl_; }

void setShouldProxyThroughServiceUrl(bool proxyThroughServiceUrl) {
proxyThroughServiceUrl_ = proxyThroughServiceUrl;
}

private:
friend inline std::ostream& operator<<(std::ostream& os, const LookupDataResult& b);
std::string brokerUrl_;
std::string brokerUrlSsl_;
int partitions;
bool authoritative;
bool redirect;

bool proxyThroughServiceUrl_;
};

std::ostream& operator<<(std::ostream& os, const LookupDataResult& b) {
os << "{ LookupDataResult [brokerUrl_ = " << b.brokerUrl_ << "] [brokerUrlSsl_ = " << b.brokerUrlSsl_
<< "] [partitions = " << b.partitions << "] [authoritative = " << b.authoritative
<< "] [redirect = " << b.redirect << "]";
<< "] [redirect = " << b.redirect << "] proxyThroughServiceUrl = " << b.proxyThroughServiceUrl_
<< "] }";
return os;
}
} // namespace pulsar
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/LookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ class LookupService {
* Gets Partition metadata
*/
virtual Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const DestinationNamePtr& dn) = 0;

virtual ~LookupService() {}
};

typedef boost::shared_ptr<LookupService> LookupServicePtr;

} // namespace pulsar
#endif // PULSAR_CPP_LOOKUPSERVICE_H
6 changes: 6 additions & 0 deletions pulsar-client-cpp/lib/Url.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ const std::string& Url::file() const { return file_; }

const std::string& Url::parameter() const { return parameter_; }

std::string Url::hostPort() const {
std::stringstream ss;
ss << host_ << ':' << port_;
return ss.str();
}

std::ostream& operator<<(std::ostream& os, const Url& obj) {
os << "Url [Host = " << obj.host() << ", Protocol = " << obj.protocol() << ", Port = " << obj.port()
<< "]";
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/Url.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class Url {
const std::string& parameter() const;
friend std::ostream& operator<<(std::ostream& os, const Url& obj);

std::string hostPort() const;

private:
std::string protocol_;
std::string host_;
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-cpp/lib/lz4/lz4.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,11 @@ int LZ4_decompress_safe_partial(const char* source, char* dest, int compressedSi
* note : only allocated directly the structure if you are statically linking LZ4
* If you are using liblz4 as a DLL, please use below construction methods instead.
*/
// clang-format off
typedef struct {
long long table[LZ4_STREAMSIZE_U64];
} LZ4_stream_t;
// clang-format on

/*
* LZ4_resetStream
Expand Down Expand Up @@ -254,9 +256,12 @@ int LZ4_saveDict(LZ4_stream_t* streamPtr, char* safeBuffer, int dictSize);

#define LZ4_STREAMDECODESIZE_U64 4
#define LZ4_STREAMDECODESIZE (LZ4_STREAMDECODESIZE_U64 * sizeof(unsigned long long))
// clang-format off
typedef struct {
unsigned long long table[LZ4_STREAMDECODESIZE_U64];
} LZ4_streamDecode_t;
// clang-format on

/*
* LZ4_streamDecode_t
* information structure to track an LZ4 stream.
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/python/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
batching_max_publish_delay_ms=10
)

while True:
for i in range(10):
try:
producer.send_async('hello', None)
producer.send('hello', None)
except Exception as e:
print("Failed to send message: %s", e)

Expand Down

0 comments on commit 58d5727

Please sign in to comment.