From 8d159efa5a4f25191fade5a9fddb45a245ef2e6b Mon Sep 17 00:00:00 2001 From: Licht Takeuchi Date: Wed, 31 Jan 2018 14:21:07 +0900 Subject: [PATCH] Fix MessageRouter hash inconsistent on C++/Java client (#1029) * Fix hash inconsistent on between C++ and Java clients. * Add HashingScheme to select hash function on Java client * Fix the bug of Murmur3_32Hash on C++ client * Add Javadoc on makeHash method * Use JavaStringHash as default hash on Java client * Use BoostHash as default hash on C++ client * Make hash method always returns a signed integer * Re-implement hash classes as singleton on Java client * Move hash classes from include to lib * Change constructor argument of hash classes * Remove unused headers * Remove `auto` type * Fix C++ client Hash classes so that these return non-negative signed integer This is the same behavior as Hash classes on Java client * Add tests for C++/Java client Hash --- .../include/pulsar/ProducerConfiguration.h | 9 ++ pulsar-client-cpp/lib/BoostHash.cc | 29 +++++ pulsar-client-cpp/lib/BoostHash.h | 39 +++++++ pulsar-client-cpp/lib/Hash.h | 37 ++++++ pulsar-client-cpp/lib/JavaStringHash.cc | 39 +++++++ pulsar-client-cpp/lib/JavaStringHash.h | 36 ++++++ pulsar-client-cpp/lib/MessageRouterBase.cc | 40 +++++++ pulsar-client-cpp/lib/MessageRouterBase.h | 41 +++++++ pulsar-client-cpp/lib/Murmur3_32Hash.cc | 110 ++++++++++++++++++ pulsar-client-cpp/lib/Murmur3_32Hash.h | 54 +++++++++ .../lib/PartitionedProducerImpl.cc | 9 +- .../lib/ProducerConfiguration.cc | 9 ++ .../lib/ProducerConfigurationImpl.h | 2 + .../lib/RoundRobinMessageRouter.cc | 7 +- .../lib/RoundRobinMessageRouter.h | 9 +- .../lib/SinglePartitionMessageRouter.cc | 7 +- .../lib/SinglePartitionMessageRouter.h | 11 +- pulsar-client-cpp/tests/HashTest.cc | 67 +++++++++++ .../tests/RoundRobinMessageRouterTest.cc | 8 +- .../tests/SinglePartitionMessageRouterTest.cc | 6 +- .../client/api/ProducerConfiguration.java | 20 +++- .../org/apache/pulsar/client/impl/Hash.java | 28 +++++ 
.../pulsar/client/impl/JavaStringHash.java | 34 ++++++ .../pulsar/client/impl/MessageRouterBase.java | 37 ++++++ .../pulsar/client/impl/Murmur3_32Hash.java | 102 ++++++++++++++++ .../client/impl/PartitionedProducerImpl.java | 4 +- .../RoundRobinPartitionMessageRouterImpl.java | 10 +- .../SinglePartitionMessageRouterImpl.java | 9 +- .../apache/pulsar/client/impl/HashTest.java | 49 ++++++++ ...ndRobinPartitionMessageRouterImplTest.java | 5 +- .../SinglePartitionMessageRouterImplTest.java | 5 +- 31 files changed, 832 insertions(+), 40 deletions(-) create mode 100644 pulsar-client-cpp/lib/BoostHash.cc create mode 100644 pulsar-client-cpp/lib/BoostHash.h create mode 100644 pulsar-client-cpp/lib/Hash.h create mode 100644 pulsar-client-cpp/lib/JavaStringHash.cc create mode 100644 pulsar-client-cpp/lib/JavaStringHash.h create mode 100644 pulsar-client-cpp/lib/MessageRouterBase.cc create mode 100644 pulsar-client-cpp/lib/MessageRouterBase.h create mode 100644 pulsar-client-cpp/lib/Murmur3_32Hash.cc create mode 100644 pulsar-client-cpp/lib/Murmur3_32Hash.h create mode 100644 pulsar-client-cpp/tests/HashTest.cc create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/HashTest.java diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h index 9d2d5f9bb610b..d12b65dda5868 100644 --- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h @@ -44,6 +44,12 @@ class ProducerConfiguration { RoundRobinDistribution, CustomPartition }; + enum HashingScheme 
+ { + Murmur3_32Hash, + BoostHash, + JavaStringHash + }; ProducerConfiguration(); ~ProducerConfiguration(); ProducerConfiguration(const ProducerConfiguration&); @@ -86,6 +92,9 @@ class ProducerConfiguration { ProducerConfiguration& setMessageRouter(const MessageRoutingPolicyPtr& router); const MessageRoutingPolicyPtr& getMessageRouterPtr() const; + ProducerConfiguration& setHashingScheme(const HashingScheme& scheme); + HashingScheme getHashingScheme() const; + ProducerConfiguration& setBlockIfQueueFull(bool); bool getBlockIfQueueFull() const; diff --git a/pulsar-client-cpp/lib/BoostHash.cc b/pulsar-client-cpp/lib/BoostHash.cc new file mode 100644 index 0000000000000..90876b77f8b0a --- /dev/null +++ b/pulsar-client-cpp/lib/BoostHash.cc @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include "BoostHash.h" + +namespace pulsar { + +BoostHash::BoostHash() : hash() {} + +int32_t BoostHash::makeHash(const std::string& key) { + return static_cast(hash(key) & std::numeric_limits::max()); +} + +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/BoostHash.h b/pulsar-client-cpp/lib/BoostHash.h new file mode 100644 index 0000000000000..e5911fef647c8 --- /dev/null +++ b/pulsar-client-cpp/lib/BoostHash.h @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef BOOST_HASH_HPP_ +#define BOOST_HASH_HPP_ + +#include "Hash.h" + +#include +#include +#include + +namespace pulsar { +class BoostHash : public Hash { + public: + BoostHash(); + int32_t makeHash(const std::string &key); + + private: + boost::hash hash; +}; +} // namespace pulsar + +#endif /* BOOST_HASH_HPP_ */ diff --git a/pulsar-client-cpp/lib/Hash.h b/pulsar-client-cpp/lib/Hash.h new file mode 100644 index 0000000000000..f27847826d669 --- /dev/null +++ b/pulsar-client-cpp/lib/Hash.h @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef HASH_HPP_ +#define HASH_HPP_ + +#include +#include + +namespace pulsar { +class Hash { + public: + /** + * Generate the hash of a given String + * + * @return The hash of {@param key}, which is non-negative integer. + */ + virtual int32_t makeHash(const std::string& key) = 0; +}; +} // namespace pulsar + +#endif /* HASH_HPP_ */ diff --git a/pulsar-client-cpp/lib/JavaStringHash.cc b/pulsar-client-cpp/lib/JavaStringHash.cc new file mode 100644 index 0000000000000..7579e0e05269f --- /dev/null +++ b/pulsar-client-cpp/lib/JavaStringHash.cc @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "JavaStringHash.h" + +namespace pulsar { + +JavaStringHash::JavaStringHash() {} + +int32_t JavaStringHash::makeHash(const std::string& key) { + uint64_t len = key.length(); + const char* val = key.c_str(); + uint32_t hash = 0; + + for (int i = 0; i < len; i++) { + hash = 31 * hash + val[i]; + } + + hash &= std::numeric_limits::max(); + + return hash; +} + +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/JavaStringHash.h b/pulsar-client-cpp/lib/JavaStringHash.h new file mode 100644 index 0000000000000..3b01aa8055f5c --- /dev/null +++ b/pulsar-client-cpp/lib/JavaStringHash.h @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#ifndef JAVA_DEFAULT_HASH_HPP_ +#define JAVA_DEFAULT_HASH_HPP_ + +#include "Hash.h" + +#include +#include +#include + +namespace pulsar { +class JavaStringHash : public Hash { + public: + JavaStringHash(); + int32_t makeHash(const std::string &key); +}; +} // namespace pulsar + +#endif /* JAVA_DEFAULT_HASH_HPP_ */ diff --git a/pulsar-client-cpp/lib/MessageRouterBase.cc b/pulsar-client-cpp/lib/MessageRouterBase.cc new file mode 100644 index 0000000000000..c0824f9e0a301 --- /dev/null +++ b/pulsar-client-cpp/lib/MessageRouterBase.cc @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include "MessageRouterBase.h" + +#include "BoostHash.h" +#include "JavaStringHash.h" +#include "Murmur3_32Hash.h" + +namespace pulsar { +MessageRouterBase::MessageRouterBase(ProducerConfiguration::HashingScheme hashingScheme) { + switch (hashingScheme) { + case ProducerConfiguration::BoostHash: + hash = HashPtr(new BoostHash()); + break; + case ProducerConfiguration::JavaStringHash: + hash = HashPtr(new JavaStringHash()); + break; + case ProducerConfiguration::Murmur3_32Hash: + default: + hash = HashPtr(new Murmur3_32Hash()); + break; + } +} +} // namespace pulsar \ No newline at end of file diff --git a/pulsar-client-cpp/lib/MessageRouterBase.h b/pulsar-client-cpp/lib/MessageRouterBase.h new file mode 100644 index 0000000000000..ca458d2f2dcf7 --- /dev/null +++ b/pulsar-client-cpp/lib/MessageRouterBase.h @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#ifndef PULSAR_CPP_MESSAGEROUTERBASE_H +#define PULSAR_CPP_MESSAGEROUTERBASE_H + +#include +#include + +#include +#include +#include "Hash.h" + +namespace pulsar { +typedef boost::interprocess::unique_ptr > HashPtr; + +class MessageRouterBase : public MessageRoutingPolicy { + public: + MessageRouterBase(ProducerConfiguration::HashingScheme hashingScheme); + + protected: + HashPtr hash; +}; +} // namespace pulsar + +#endif // PULSAR_CPP_MESSAGEROUTERBASE_H diff --git a/pulsar-client-cpp/lib/Murmur3_32Hash.cc b/pulsar-client-cpp/lib/Murmur3_32Hash.cc new file mode 100644 index 0000000000000..1367514001a75 --- /dev/null +++ b/pulsar-client-cpp/lib/Murmur3_32Hash.cc @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//----------------------------------------------------------------------------- +// The original MurmurHash3 was written by Austin Appleby, and is placed in the +// public domain. This source code, implemented by Licht Takeuchi, is based on +// the original MurmurHash3 source code. 
+#include "Murmur3_32Hash.h" + +#include +#include + +#if BOOST_COMP_MSVC +#include +#define ROTATE_LEFT(x, y) _rotl(x, y) +#else +#define ROTATE_LEFT(x, y) rotate_left(x, y) +#endif + +#if BOOST_ENDIAN_LITTLE_BYTE +#define BYTESPWAP(x) (x) +#elif BOOST_ENDIAN_BIG_BYTE +#if BOOST_COMP_CLANG || BOOST_COMP_GNUC +#define BYTESPWAP(x) __builtin_bswap32(x) +#elif BOOST_COMP_MSVC +#define BYTESPWAP(x) _byteswap_uint32(x) +#else +#endif +#else +#endif + +namespace pulsar { + +Murmur3_32Hash::Murmur3_32Hash() : seed(0) {} + +int32_t Murmur3_32Hash::makeHash(const std::string &key) { + return static_cast(makeHash(&key.front(), key.length()) & std::numeric_limits::max()); +} + +uint32_t Murmur3_32Hash::makeHash(const void *key, const int64_t len) { + const uint8_t *data = reinterpret_cast(key); + const int nblocks = len / CHUNK_SIZE; + uint32_t h1 = seed; + const uint32_t *blocks = reinterpret_cast(data + nblocks * CHUNK_SIZE); + + for (int i = -nblocks; i != 0; i++) { + uint32_t k1 = BYTESPWAP(blocks[i]); + + k1 = mixK1(k1); + h1 = mixH1(h1, k1); + } + + uint32_t k1 = 0; + switch (len - nblocks * CHUNK_SIZE) { + case 3: + k1 ^= static_cast(blocks[2]) << 16; + case 2: + k1 ^= static_cast(blocks[1]) << 8; + case 1: + k1 ^= static_cast(blocks[0]); + }; + + h1 ^= mixK1(k1); + h1 ^= len; + h1 = fmix(h1); + + return h1; +} + +uint32_t Murmur3_32Hash::fmix(uint32_t h) { + h ^= h >> 16; + h *= 0x85ebca6b; + h ^= h >> 13; + h *= 0xc2b2ae35; + h ^= h >> 16; + + return h; +} + +uint32_t Murmur3_32Hash::mixK1(uint32_t k1) { + k1 *= C1; + k1 = ROTATE_LEFT(k1, 15); + k1 *= C2; + return k1; +} + +uint32_t Murmur3_32Hash::mixH1(uint32_t h1, uint32_t k1) { + h1 ^= k1; + h1 = ROTATE_LEFT(h1, 13); + return h1 * 5 + 0xe6546b64; +} + +uint32_t Murmur3_32Hash::rotate_left(uint32_t x, uint8_t r) { return (x << r) | (x >> ((32 - r))); } +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/Murmur3_32Hash.h b/pulsar-client-cpp/lib/Murmur3_32Hash.h new file mode 100644 index 
0000000000000..644d186471e76 --- /dev/null +++ b/pulsar-client-cpp/lib/Murmur3_32Hash.h @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//----------------------------------------------------------------------------- +// The original MurmurHash3 was written by Austin Appleby, and is placed in the +// public domain. This source code, implemented by Licht Takeuchi, is based on +// the original MurmurHash3 source code. 
+#ifndef MURMUR3_32_HASH_HPP_ +#define MURMUR3_32_HASH_HPP_ + +#include "Hash.h" + +#include +#include + +namespace pulsar { + +class Murmur3_32Hash : public Hash { + public: + Murmur3_32Hash(); + + int32_t makeHash(const std::string& key); + + private: + static constexpr int32_t CHUNK_SIZE = 4; + static constexpr uint32_t C1 = 0xcc9e2d51; + static constexpr uint32_t C2 = 0x1b873593; + uint32_t seed; + + static uint32_t fmix(uint32_t h); + static uint32_t mixK1(uint32_t k1); + static uint32_t mixH1(uint32_t h1, uint32_t k1); + static uint32_t rotate_left(uint32_t x, uint8_t r); + uint32_t makeHash(const void* key, const int64_t len); +}; +} // namespace pulsar + +#endif /* MURMUR3_32_HASH_HPP_ */ diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index 825bcc0ffc56a..2c97ab4427b3c 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -56,14 +56,14 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() { switch (conf_.getPartitionsRoutingMode()) { case ProducerConfiguration::RoundRobinDistribution: - return boost::make_shared(); + return boost::make_shared(conf_.getHashingScheme()); case ProducerConfiguration::CustomPartition: return conf_.getMessageRouterPtr(); case ProducerConfiguration::UseSinglePartition: default: unsigned int random = rand(); - return boost::make_shared(random % - topicMetadata_->getNumPartitions()); + return boost::make_shared( + random % topicMetadata_->getNumPartitions(), conf_.getHashingScheme()); } } @@ -160,8 +160,7 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const { /* * if createProducerCallback is set, it means the closeAsync is called from CreateProducer API which failed to - * create - * one or many producers for partitions. 
So, we have to notify with ERROR on createProducerFailure + * create one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure */ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) { int producerIndex = 0; diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc index 011d85994f984..c010a9f36d058 100644 --- a/pulsar-client-cpp/lib/ProducerConfiguration.cc +++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc @@ -104,6 +104,15 @@ const MessageRoutingPolicyPtr& ProducerConfiguration::getMessageRouterPtr() cons return impl_->messageRouter; } +ProducerConfiguration& ProducerConfiguration::setHashingScheme(const HashingScheme& scheme) { + impl_->hashingScheme = scheme; + return *this; +} + +ProducerConfiguration::HashingScheme ProducerConfiguration::getHashingScheme() const { + return impl_->hashingScheme; +} + ProducerConfiguration& ProducerConfiguration::setBlockIfQueueFull(bool flag) { impl_->blockIfQueueFull = flag; return *this; diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h index f211fb7cd7910..11fe7ccd165ff 100644 --- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h @@ -35,6 +35,7 @@ struct ProducerConfigurationImpl { int maxPendingMessagesAcrossPartitions; ProducerConfiguration::PartitionsRoutingMode routingMode; MessageRoutingPolicyPtr messageRouter; + ProducerConfiguration::HashingScheme hashingScheme; bool blockIfQueueFull; bool batchingEnabled; unsigned int batchingMaxMessages; @@ -46,6 +47,7 @@ struct ProducerConfigurationImpl { maxPendingMessages(1000), maxPendingMessagesAcrossPartitions(50000), routingMode(ProducerConfiguration::UseSinglePartition), + hashingScheme(ProducerConfiguration::BoostHash), blockIfQueueFull(false), batchingEnabled(false), batchingMaxMessages(1000), diff --git 
a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc index af8f49a4d2987..c47fb23338670 100644 --- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc +++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc @@ -19,7 +19,8 @@ #include "RoundRobinMessageRouter.h" namespace pulsar { -RoundRobinMessageRouter::RoundRobinMessageRouter() : prevPartition_(0) {} +RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme) + : MessageRouterBase(hashingScheme), prevPartition_(0) {} RoundRobinMessageRouter::~RoundRobinMessageRouter() {} @@ -27,12 +28,12 @@ RoundRobinMessageRouter::~RoundRobinMessageRouter() {} int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) { // if message has a key, hash the key and return the partition if (msg.hasPartitionKey()) { - static StringHash hash; - return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); + return hash->makeHash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); } else { Lock lock(mutex_); // else pick the next partition return prevPartition_++ % topicMetadata.getNumPartitions(); } } + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h index 1fe337bb47142..57c27b4db636a 100644 --- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h +++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h @@ -20,14 +20,16 @@ #define PULSAR_RR_MESSAGE_ROUTER_HEADER_ #include +#include #include -#include #include +#include "Hash.h" +#include "MessageRouterBase.h" namespace pulsar { -class RoundRobinMessageRouter : public MessageRoutingPolicy { +class RoundRobinMessageRouter : public MessageRouterBase { public: - RoundRobinMessageRouter(); + RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme); virtual ~RoundRobinMessageRouter(); virtual int getPartition(const Message& msg, const TopicMetadata& 
topicMetadata); @@ -35,7 +37,6 @@ class RoundRobinMessageRouter : public MessageRoutingPolicy { boost::mutex mutex_; unsigned int prevPartition_; }; -typedef boost::hash StringHash; typedef boost::unique_lock Lock; } // namespace pulsar #endif // PULSAR_RR_MESSAGE_ROUTER_HEADER_ diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc index fca97b56afc9a..1e5ab75bb1f0f 100644 --- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc +++ b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc @@ -20,7 +20,9 @@ namespace pulsar { SinglePartitionMessageRouter::~SinglePartitionMessageRouter() {} -SinglePartitionMessageRouter::SinglePartitionMessageRouter(const int partitionIndex) { +SinglePartitionMessageRouter::SinglePartitionMessageRouter(const int partitionIndex, + ProducerConfiguration::HashingScheme hashingScheme) + : MessageRouterBase(hashingScheme) { selectedSinglePartition_ = partitionIndex; } @@ -28,8 +30,7 @@ SinglePartitionMessageRouter::SinglePartitionMessageRouter(const int partitionIn int SinglePartitionMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) { // if message has a key, hash the key and return the partition if (msg.hasPartitionKey()) { - StringHash hash; - return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); + return hash->makeHash(msg.getPartitionKey()) % topicMetadata.getNumPartitions(); } else { // else pick the next partition return selectedSinglePartition_; diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h index 453f9ea89169e..409d31a8add4c 100644 --- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h +++ b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h @@ -20,20 +20,23 @@ #define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ #include +#include +#include "Hash.h" #include -#include +#include "MessageRouterBase.h" namespace pulsar { -class 
SinglePartitionMessageRouter : public MessageRoutingPolicy { +class SinglePartitionMessageRouter : public MessageRouterBase { public: - explicit SinglePartitionMessageRouter(int partitionIndex); - typedef boost::hash StringHash; + SinglePartitionMessageRouter(const int partitionIndex, + ProducerConfiguration::HashingScheme hashingScheme); virtual ~SinglePartitionMessageRouter(); virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata); private: int selectedSinglePartition_; }; + } // namespace pulsar #endif // PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_ diff --git a/pulsar-client-cpp/tests/HashTest.cc b/pulsar-client-cpp/tests/HashTest.cc new file mode 100644 index 0000000000000..97dcb90501262 --- /dev/null +++ b/pulsar-client-cpp/tests/HashTest.cc @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include +#include +#include +#include + +#include "../lib/BoostHash.h" +#include "../lib/JavaStringHash.h" +#include "../lib/Murmur3_32Hash.h" + +using ::testing::AtLeast; +using ::testing::Return; +using ::testing::ReturnRef; + +using namespace pulsar; + +TEST(HashTest, testBoostHash) { + BoostHash hash; + boost::hash boostHash; + + std::string key1 = "key1"; + std::string key2 = "key2"; + + ASSERT_EQ(boostHash(key1) & std::numeric_limits::max(), hash.makeHash(key1)); + ASSERT_EQ(boostHash(key2) & std::numeric_limits::max(), hash.makeHash(key2)); +} + +TEST(HashTest, testJavaStringHash) { + JavaStringHash hash; + + // Calculating `hashCode()` makes overflow as unsigned int32. + std::string key1 = "keykeykeykeykey1"; + + // `hashCode()` is negative as signed int32. + std::string key2 = "keykeykey2"; + + // Same as Java client + ASSERT_EQ(434058482, hash.makeHash(key1)); + ASSERT_EQ(42978643, hash.makeHash(key2)); +} + +TEST(HashTest, testMurmur3_32Hash) { + Murmur3_32Hash hash; + std::string key1 = "key1"; + std::string key2 = "key2"; + + // Same value as Java client + ASSERT_EQ(462881061, hash.makeHash(key1)); + ASSERT_EQ(1936800180, hash.makeHash(key2)); +} \ No newline at end of file diff --git a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc index 95423534b1962..431dd6b4e5fca 100644 --- a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc +++ b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc @@ -17,8 +17,10 @@ * under the License. 
*/ #include +#include #include #include +#include #include "tests/mocks/GMockMessage.h" @@ -37,8 +39,8 @@ TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) { const int numPartitions1 = 5; const int numPartitions2 = 3; - RoundRobinMessageRouter router1; - RoundRobinMessageRouter router2; + RoundRobinMessageRouter router1(ProducerConfiguration::BoostHash); + RoundRobinMessageRouter router2(ProducerConfiguration::BoostHash); GMockMessage message; EXPECT_CALL(message, hasPartitionKey()).Times(20).WillRepeatedly(Return(false)); @@ -52,7 +54,7 @@ TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) { TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithPartitionKey) { const int numPartitons = 1234; - RoundRobinMessageRouter router; + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash); std::string partitionKey1 = "key1"; std::string partitionKey2 = "key2"; diff --git a/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc b/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc index 9457acc169ee3..b62d286ae0589 100644 --- a/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc +++ b/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc @@ -17,6 +17,8 @@ * under the License. 
*/ #include +#include +#include #include #include @@ -36,7 +38,7 @@ using namespace pulsar; TEST(SinglePartitionMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) { const int selectedPartition = 1234; - SinglePartitionMessageRouter router(selectedPartition); + SinglePartitionMessageRouter router(selectedPartition, ProducerConfiguration::BoostHash); GMockMessage message; EXPECT_CALL(message, hasPartitionKey()).Times(1).WillOnce(Return(false)); @@ -48,7 +50,7 @@ TEST(SinglePartitionMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) TEST(SinglePartitionMessageRouterTest, DISABLED_getPartitionWithPartitionKey) { const int numPartitons = 1234; - SinglePartitionMessageRouter router(1); + SinglePartitionMessageRouter router(1, ProducerConfiguration::BoostHash); std::string partitionKey1 = "key1"; std::string partitionKey2 = "key2"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java index 08b73c29647bd..678e3d56d3f79 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java @@ -45,6 +45,7 @@ public class ProducerConfiguration implements Serializable { private int maxPendingMessages = 1000; private int maxPendingMessagesAcrossPartitions = 50000; private MessageRoutingMode messageRouteMode = MessageRoutingMode.SinglePartition; + private HashingScheme hashingScheme = HashingScheme.JavaStringHash; private MessageRouter customMessageRouter = null; private long batchingMaxPublishDelayMs = 10; private int batchingMaxMessages = 1000; @@ -64,6 +65,11 @@ public enum MessageRoutingMode { SinglePartition, RoundRobinPartition, CustomPartition } + public enum HashingScheme { + JavaStringHash, + Murmur3_32Hash + } + private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL; /** 
@@ -136,6 +142,15 @@ public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages) { return this; } + public HashingScheme getHashingScheme() { + return hashingScheme; + } + + public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme) { + this.hashingScheme = hashingScheme; + return this; + } + /** * * @return the maximum number of pending messages allowed across all the partitions @@ -171,7 +186,7 @@ public boolean getBlockIfQueueFull() { * message queue is full. *

* Default is false. If set to false, send operations will immediately fail with - * {@link ProducerQueueIsFullError} when there is no space left in pending queue. + * {@link PulsarClientException.ProducerQueueIsFullError} when there is no space left in pending queue. * * @param blockIfQueueFull * whether to block {@link Producer#send} and {@link Producer#sendAsync} operations on queue full @@ -481,7 +496,8 @@ public boolean equals(Object obj) { ProducerConfiguration other = (ProducerConfiguration) obj; return Objects.equal(this.sendTimeoutMs, other.sendTimeoutMs) && Objects.equal(maxPendingMessages, other.maxPendingMessages) - && Objects.equal(this.messageRouteMode, other.messageRouteMode); + && Objects.equal(this.messageRouteMode, other.messageRouteMode) + && Objects.equal(this.hashingScheme, other.hashingScheme); } return false; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java new file mode 100644 index 0000000000000..31c771a83f12f --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Hash.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +public interface Hash { + /** + * Generate the hash of a given String + * + * @return The hash of {@param s}, which is non-negative integer. + */ + int makeHash(String s); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java new file mode 100644 index 0000000000000..2f6fc28dc4637 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/JavaStringHash.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +public class JavaStringHash implements Hash { + private static final JavaStringHash instance = new JavaStringHash(); + + private JavaStringHash(){ } + + public static Hash getInstance() { + return instance; + } + + @Override + public int makeHash(String s) { + return s.hashCode() & Integer.MAX_VALUE; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java new file mode 100644 index 0000000000000..4825dabbbe5d3 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.ProducerConfiguration; + +public abstract class MessageRouterBase implements MessageRouter { + protected final Hash hash; + + MessageRouterBase(ProducerConfiguration.HashingScheme hashingScheme) { + switch (hashingScheme) { + case JavaStringHash: + this.hash = JavaStringHash.getInstance(); + break; + case Murmur3_32Hash: + default: + this.hash = Murmur3_32Hash.getInstance(); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java new file mode 100644 index 0000000000000..80552da0fa566 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Murmur3_32Hash.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * The original MurmurHash3 was written by Austin Appleby, and is placed in the + * public domain. This source code, implemented by Licht Takeuchi, is based on + * the orignal MurmurHash3 source code. 
+ */ +package org.apache.pulsar.client.impl; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; + +import com.google.common.primitives.UnsignedBytes; + +public class Murmur3_32Hash implements Hash { + private static final Murmur3_32Hash instance = new Murmur3_32Hash(); + + private static final int CHUNK_SIZE = 4; + private static final int C1 = 0xcc9e2d51; + private static final int C2 = 0x1b873593; + private final int seed; + + private Murmur3_32Hash() { + seed = 0; + } + + public static Hash getInstance() { + return instance; + } + + @Override + public int makeHash(String s) { + return makeHash(s.getBytes(StandardCharsets.UTF_8)) & Integer.MAX_VALUE; + } + + private int makeHash(byte[] bytes) { + int len = bytes.length; + int reminder = len % CHUNK_SIZE; + int h1 = seed; + + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + byteBuffer.order(ByteOrder.LITTLE_ENDIAN); + + while (byteBuffer.remaining() >= CHUNK_SIZE) { + int k1 = byteBuffer.getInt(); + + k1 = mixK1(k1); + h1 = mixH1(h1, k1); + } + + int k1 = 0; + for (int i = 0; i < reminder; i++) { + k1 ^= UnsignedBytes.toInt(byteBuffer.get()) << (i * 8); + } + + h1 ^= mixK1(k1); + h1 ^= len; + h1 = fmix(h1); + + return h1; + } + + private int fmix(int h) { + h ^= h >>> 16; + h *= 0x85ebca6b; + h ^= h >>> 13; + h *= 0xc2b2ae35; + h ^= h >>> 16; + + return h; + } + + private int mixK1(int k1) { + k1 *= C1; + k1 = Integer.rotateLeft(k1, 15); + k1 *= C2; + return k1; + } + + private int mixH1(int h1, int k1) { + h1 ^= k1; + h1 = Integer.rotateLeft(h1, 13); + return h1 * 5 + 0xe6546b64; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 04269f36ce042..fe9ae6c5023d0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -75,12 +75,12 @@ private MessageRouter getMessageRouter() { messageRouter = customMessageRouter; break; case RoundRobinPartition: - messageRouter = new RoundRobinPartitionMessageRouterImpl(); + messageRouter = new RoundRobinPartitionMessageRouterImpl(conf.getHashingScheme()); break; case SinglePartition: default: messageRouter = new SinglePartitionMessageRouterImpl( - ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions())); + ThreadLocalRandom.current().nextInt(topicMetadata.numPartitions()), conf.getHashingScheme()); } return messageRouter; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java index fc5650056a0e2..a7c25c163f8a1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java @@ -21,16 +21,17 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.TopicMetadata; -public class RoundRobinPartitionMessageRouterImpl implements MessageRouter { +public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase { private static final AtomicIntegerFieldUpdater PARTITION_INDEX_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RoundRobinPartitionMessageRouterImpl.class, "partitionIndex"); private volatile int partitionIndex = 0; - public RoundRobinPartitionMessageRouterImpl() { + public RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme hashingScheme) { + super(hashingScheme); PARTITION_INDEX_UPDATER.set(this, 0); } @@ 
-38,8 +39,9 @@ public RoundRobinPartitionMessageRouterImpl() { public int choosePartition(Message msg, TopicMetadata topicMetadata) { // If the message has a key, it supersedes the round robin routing policy if (msg.hasKey()) { - return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % topicMetadata.numPartitions()); + return hash.makeHash(msg.getKey()) % topicMetadata.numPartitions(); } + return ((PARTITION_INDEX_UPDATER.getAndIncrement(this) & Integer.MAX_VALUE) % topicMetadata.numPartitions()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java index e1ab87869743d..3a0cd544ba0ce 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java @@ -19,14 +19,15 @@ package org.apache.pulsar.client.impl; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.TopicMetadata; -public class SinglePartitionMessageRouterImpl implements MessageRouter { +public class SinglePartitionMessageRouterImpl extends MessageRouterBase { private final int partitionIndex; - public SinglePartitionMessageRouterImpl(int partitionIndex) { + public SinglePartitionMessageRouterImpl(int partitionIndex, ProducerConfiguration.HashingScheme hashingScheme) { + super(hashingScheme); this.partitionIndex = partitionIndex; } @@ -34,7 +35,7 @@ public SinglePartitionMessageRouterImpl(int partitionIndex) { public int choosePartition(Message msg, TopicMetadata metadata) { // If the message has a key, it supersedes the single partition routing policy if (msg.hasKey()) { - return ((msg.getKey().hashCode() & Integer.MAX_VALUE) % metadata.numPartitions()); + return 
hash.makeHash(msg.getKey()) % metadata.numPartitions(); } return partitionIndex; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HashTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HashTest.java new file mode 100644 index 0000000000000..f53205d47e17e --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/HashTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + +public class HashTest { + @Test + public void javaStringHashTest() { + Hash h = JavaStringHash.getInstance(); + + // Calculating `hashCode()` makes overflow as unsigned int32. + String key1 = "keykeykeykeykey1"; + + // `hashCode()` is negative as signed int32. 
+ String key2 = "keykeykey2"; + + // Same value as C++ client + assertEquals(434058482, h.makeHash(key1)); + assertEquals(42978643, h.makeHash(key2)); + } + + @Test + public void murmur3_32HashTest() { + Hash h = Murmur3_32Hash.getInstance(); + + // Same value as C++ client + assertEquals(462881061, h.makeHash("key1")); + assertEquals(1936800180, h.makeHash("key2")); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java index e3cb6c0b2c194..d0e4d510b25d7 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java @@ -23,6 +23,7 @@ import static org.testng.Assert.assertEquals; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.ProducerConfiguration; import org.testng.annotations.Test; /** @@ -35,7 +36,7 @@ public void testChoosePartitionWithoutKey() { Message msg = mock(Message.class); when(msg.getKey()).thenReturn(null); - RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(); + RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme.JavaStringHash); for (int i = 0; i < 10; i++) { assertEquals(i % 5, router.choosePartition(msg, new TopicMetadataImpl(5))); } @@ -52,7 +53,7 @@ public void testChoosePartitionWithKey() { when(msg2.hasKey()).thenReturn(true); when(msg2.getKey()).thenReturn(key2); - RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(); + RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme.JavaStringHash); TopicMetadataImpl metadata = new TopicMetadataImpl(100); 
assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, metadata)); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java index 7b5ea4e0f4e8d..c8ea0b011056b 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java @@ -23,6 +23,7 @@ import static org.testng.Assert.assertEquals; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.ProducerConfiguration; import org.testng.annotations.Test; /** @@ -35,7 +36,7 @@ public void testChoosePartitionWithoutKey() { Message msg = mock(Message.class); when(msg.getKey()).thenReturn(null); - SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234); + SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, ProducerConfiguration.HashingScheme.JavaStringHash); assertEquals(1234, router.choosePartition(msg, new TopicMetadataImpl(2468))); } @@ -50,7 +51,7 @@ public void testChoosePartitionWithKey() { when(msg2.hasKey()).thenReturn(true); when(msg2.getKey()).thenReturn(key2); - SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234); + SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, ProducerConfiguration.HashingScheme.JavaStringHash); TopicMetadataImpl metadata = new TopicMetadataImpl(100); assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, metadata));