Skip to content

Commit

Permalink
HDFS-6268. Better sorting in NetworkTopology#pseudoSortByDistance whe…
Browse files Browse the repository at this point in the history
…n no local node is found. (wang)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1599734 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
umbrant committed Jun 3, 2014
1 parent 0634b42 commit 02fcb6b
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -32,6 +34,9 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.util.ReflectionUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/** The class represents a cluster of computer with a tree hierarchical
* network topology.
* For example, a cluster may be consists of many data centers filled
Expand Down Expand Up @@ -667,7 +672,23 @@ protected boolean isSameParents(Node node1, Node node2) {
return node1.getParent()==node2.getParent();
}

final protected static Random r = new Random();
private static final ThreadLocal<Random> r = new ThreadLocal<Random>();

/**
* Getter for thread-local Random, which provides better performance than
* a shared Random (even though Random is thread-safe).
*
* @return Thread-local Random.
*/
protected Random getRandom() {
Random rand = r.get();
if (rand == null) {
rand = new Random();
r.set(rand);
}
return rand;
}

/** randomly choose one node from <i>scope</i>
* if scope starts with ~, choose one from the all nodes except for the
* ones in <i>scope</i>; otherwise, choose one from <i>scope</i>
Expand Down Expand Up @@ -717,7 +738,7 @@ private Node chooseRandom(String scope, String excludedScope){
"Failed to find datanode (scope=\"" + String.valueOf(scope) +
"\" excludedScope=\"" + String.valueOf(excludedScope) + "\").");
}
int leaveIndex = r.nextInt(numOfDatanodes);
int leaveIndex = getRandom().nextInt(numOfDatanodes);
return innerNode.getLeaf(leaveIndex, node);
}

Expand Down Expand Up @@ -824,61 +845,79 @@ public static String getLastHalf(String networkLocation) {
return networkLocation.substring(index);
}

/** swap two array items */
static protected void swap(Node[] nodes, int i, int j) {
Node tempNode;
tempNode = nodes[j];
nodes[j] = nodes[i];
nodes[i] = tempNode;
}

