Skip to content

Commit

Permalink
[FLINK-3422][streaming][api-breaking] Scramble HashPartitioner hashes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Xazax-hun authored and mbalassi committed Mar 2, 2016
1 parent 80ae750 commit 0ff286d
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public final void insert(T record) throws IOException {
return;
}

final int hashCode = hash(this.buildSideComparator.hash(record));
final int hashCode = MathUtils.jenkinsHash(this.buildSideComparator.hash(record));
final int posHashCode = hashCode % this.numBuckets;

// get the bucket for the given hash code
Expand Down Expand Up @@ -360,7 +360,7 @@ public void insertOrReplaceRecord(T record) throws IOException {
return;
}

final int searchHashCode = hash(this.buildSideComparator.hash(record));
final int searchHashCode = MathUtils.jenkinsHash(this.buildSideComparator.hash(record));
final int posHashCode = searchHashCode % this.numBuckets;

// get the bucket for the given hash code
Expand Down Expand Up @@ -1140,26 +1140,7 @@ private void compactPartition(final int partitionNumber) throws IOException {
this.compactionMemory.resetRWViews();
this.compactionMemory.pushDownPages();
}

/**
* This function hashes an integer value. It is adapted from Bob Jenkins' website
* <a href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>.
* The hash function has the <i>full avalanche</i> property, meaning that every bit of the value to be hashed
* affects every bit of the hash value.
*
* @param code The integer to be hashed.
* @return The hash code for the integer.
*/
private static int hash(int code) {
code = (code + 0x7ed55d16) + (code << 12);
code = (code ^ 0xc761c23c) ^ (code >>> 19);
code = (code + 0x165667b1) + (code << 5);
code = (code + 0xd3a2646c) ^ (code << 9);
code = (code + 0xfd7046c5) + (code << 3);
code = (code ^ 0xb55a4f09) ^ (code >>> 16);
return code >= 0 ? code : -(code + 1);
}


/**
* Iterator that traverses the whole hash table once
*
Expand Down Expand Up @@ -1286,7 +1267,7 @@ public T getMatchFor(PT probeSideRecord, T reuse) {
if (closed) {
return null;
}
final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
final int searchHashCode = MathUtils.jenkinsHash(this.probeTypeComparator.hash(probeSideRecord));

final int posHashCode = searchHashCode % numBuckets;

Expand Down Expand Up @@ -1359,7 +1340,7 @@ public T getMatchFor(PT probeSideRecord) {
if (closed) {
return null;
}
final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
final int searchHashCode = MathUtils.jenkinsHash(this.probeTypeComparator.hash(probeSideRecord));

final int posHashCode = searchHashCode % numBuckets;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.util.MathUtils;

/**
* The output emitter decides to which of the possibly multiple output channels a record is sent.
Expand Down Expand Up @@ -187,39 +188,11 @@ private int[] broadcast(int numberOfChannels) {
private int[] hashPartitionDefault(T record, int numberOfChannels) {
int hash = this.comparator.hash(record);

hash = murmurHash(hash);
this.channels[0] = MathUtils.murmurHash(hash) % numberOfChannels;

if (hash >= 0) {
this.channels[0] = hash % numberOfChannels;
}
else if (hash != Integer.MIN_VALUE) {
this.channels[0] = -hash % numberOfChannels;
}
else {
this.channels[0] = 0;
}

return this.channels;
}

private int murmurHash(int k) {
k *= 0xcc9e2d51;
k = Integer.rotateLeft(k, 15);
k *= 0x1b873593;

k = Integer.rotateLeft(k, 13);
k *= 0xe6546b64;

k ^= 4;
k ^= k >>> 16;
k *= 0x85ebca6b;
k ^= k >>> 13;
k *= 0xc2b2ae35;
k ^= k >>> 16;

return k;
}

private final int[] rangePartition(final T record, int numberOfChannels) {
if (this.channels == null || this.channels.length != 1) {
this.channels = new int[1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,64 @@ public static int checkedDownCast(long value) {
public static boolean isPowerOf2(long value) {
return (value & (value - 1)) == 0;
}


/**
* This function hashes an integer value. It is adapted from Bob Jenkins' website
* <a href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>.
* The hash function has the <i>full avalanche</i> property, meaning that every bit of the value to be hashed
* affects every bit of the hash value.
*
* It is crucial to use different hash functions to partition data across machines and the internal partitioning of
* data structures. This hash function is intended for partitioning internally in data structures.
*
* @param code The integer to be hashed.
* @return The non-negative hash code for the integer.
*/
public static int jenkinsHash(int code) {
code = (code + 0x7ed55d16) + (code << 12);
code = (code ^ 0xc761c23c) ^ (code >>> 19);
code = (code + 0x165667b1) + (code << 5);
code = (code + 0xd3a2646c) ^ (code << 9);
code = (code + 0xfd7046c5) + (code << 3);
code = (code ^ 0xb55a4f09) ^ (code >>> 16);
return code >= 0 ? code : -(code + 1);
}

