Skip to content

Commit

Permalink
Improvements in consistent hashing (qubole#441)
Browse files Browse the repository at this point in the history
  • Loading branch information
Harmandeep Singh authored Sep 11, 2020
1 parent acd7608 commit 7e797a5
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.file.Paths;
Expand Down Expand Up @@ -258,15 +257,9 @@ public CacheStatusResponse getCacheStatus(CacheStatusRequest request) throws TEx
return null;
}

ClusterManager.ClusterInfo clusterInfo = clusterManager.getClusterInfo();
List<String> nodes = clusterInfo.getNodes();
int currentNodeIndex = clusterInfo.getCurrentNodeIndex();
if (currentNodeIndex == -1 || nodes == null) {
log.error("Initialization not done for Cluster Type" + ClusterType.findByValue(request.getClusterType()));
return null;
}
String currentNodeName = clusterManager.getCurrentNodeName();

Map<Long, Integer> blockSplits = new HashMap<>();
Map<Long, String> blockSplits = new HashMap<>();
long blockNumber = 0;

long fileLength = request.getFileLength();
Expand All @@ -282,8 +275,8 @@ public CacheStatusResponse getCacheStatus(CacheStatusRequest request) throws TEx
end = fileLength;
}
String key = remotePath + i + end;
int nodeIndex = clusterManager.getNodeIndex(nodes.size(), key);
blockSplits.put(blockNumber, nodeIndex);
String nodeAddress = clusterManager.locateKey(key);
blockSplits.put(blockNumber, nodeAddress);
blockNumber++;
}

Expand Down Expand Up @@ -330,17 +323,17 @@ public CacheStatusResponse getCacheStatus(CacheStatusRequest request) throws TEx
totalRequests++;

