Skip to content

Commit

Permalink
Fix MessageRouter hash inconsistent on C++/Java client (apache#1029)
Browse files Browse the repository at this point in the history
* Fix hash inconsistent on between C++ and Java clients.

* Add HashingScheme to select hash function on Java client

* Fix the bug of Murmur3_32Hash on C++ client

* Add Javadoc on makeHash method

* Use JavaStringHash as default hash on Java client

* Use BoostHash as default hash on C++ client

* Make hash method always returns a signed integer

* Re-implement hash classes as singleton on Java client

* Move hash classes from include to lib

* Change constructor argument of hash classes

* Remove unused headers

* Remove `auto` type

* Fix C++ client Hash classes so that these return non-negative signed integer
This is the same behavior as Hash classes on Java client

* Add tests for C++/Java client Hash
  • Loading branch information
Licht-T authored and merlimat committed Jan 31, 2018
1 parent e783a58 commit 8d159ef
Show file tree
Hide file tree
Showing 31 changed files with 832 additions and 40 deletions.
9 changes: 9 additions & 0 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ class ProducerConfiguration {
RoundRobinDistribution,
CustomPartition
};
enum HashingScheme
{
Murmur3_32Hash,
BoostHash,
JavaStringHash
};
ProducerConfiguration();
~ProducerConfiguration();
ProducerConfiguration(const ProducerConfiguration&);
Expand Down Expand Up @@ -86,6 +92,9 @@ class ProducerConfiguration {
ProducerConfiguration& setMessageRouter(const MessageRoutingPolicyPtr& router);
const MessageRoutingPolicyPtr& getMessageRouterPtr() const;

ProducerConfiguration& setHashingScheme(const HashingScheme& scheme);
HashingScheme getHashingScheme() const;

ProducerConfiguration& setBlockIfQueueFull(bool);
bool getBlockIfQueueFull() const;

Expand Down
29 changes: 29 additions & 0 deletions pulsar-client-cpp/lib/BoostHash.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "BoostHash.h"

namespace pulsar {

BoostHash::BoostHash() : hash() {}

int32_t BoostHash::makeHash(const std::string& key) {
return static_cast<int32_t>(hash(key) & std::numeric_limits<int32_t>::max());
}

} // namespace pulsar
39 changes: 39 additions & 0 deletions pulsar-client-cpp/lib/BoostHash.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef BOOST_HASH_HPP_
#define BOOST_HASH_HPP_

#include "Hash.h"

#include <cstdint>
#include <string>
#include <boost/functional/hash.hpp>

namespace pulsar {
class BoostHash : public Hash {
public:
BoostHash();
int32_t makeHash(const std::string &key);

private:
boost::hash<std::string> hash;
};
} // namespace pulsar

#endif /* BOOST_HASH_HPP_ */
37 changes: 37 additions & 0 deletions pulsar-client-cpp/lib/Hash.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef HASH_HPP_
#define HASH_HPP_

#include <cstdint>
#include <string>

namespace pulsar {
class Hash {
public:
/**
* Generate the hash of a given String
*
* @return The hash of {@param key}, which is non-negative integer.
*/
virtual int32_t makeHash(const std::string& key) = 0;
};
} // namespace pulsar

#endif /* HASH_HPP_ */
39 changes: 39 additions & 0 deletions pulsar-client-cpp/lib/JavaStringHash.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "JavaStringHash.h"

namespace pulsar {

JavaStringHash::JavaStringHash() {}

int32_t JavaStringHash::makeHash(const std::string& key) {
uint64_t len = key.length();
const char* val = key.c_str();
uint32_t hash = 0;

for (int i = 0; i < len; i++) {
hash = 31 * hash + val[i];
}

hash &= std::numeric_limits<int32_t>::max();

return hash;
}

} // namespace pulsar
36 changes: 36 additions & 0 deletions pulsar-client-cpp/lib/JavaStringHash.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef JAVA_DEFAULT_HASH_HPP_
#define JAVA_DEFAULT_HASH_HPP_

#include "Hash.h"

#include <cstdint>
#include <string>
#include <boost/functional/hash.hpp>

namespace pulsar {
class JavaStringHash : public Hash {
public:
JavaStringHash();
int32_t makeHash(const std::string &key);
};
} // namespace pulsar

#endif /* JAVA_DEFAULT_HASH_HPP_ */
40 changes: 40 additions & 0 deletions pulsar-client-cpp/lib/MessageRouterBase.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "MessageRouterBase.h"

#include "BoostHash.h"
#include "JavaStringHash.h"
#include "Murmur3_32Hash.h"

namespace pulsar {
MessageRouterBase::MessageRouterBase(ProducerConfiguration::HashingScheme hashingScheme) {
switch (hashingScheme) {
case ProducerConfiguration::BoostHash:
hash = HashPtr(new BoostHash());
break;
case ProducerConfiguration::JavaStringHash:
hash = HashPtr(new JavaStringHash());
break;
case ProducerConfiguration::Murmur3_32Hash:
default:
hash = HashPtr(new Murmur3_32Hash());
break;
}
}
} // namespace pulsar
41 changes: 41 additions & 0 deletions pulsar-client-cpp/lib/MessageRouterBase.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_CPP_MESSAGEROUTERBASE_H
#define PULSAR_CPP_MESSAGEROUTERBASE_H

#include <boost/interprocess/smart_ptr/unique_ptr.hpp>
#include <boost/checked_delete.hpp>

#include <pulsar/MessageRoutingPolicy.h>
#include <pulsar/ProducerConfiguration.h>
#include "Hash.h"

namespace pulsar {
typedef boost::interprocess::unique_ptr<Hash, boost::checked_deleter<Hash> > HashPtr;

class MessageRouterBase : public MessageRoutingPolicy {
public:
MessageRouterBase(ProducerConfiguration::HashingScheme hashingScheme);

protected:
HashPtr hash;
};
} // namespace pulsar

#endif // PULSAR_CPP_MESSAGEROUTERBASE_H
Loading

0 comments on commit 8d159ef

Please sign in to comment.