/**
* This function hashes an integer value.
*
* It is crucial to use different hash functions to partition data across machines and the internal partitioning of
* data structures. This hash function is intended for partitioning across machines.
*
* @param code The integer to be hashed.
* @return The non-negative hash code for the integer.
*/
public static int murmurHash(int code) {
code *= 0xcc9e2d51;
code = Integer.rotateLeft(code, 15);
code *= 0x1b873593;

code = Integer.rotateLeft(code, 13);
code *= 0xe6546b64;

code ^= 4;
code ^= code >>> 16;
code *= 0x85ebca6b;
code ^= code >>> 13;
code *= 0xc2b2ae35;
code ^= code >>> 16;

if (code >= 0) {
return code;
}
else if (code != Integer.MIN_VALUE) {
return -code;
}
else {
return 0;
}
}

// ============================================================================================

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
Expand Down Expand Up @@ -48,7 +49,7 @@ public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
}
returnArray[0] = Math.abs(key.hashCode() % numberOfOutputChannels);
returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;

return returnArray;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.IterativeStream;
Expand Down Expand Up @@ -488,9 +489,9 @@ public Integer getKey(Integer value) throws Exception {
public void flatMap(Integer value, Collector<Integer> out) throws Exception {
received++;
if (key == -1) {
key = value % 3;
key = MathUtils.murmurHash(value % 3) % 3;
} else {
assertEquals(key, value % 3);
assertEquals(key, MathUtils.murmurHash(value % 3) % 3);
}
if (value > 0) {
out.collect(value - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala

import org.apache.flink.api.common.functions.{RichMapFunction, FoldFunction}
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.runtime.util.MathUtils
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.test.util.TestBaseUtils
Expand Down Expand Up @@ -71,7 +72,7 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {

override def run(ctx: SourceContext[(Int, Int)]): Unit = {
0 until numElements foreach {
i => ctx.collect((i % numKeys, i))
i => ctx.collect((MathUtils.murmurHash(i) % numKeys, i))
}
}

Expand All @@ -86,8 +87,12 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
}
})
.map(new RichMapFunction[Int, (Int, Int)] {
var key: Int = -1
override def map(value: Int): (Int, Int) = {
(getRuntimeContext.getIndexOfThisSubtask, value)
if (key == -1) {
key = MathUtils.murmurHash(value) % numKeys
}
(key, value)
}
})
.split{
Expand All @@ -106,7 +111,7 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
.javaStream
.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE)

val groupedSequence = 0 until numElements groupBy( _ % numKeys)
val groupedSequence = 0 until numElements groupBy( MathUtils.murmurHash(_) % numKeys )

expected1 = groupedSequence(0).scanLeft(0)(_ + _).tail.mkString("\n")
expected2 = groupedSequence(1).scanLeft(0)(_ + _).tail.mkString("\n")
Expand Down

0 comments on commit 0ff286d

Please sign in to comment.