long split = (blockNum * blockSize) / splitSize;
if (!blockSplits.get(split).equals(currentNodeIndex)) {
blockLocations.add(new BlockLocation(Location.NON_LOCAL, nodes.get(blockSplits.get(split))));
if (!blockSplits.get(split).equals(currentNodeName)) {
blockLocations.add(new BlockLocation(Location.NON_LOCAL, blockSplits.get(split)));
nonLocalRequests++;
}
else {
if (md.isBlockCached(blockNum)) {
blockLocations.add(new BlockLocation(Location.CACHED, nodes.get(blockSplits.get(split))));
blockLocations.add(new BlockLocation(Location.CACHED, blockSplits.get(split)));
cacheRequests++;
}
else {
blockLocations.add(new BlockLocation(Location.LOCAL, nodes.get(blockSplits.get(split))));
blockLocations.add(new BlockLocation(Location.LOCAL, blockSplits.get(split)));
remoteRequests++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.URI;
Expand Down Expand Up @@ -422,9 +421,9 @@ else if (start >= 0L && len >= 0L) {
end = file.getLen();
}
String key = file.getPath().toString() + i + end;
int nodeIndex = clusterManager.getNodeIndex(nodes.size(), key);
String[] name = new String[]{nodes.get(nodeIndex)};
String[] host = new String[]{nodes.get(nodeIndex)};
String nodeName = clusterManager.locateKey(key);
String[] name = new String[]{nodeName};
String[] host = new String[]{nodeName};
blockLocations[blockNumber++] = new BlockLocation(name, host, i, end - i);
log.debug(String.format("BlockLocation %s %d %d %s totalHosts: %s", file.getPath().toString(), i, end - i, host[0], nodes.size()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public ClusterType getClusterType()
}

@Override
public ClusterInfo getClusterInfo()
public String getCurrentNodeName()
{
return new ClusterInfo(getNodes(), 0);
return getNodes().get(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,15 @@ public ClusterType getClusterType()
{
return ClusterType.TEST_CLUSTER_MANAGER;
}

@Override
public String getCurrentNodeName()
{
return getNodes().get(0);
}

public String locateKey(String key)
{
return getNodes().get(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,12 @@ public void testUnhealthyNodeCluster_unhealthy()
}

@Test
public void testClusterIndex()
public void testClusterNodeName()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneNew(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);
ClusterManager manager = TestHadoop2ClusterManagerUtil.getClusterManagerInstance(ClusterType.HADOOP2_CLUSTER_MANAGER, conf);
int index = manager.getNodeIndex(nodeHostnames.size(), "1");
String nodeName = TestHadoop2ClusterManagerUtil.getConsistentHashedNodeNameFromCluster(TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneNew(), "1", conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(index == 1, "Consistent Hasing logic returned wrong node index");
assertTrue(nodeName.equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1), "Consistent hashing logic returned wrong node: " + nodeName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ static List<String> getNodeHostnamesFromCluster(String endpoint, HttpHandler res
return nodes;
}

static int getConsistentHashedNodeIndexFromCluster(String endpoint, HttpHandler responseHandler, String key,
static String getConsistentHashedNodeNameFromCluster(String endpoint, HttpHandler responseHandler, String key,
Configuration conf, ClusterType clusterType)
throws IOException
{
Expand All @@ -109,11 +109,11 @@ static int getConsistentHashedNodeIndexFromCluster(String endpoint, HttpHandler

ClusterManager clusterManager = getClusterManagerInstance(clusterType, conf);
clusterManager.initialize(conf);
final List<String> nodes = clusterManager.getNodes();
final int index = clusterManager.getNodeIndex(nodes.size(), key);

clusterManager.getNodes();
String nodeName = clusterManager.locateKey(key);
server.stop(0);
return index;

return nodeName;
}

static Set<String> generateRandomKeys(int numKeys)
Expand All @@ -139,15 +139,15 @@ static String getSaltString()
return saltStr;
}

static Map<String, Integer> getConsistentHashedMembership(TestWorker worker, Set<String> keys,
static Map<String, String> getConsistentHashedMembership(TestWorker worker, Set<String> keys,
Configuration conf, ClusterType clusterType)
throws IOException
{
Map<String, Integer> keyMembership = new HashMap<>();
int nodeIndex = 0;
Map<String, String> keyMembership = new HashMap<>();
String nodeIndex;

for (String key : keys) {
nodeIndex = getConsistentHashedNodeIndexFromCluster(CLUSTER_NODES_ENDPOINT, worker, key, conf, clusterType);
nodeIndex = getConsistentHashedNodeNameFromCluster(CLUSTER_NODES_ENDPOINT, worker, key, conf, clusterType);
keyMembership.put(key, nodeIndex);
}
return keyMembership;
Expand All @@ -158,16 +158,16 @@ static int matchMemberships(TestWorker prevWorker, TestWorker newWorker, Set<Str
throws IOException
{
final List<String> nodeHostnames1 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, prevWorker, conf, clusterType);
Map<String, Integer> keyMembership1 = getConsistentHashedMembership(prevWorker, keys, conf, clusterType);
Map<String, String> keyMembership1 = getConsistentHashedMembership(prevWorker, keys, conf, clusterType);

final List<String> nodeHostnames2 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, newWorker, conf, clusterType);
Map<String, Integer> keyMembership2 = getConsistentHashedMembership(newWorker, keys, conf, clusterType);
Map<String, String> keyMembership2 = getConsistentHashedMembership(newWorker, keys, conf, clusterType);

int match = 0;
int nonMatch = 0;

for (String key : keys) {
if (nodeHostnames1.get(keyMembership1.get(key)).equals(nodeHostnames2.get(keyMembership2.get(key)))) {
if (keyMembership1.get(key).equals(keyMembership2.get(key))) {
match++;
}
else {
Expand Down
8 changes: 8 additions & 0 deletions rubix-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
<version>${dep.mockito.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.ishugaliy</groupId>
<artifactId>allgood-consistent-hash</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -106,6 +112,8 @@
<includes>
<include>org.apache.thrift:libthrift</include>
<include>com.google.guava:guava</include>
<include>com.github.ishugaliy:allgood-consistent-hash</include>
<include>net.openhft:zero-allocation-hashing</include>
</includes>
</artifactSet>
<relocations>
Expand Down
97 changes: 48 additions & 49 deletions rubix-spi/src/main/java/com/qubole/rubix/spi/ClusterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,23 @@
*/
package com.qubole.rubix.spi;

import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.ishugaliy.allgood.consistent.hash.ConsistentHash;
import org.ishugaliy.allgood.consistent.hash.HashRing;
import org.ishugaliy.allgood.consistent.hash.hasher.DefaultHasher;
import org.ishugaliy.allgood.consistent.hash.node.SimpleNode;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -53,10 +46,14 @@ public abstract class ClusterManager
{
private static Log log = LogFactory.getLog(ClusterManager.class);

private int currentNodeIndex = -1;
private String currentNodeName;
private String nodeHostname;
private String nodeHostAddress;
private final AtomicReference<LoadingCache<String, List<String>>> nodesCache = new AtomicReference<>();
// Concluded from testing that Metro Hash results in better load distribution across the nodes in cluster.
private final ConsistentHash<SimpleNode> consistentHashRing = HashRing.<SimpleNode>newBuilder()
.hasher(DefaultHasher.METRO_HASH)
.build();

public abstract ClusterType getClusterType();

Expand Down Expand Up @@ -87,17 +84,37 @@ private List<String> getNodesAndUpdateState()
// Empty result set => server up and only master node running, return localhost has the only node
// Do not need to consider failed nodes list as 1node cluster and server is up since it replied to allNodesRequest
nodes = ImmutableList.of(getCurrentNodeHostAddress());
} else {
Collections.sort(nodes);
}

currentNodeIndex = nodes.indexOf(getCurrentNodeHostname());
if (currentNodeIndex == -1) {
currentNodeIndex = nodes.indexOf(getCurrentNodeHostAddress());
// remove stale nodes from consistent hash ring
for (SimpleNode ringNode : consistentHashRing.getNodes()) {
if (!nodes.contains(ringNode.getKey()))
{
log.debug("Removing node: " + ringNode.getKey() + " from consistent hash ring, Total nodes: " + consistentHashRing.getNodes());
consistentHashRing.remove(ringNode);
}
}
if (currentNodeIndex == -1) {
log.error(String.format("Could not initialize cluster nodes=%s nodeHostName=%s nodeHostAddress=%s " +
"currentNodeIndex=%d", nodes, getCurrentNodeHostname(), getCurrentNodeHostAddress(), currentNodeIndex));

// add new nodes to consistent hash ring
for (String node : nodes) {
SimpleNode ringNode = SimpleNode.of(node);
if (!consistentHashRing.contains(ringNode)) {
log.debug("Adding node: " + ringNode.getKey() + " to consistent hash ring, Total nodes: " + consistentHashRing.getNodes());
consistentHashRing.add(ringNode);
}
}

if (currentNodeName == null) {
if (consistentHashRing.contains(SimpleNode.of(getCurrentNodeHostname()))) {
currentNodeName = getCurrentNodeHostname();
}
else if (consistentHashRing.contains(SimpleNode.of(getCurrentNodeHostAddress()))) {
currentNodeName = getCurrentNodeHostAddress();
}
else {
log.error(String.format("Could not initialize cluster nodes=%s nodeHostName=%s nodeHostAddress=%s " +
"currentNodeIndex=%s", nodes, getCurrentNodeHostname(), getCurrentNodeHostAddress(), currentNodeName));
}
}
return nodes;
}
Expand Down Expand Up @@ -136,11 +153,9 @@ public List<String> load(String s)
}
}

public int getNodeIndex(int numNodes, String key)
public String locateKey(String key)
{
HashFunction hf = Hashing.md5();
HashCode hc = hf.hashString(key, Charsets.UTF_8);
return Hashing.consistentHash(hc, numNodes);
return consistentHashRing.locate(key).orElseThrow(() -> new RuntimeException("Unable to locate key: " + key)).getKey();
}

// Returns sorted list of nodes in the cluster
Expand All @@ -149,32 +164,16 @@ public List<String> getNodes()
return nodesCache.get().getUnchecked("nodes");
}

public ClusterInfo getClusterInfo()
public String getCurrentNodeName()
{
// getNodes() updates the currentNodeIndex
List<String> nodes = getNodes();
return new ClusterInfo(nodes, currentNodeIndex);
}

public static class ClusterInfo
{
private final List<String> nodes;
private final int currentNodeIndex;

public ClusterInfo(List<String> nodes, int currentNodeIndex)
{
this.nodes = nodes;
this.currentNodeIndex = currentNodeIndex;
}

public List<String> getNodes()
{
return nodes;
}

public int getCurrentNodeIndex()
{
return currentNodeIndex;
if (currentNodeName == null) {
// getNodes() updates the currentNodeName
List<String> nodes = getNodes();
if (nodes == null) {
log.error("Initialization not done for Cluster Type: " + getClusterType());
throw new RuntimeException("Unable to find current node name");
}
}
return currentNodeName;
}
}

0 comments on commit 7e797a5

Please sign in to comment.