Skip to content

Commit

Permalink
Implement TopicMetadata for MessageRouter in C++ client (apache#1033)
Browse files Browse the repository at this point in the history
  • Loading branch information
Licht-T authored and merlimat committed Jan 19, 2018
1 parent a5c123e commit cccc0bd
Show file tree
Hide file tree
Showing 22 changed files with 444 additions and 41 deletions.
5 changes: 4 additions & 1 deletion build/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ ADD protobuf.patch /pulsar

RUN apt-get update
RUN apt-get install -y maven tig g++ cmake libssl-dev libcurl4-openssl-dev \
liblog4cxx-dev libprotobuf-dev libboost-all-dev libgtest-dev \
liblog4cxx-dev libprotobuf-dev libboost-all-dev google-mock libgtest-dev \
libjsoncpp-dev libxml2-utils protobuf-compiler wget \
curl doxygen openjdk-8-jdk-headless clang-format-4.0

# Compile and install gtest
RUN cd /usr/src/gtest && cmake . && make && cp libgtest.a /usr/lib

# Compile and install google-mock
RUN cd /usr/src/gmock && cmake . && make && cp libgmock.a /usr/lib

# Include gtest parallel to speed up unit tests
RUN git clone https://github.com/google/gtest-parallel.git

Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ find_package(Boost REQUIRED COMPONENTS program_options filesystem regex

find_package(OpenSSL REQUIRED)
find_path(GTEST_INCLUDE_PATH gtest/gtest.h)
find_path(GMOCK_INCLUDE_PATH gmock/gmock.h)
find_path(JSON_INCLUDE_PATH jsoncpp)
find_path(LOG4CXX_INCLUDE_PATH log4cxx/logger.h)

Expand All @@ -147,6 +148,7 @@ include_directories(
${PROTOBUF_INCLUDE_DIR}
${LOG4CXX_INCLUDE_PATH}
${GTEST_INCLUDE_PATH}
${GMOCK_INCLUDE_PATH}
${JSON_INCLUDE_PATH}
)

Expand Down
35 changes: 35 additions & 0 deletions pulsar-client-cpp/include/pulsar/DeprecatedException.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef DEPRECATED_EXCEPTION_HPP_
#define DEPRECATED_EXCEPTION_HPP_

#include <stdexcept>
#include <string>

namespace pulsar {
class DeprecatedException: public std::runtime_error {
public:
explicit DeprecatedException(const std::string& __arg);

private:
static const std::string message_prefix;
};
}

#endif //DEPRECATED_EXCEPTION_HPP_
1 change: 1 addition & 0 deletions pulsar-client-cpp/include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class MessageBuilder;
class MessageImpl;
class PulsarWrapper;

// TODO: When releasing 2.0.0, make all methods virtual and create the virtual destructor for Google Mock tests
class Message {
public:
typedef std::map<std::string, std::string> StringMap;
Expand Down
17 changes: 15 additions & 2 deletions pulsar-client-cpp/include/pulsar/MessageRoutingPolicy.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
#ifndef PULSAR_MESSAGE_ROUTING_POLICY_HEADER_
#define PULSAR_MESSAGE_ROUTING_POLICY_HEADER_
#include "Message.h"

#include <pulsar/DeprecatedException.h>
#include <pulsar/Message.h>
#include <pulsar/TopicMetadata.h>
#include <boost/shared_ptr.hpp>

#pragma GCC visibility push(default)
Expand All @@ -33,7 +36,17 @@ class MessageRoutingPolicy {
public:
virtual ~MessageRoutingPolicy() {}

virtual int getPartition(const Message& msg) = 0;
/** @deprecated
Use int getPartition(const Message& msg, const TopicMetadata& topicMetadata)
*/
virtual int getPartition(const Message& msg) {
throw DeprecatedException("Use int getPartition(const Message& msg,"
" const TopicMetadata& topicMetadata)");
}

virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
return getPartition(msg);
}
};

typedef boost::shared_ptr<MessageRoutingPolicy> MessageRoutingPolicyPtr;
Expand Down
32 changes: 32 additions & 0 deletions pulsar-client-cpp/include/pulsar/TopicMetadata.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef TOPIC_METADATA_HPP_
#define TOPIC_METADATA_HPP_

namespace pulsar {
/**
* Metadata of a topic that can be used for message routing.
*/
class TopicMetadata {
public:
virtual int getNumPartitions() const = 0;
};
}

#endif /* TOPIC_METADATA_HPP_ */
28 changes: 28 additions & 0 deletions pulsar-client-cpp/lib/DeprecatedException.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <pulsar/DeprecatedException.h>

namespace pulsar {
const std::string DeprecatedException::message_prefix = "Deprecated: ";

DeprecatedException::DeprecatedException(const std::string& __arg)
: std::runtime_error(message_prefix + __arg) {

}
}
39 changes: 24 additions & 15 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
#include <cstdlib>
#include "PartitionedProducerImpl.h"
#include "LogUtils.h"
#include <boost/bind.hpp>
#include <sstream>
#include "RoundRobinMessageRouter.h"
#include "SinglePartitionMessageRouter.h"
#include "TopicMetadataImpl.h"
#include "DestinationName.h"
#include "MessageImpl.h"

Expand All @@ -37,18 +39,25 @@ namespace pulsar {
const ProducerConfiguration& config):client_(client),
destinationName_(destinationName),
topic_(destinationName_->toString()),
numPartitions_(numPartitions),
conf_(config),
state_(Pending)
state_(Pending),
topicMetadata_(new TopicMetadataImpl(numPartitions))
{
numProducersCreated_ = 0;
cleanup_ = false;
if(config.getPartitionsRoutingMode() == ProducerConfiguration::RoundRobinDistribution) {
routerPolicy_ = boost::make_shared<RoundRobinMessageRouter>(numPartitions);
} else if (config.getPartitionsRoutingMode() == ProducerConfiguration::UseSinglePartition) {
routerPolicy_ = boost::make_shared<SinglePartitionMessageRouter>(numPartitions);
} else {
routerPolicy_ = config.getMessageRouterPtr();
routerPolicy_ = getMessageRouter();
}

MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
switch (conf_.getPartitionsRoutingMode()) {
case ProducerConfiguration::RoundRobinDistribution:
return boost::make_shared<RoundRobinMessageRouter>();
case ProducerConfiguration::CustomPartition:
return conf_.getMessageRouterPtr();
case ProducerConfiguration::UseSinglePartition:
default:
unsigned int random = rand();
return boost::make_shared<SinglePartitionMessageRouter>(random % topicMetadata_->getNumPartitions());
}
}

Expand All @@ -63,7 +72,7 @@ namespace pulsar {
void PartitionedProducerImpl::start() {
boost::shared_ptr<ProducerImpl> producer;
// create producer per partition
for (unsigned int i = 0; i < numPartitions_; i++) {
for (unsigned int i = 0; i < topicMetadata_->getNumPartitions(); i++) {
std::string topicPartitionName = destinationName_->getTopicPartitionName(i);
producer = boost::make_shared<ProducerImpl>(client_, topicPartitionName, conf_);
producer->getProducerCreatedFuture().addListener(boost::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
Expand All @@ -89,7 +98,7 @@ namespace pulsar {
// Ignore, we have already informed client that producer creation failed
return;
}
assert(numProducersCreated_ <= numPartitions_);
assert(numProducersCreated_ <= topicMetadata_->getNumPartitions());
if (result != ResultOk) {
state_ = Failed;
lock.unlock();
Expand All @@ -99,9 +108,9 @@ namespace pulsar {
return;
}

assert(partitionIndex <= numPartitions_);
assert(partitionIndex <= topicMetadata_->getNumPartitions());
numProducersCreated_++;
if(numProducersCreated_ == numPartitions_) {
if(numProducersCreated_ == topicMetadata_->getNumPartitions()) {
lock.unlock();
partitionedProducerCreatedPromise_.setValue(shared_from_this());
}
Expand All @@ -110,8 +119,8 @@ namespace pulsar {
//override
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
//get partition for this message from router policy
short partition = (short)(routerPolicy_->getPartition(msg));
if (partition >= numPartitions_ || partition >= producers_.size()) {
short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));
if (partition >= topicMetadata_->getNumPartitions() || partition >= producers_.size()) {
LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
//change me: abort or notify failure in callback?
// change to appropriate error if callback
Expand Down Expand Up @@ -196,7 +205,7 @@ namespace pulsar {
}
return;
}
assert (partitionIndex < numPartitions_);
assert (partitionIndex < topicMetadata_->getNumPartitions());
if(numProducersCreated_ > 0) {
numProducersCreated_--;
}
Expand Down
6 changes: 5 additions & 1 deletion pulsar-client-cpp/lib/PartitionedProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#include "DestinationName.h"
#include <vector>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <pulsar/MessageRoutingPolicy.h>
#include <pulsar/TopicMetadata.h>

namespace pulsar {

Expand Down Expand Up @@ -88,7 +90,7 @@ namespace pulsar {
const DestinationNamePtr destinationName_;
const std::string topic_;

const unsigned int numPartitions_;
boost::scoped_ptr<TopicMetadata> topicMetadata_;

unsigned int numProducersCreated_;

Expand All @@ -112,6 +114,8 @@ namespace pulsar {

// only set this promise to value, when producers on all partitions are created.
Promise<Result, ProducerImplBaseWeakPtr> partitionedProducerCreatedPromise_;

MessageRoutingPolicyPtr getMessageRouter();
};

} //ends namespace Pulsar
9 changes: 4 additions & 5 deletions pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,24 @@
* under the License.
*/
#include "RoundRobinMessageRouter.h"
#include <boost/algorithm/string.hpp>

namespace pulsar {
RoundRobinMessageRouter::RoundRobinMessageRouter(unsigned int numPartitions):prevPartition_(0), numPartitions_(numPartitions) {
RoundRobinMessageRouter::RoundRobinMessageRouter():prevPartition_(0) {
}

RoundRobinMessageRouter::~RoundRobinMessageRouter() {
}

//override
int RoundRobinMessageRouter::getPartition(const Message& msg) {
int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
//if message has a key, hash the key and return the partition
if (msg.hasPartitionKey()) {
static StringHash hash;
return hash(msg.getPartitionKey()) % numPartitions_;
return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
} else {
Lock lock(mutex_);
//else pick the next partition
return prevPartition_++ % numPartitions_;
return prevPartition_++ % topicMetadata.getNumPartitions();
}
}

Expand Down
6 changes: 3 additions & 3 deletions pulsar-client-cpp/lib/RoundRobinMessageRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@
#define PULSAR_RR_MESSAGE_ROUTER_HEADER_

#include <pulsar/MessageRoutingPolicy.h>
#include <pulsar/TopicMetadata.h>
#include <boost/functional/hash.hpp>
#include <boost/thread/mutex.hpp>

namespace pulsar {
class RoundRobinMessageRouter : public MessageRoutingPolicy {
public:
RoundRobinMessageRouter (unsigned int numPartitions);
RoundRobinMessageRouter ();
virtual ~RoundRobinMessageRouter();
virtual int getPartition(const Message& msg);
virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata);
private:
boost::mutex mutex_;
unsigned int prevPartition_;
unsigned int numPartitions_;
};
typedef boost::hash<std::string> StringHash;
typedef boost::unique_lock<boost::mutex> Lock;
Expand Down
12 changes: 5 additions & 7 deletions pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@
* under the License.
*/
#include "SinglePartitionMessageRouter.h"
#include <cstdlib> // rand()
#include <boost/algorithm/string.hpp>

namespace pulsar {
SinglePartitionMessageRouter::~SinglePartitionMessageRouter(){}
SinglePartitionMessageRouter::SinglePartitionMessageRouter(unsigned int numPartitions):numPartitions_(numPartitions) {
unsigned int random = rand();
selectedSinglePartition_ = random % numPartitions_;
SinglePartitionMessageRouter::SinglePartitionMessageRouter(const int partitionIndex) {
selectedSinglePartition_ = partitionIndex;
}

//override
int SinglePartitionMessageRouter::getPartition(const Message& msg) {
int SinglePartitionMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
//if message has a key, hash the key and return the partition
if (msg.hasPartitionKey()) {
StringHash hash;
return hash(msg.getPartitionKey()) % numPartitions_;
return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
} else {
//else pick the next partition
return selectedSinglePartition_;
Expand Down
Loading

0 comments on commit cccc0bd

Please sign in to comment.