/** Sort nodes array by their distances to <i>reader</i>
* It linearly scans the array, if a local node is found, swap it with
* the first element of the array.
* If a local rack node is found, swap it with the first element following
* the local node.
* If neither local node or local rack node is found, put a random replica
* location at position 0.
* It leaves the rest nodes untouched.
* @param reader the node that wishes to read a block from one of the nodes
* @param nodes the list of nodes containing data for the reader
/**
* Returns an integer weight which specifies how far away {node} is away from
* {reader}. A lower value signifies that a node is closer.
*
* @param reader Node where data will be read
* @param node Replica of data
* @return weight
*/
public void pseudoSortByDistance( Node reader, Node[] nodes ) {
int tempIndex = 0;
int localRackNode = -1;
if (reader != null ) {
//scan the array to find the local node & local rack node
for(int i=0; i<nodes.length; i++) {
if(tempIndex == 0 && reader == nodes[i]) { //local node
//swap the local node and the node at position 0
if( i != 0 ) {
swap(nodes, tempIndex, i);
}
tempIndex=1;
if(localRackNode != -1 ) {
if(localRackNode == 0) {
localRackNode = i;
}
break;
}
} else if(localRackNode == -1 && isOnSameRack(reader, nodes[i])) {
//local rack
localRackNode = i;
if(tempIndex != 0 ) break;
}
protected int getWeight(Node reader, Node node) {
// 0 is local, 1 is same rack, 2 is off rack
// Start off by initializing to off rack
int weight = 2;
if (reader != null) {
if (reader == node) {
weight = 0;
} else if (isOnSameRack(reader, node)) {
weight = 1;
}
}
return weight;
}

// swap the local rack node and the node at position tempIndex
if(localRackNode != -1 && localRackNode != tempIndex ) {
swap(nodes, tempIndex, localRackNode);
tempIndex++;
/**
* Sort nodes array by network distance to <i>reader</i>.
* <p/>
* In a three-level topology, a node can be either local, on the same rack, or
* on a different rack from the reader. Sorting the nodes based on network
* distance from the reader reduces network traffic and improves performance.
* <p/>
* As an additional twist, we also randomize the nodes at each network
* distance using the provided random seed. This helps with load balancing
* when there is data skew.
*
* @param reader Node where data will be read
* @param nodes Available replicas with the requested data
* @param seed Used to seed the pseudo-random generator that randomizes the
* set of nodes at each network distance.
*/
public void sortByDistance(Node reader, Node[] nodes, long seed) {
/** Sort weights for the nodes array */
int[] weights = new int[nodes.length];
for (int i=0; i<nodes.length; i++) {
weights[i] = getWeight(reader, nodes[i]);
}
// Add weight/node pairs to a TreeMap to sort
TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>();
for (int i=0; i<nodes.length; i++) {
int weight = weights[i];
Node node = nodes[i];
List<Node> list = tree.get(weight);
if (list == null) {
list = Lists.newArrayListWithExpectedSize(1);
tree.put(weight, list);
}
list.add(node);
}

// put a random node at position 0 if it is not a local/local-rack node
if(tempIndex == 0 && localRackNode == -1 && nodes.length != 0) {
swap(nodes, 0, r.nextInt(nodes.length));

// Seed is normally the block id
// This means we use the same pseudo-random order for each block, for
// potentially better page cache usage.
Random rand = getRandom();
rand.setSeed(seed);
int idx = 0;
for (List<Node> list: tree.values()) {
if (list != null) {
Collections.shuffle(list, rand);
for (Node n: list) {
nodes[idx] = n;
idx++;
}
}
}
Preconditions.checkState(idx == nodes.length,
"Sorted the wrong number of nodes!");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -248,25 +248,41 @@ public void remove(Node node) {
}
}

/** Sort nodes array by their distances to <i>reader</i>
* It linearly scans the array, if a local node is found, swap it with
* the first element of the array.
* If a local node group node is found, swap it with the first element
* following the local node.
* If a local rack node is found, swap it with the first element following
* the local node group node.
* If neither local node, node group node or local rack node is found, put a
* random replica location at position 0.
* It leaves the rest nodes untouched.
* @param reader the node that wishes to read a block from one of the nodes
* @param nodes the list of nodes containing data for the reader
*/
@Override
public void pseudoSortByDistance( Node reader, Node[] nodes ) {
protected int getWeight(Node reader, Node node) {
// 0 is local, 1 is same node group, 2 is same rack, 3 is off rack
// Start off by initializing to off rack
int weight = 3;
if (reader != null) {
if (reader == node) {
weight = 0;
} else if (isOnSameNodeGroup(reader, node)) {
weight = 1;
} else if (isOnSameRack(reader, node)) {
weight = 2;
}
}
return weight;
}

/**
* Sort nodes array by their distances to <i>reader</i>.
* <p/>
* This is the same as
* {@link NetworkTopology#sortByDistance(Node, Node[], long)} except with a
* four-level network topology which contains the additional network distance
* of a "node group" which is between local and same rack.
*
* @param reader Node where data will be read
* @param nodes Available replicas with the requested data
* @param seed Used to seed the pseudo-random generator that randomizes the
* set of nodes at each network distance.
*/
@Override
public void sortByDistance( Node reader, Node[] nodes, long seed) {
// If reader is not a datanode (not in NetworkTopology tree), we need to
// replace this reader with a sibling leaf node in tree.
if (reader != null && !this.contains(reader)) {
// if reader is not a datanode (not in NetworkTopology tree), we will
// replace this reader with a sibling leaf node in tree.
Node nodeGroup = getNode(reader.getNetworkLocation());
if (nodeGroup != null && nodeGroup instanceof InnerNode) {
InnerNode parentNode = (InnerNode) nodeGroup;
Expand All @@ -276,62 +292,7 @@ public void pseudoSortByDistance( Node reader, Node[] nodes ) {
return;
}
}
int tempIndex = 0;
int localRackNode = -1;
int localNodeGroupNode = -1;
if (reader != null) {
//scan the array to find the local node & local rack node
for (int i = 0; i < nodes.length; i++) {
if (tempIndex == 0 && reader == nodes[i]) { //local node
//swap the local node and the node at position 0
if (i != 0) {
swap(nodes, tempIndex, i);
}
tempIndex=1;

if (localRackNode != -1 && (localNodeGroupNode !=-1)) {
if (localRackNode == 0) {
localRackNode = i;
}
if (localNodeGroupNode == 0) {
localNodeGroupNode = i;
}
break;
}
} else if (localNodeGroupNode == -1 && isOnSameNodeGroup(reader,
nodes[i])) {
//local node group
localNodeGroupNode = i;
// node local and rack local are already found
if(tempIndex != 0 && localRackNode != -1) break;
} else if (localRackNode == -1 && isOnSameRack(reader, nodes[i])) {
localRackNode = i;
if (tempIndex != 0 && localNodeGroupNode != -1) break;
}
}

// swap the local nodegroup node and the node at position tempIndex
if(localNodeGroupNode != -1 && localNodeGroupNode != tempIndex) {
swap(nodes, tempIndex, localNodeGroupNode);
if (localRackNode == tempIndex) {
localRackNode = localNodeGroupNode;
}
tempIndex++;
}

// swap the local rack node and the node at position tempIndex
if(localRackNode != -1 && localRackNode != tempIndex) {
swap(nodes, tempIndex, localRackNode);
tempIndex++;
}
}

// put a random node at position 0 if there is not a local/local-nodegroup/
// local-rack node
if (tempIndex == 0 && localNodeGroupNode == -1 && localRackNode == -1
&& nodes.length != 0) {
swap(nodes, 0, r.nextInt(nodes.length));
}
super.sortByDistance(reader, nodes, seed);
}

/** InnerNodeWithNodeGroup represents a switch/router of a data center, rack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ public void testGetDistance() throws Exception {
}

@Test
public void testPseudoSortByDistance() throws Exception {
public void testSortByDistance() throws Exception {
NodeBase[] testNodes = new NodeBase[4];

// array contains both local node, local node group & local rack node
testNodes[0] = dataNodes[1];
testNodes[1] = dataNodes[2];
testNodes[2] = dataNodes[3];
testNodes[3] = dataNodes[0];
cluster.pseudoSortByDistance(dataNodes[0], testNodes );
cluster.sortByDistance(dataNodes[0], testNodes, 0xDEADBEEF);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);
assertTrue(testNodes[2] == dataNodes[2]);
Expand All @@ -115,7 +115,7 @@ public void testPseudoSortByDistance() throws Exception {
testNodes[1] = dataNodes[4];
testNodes[2] = dataNodes[1];
testNodes[3] = dataNodes[0];
cluster.pseudoSortByDistance(dataNodes[0], testNodes );
cluster.sortByDistance(dataNodes[0], testNodes, 0xDEADBEEF);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);

Expand All @@ -124,7 +124,7 @@ public void testPseudoSortByDistance() throws Exception {
testNodes[1] = dataNodes[3];
testNodes[2] = dataNodes[2];
testNodes[3] = dataNodes[0];
cluster.pseudoSortByDistance(dataNodes[0], testNodes );
cluster.sortByDistance(dataNodes[0], testNodes, 0xDEADBEEF);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[2]);

Expand All @@ -133,7 +133,7 @@ public void testPseudoSortByDistance() throws Exception {
testNodes[1] = dataNodes[7];
testNodes[2] = dataNodes[2];
testNodes[3] = dataNodes[0];
cluster.pseudoSortByDistance(computeNode, testNodes );
cluster.sortByDistance(computeNode, testNodes, 0xDEADBEEF);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[2]);
}
Expand Down
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6109 let sync_file_range() system call run in background
(Liang Xie via stack)

HDFS-6268. Better sorting in NetworkTopology#pseudoSortByDistance when
no local node is found. (wang)

OPTIMIZATIONS

HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,8 @@ public void sortLocatedBlocks(final String targethost,
DFSUtil.DECOM_COMPARATOR;

for (LocatedBlock b : locatedblocks) {
networktopology.pseudoSortByDistance(client, b.getLocations());
networktopology.sortByDistance(client, b.getLocations(), b
.getBlock().getBlockId());
// Move decommissioned/stale datanodes to the bottom
Arrays.sort(b.getLocations(), comparator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1633,9 +1633,11 @@ LocatedBlocks getBlockLocations(String clientMachine, String src,
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, blocks.getLocatedBlocks());

// lastBlock is not part of getLocatedBlocks(), might need to sort it too
LocatedBlock lastBlock = blocks.getLastLocatedBlock();
if (lastBlock != null) {
ArrayList<LocatedBlock> lastBlockList = new ArrayList<LocatedBlock>();
ArrayList<LocatedBlock> lastBlockList =
Lists.newArrayListWithCapacity(1);
lastBlockList.add(lastBlock);
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, lastBlockList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ public void testReadSelectNonStaleDatanode() throws Exception {
if (stm != null) {
stm.close();
}
if (client != null) {
client.close();
}
cluster.shutdown();
}
}
Expand Down
Loading

0 comments on commit 02fcb6b

Please sign in to comment.