[FLINK-3422][streaming] Update tests reliant on hashing
mbalassi committed Mar 2, 2016
1 parent 0ff286d commit f0f93c2
Showing 4 changed files with 34 additions and 13 deletions.
@@ -21,13 +21,19 @@
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

+import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
import org.apache.flink.storm.tests.operators.TaskIdBolt;
import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.util.StreamingProgramTestBase;

+/**
+* This test relies on the hash function used by {@link DataStream#keyBy}, which is
+* assumed to be {@link MathUtils#murmurHash}.
+*/
public class StormFieldsGroupingITCase extends StreamingProgramTestBase {

private final static String topologyId = "FieldsGrouping Test";
@@ -43,9 +49,9 @@ protected void preSubmit() throws Exception {

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory("4> -1930858313\n" + "4> 1431162155\n" + "4> 1654374947\n"
+ "4> -65105105\n" + "3> -1155484576\n" + "3> 1033096058\n" + "3> -1557280266\n"
+ "3> -1728529858\n" + "3> -518907128\n" + "3> -252332814", this.resultPath);
compareResultsByLinesInMemory("3> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n" +
"3> 1431162155\n" + "4> -1557280266\n" + "4> -1728529858\n" + "4> 1654374947\n" +
"4> -65105105\n" + "4> -518907128\n" + "4> -252332814\n", this.resultPath);
}

@Override
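The reordered expected output above can be derived from the partitioning scheme the test now assumes. Below is a minimal sketch (not part of the commit) of that derivation; it assumes keyBy routes each record to channel murmurHash(key.hashCode()) % parallelism, that the grouped values act as Integer keys (so hashCode() is the value itself), and that the "3>"/"4>" prefixes are the 1-based index of the receiving subtask:

import org.apache.flink.runtime.util.MathUtils;

public class ExpectedChannels {
	public static void main(String[] args) {
		// Recompute the "N> value" lines that postSubmit() expects.
		int parallelism = 4;
		int[] values = {-1155484576, 1033096058, -1930858313, 1431162155,
			-1557280266, -1728529858, 1654374947, -65105105, -518907128, -252332814};
		for (int value : values) {
			// Flink's murmurHash returns a non-negative int, so the modulo is safe.
			int channel = MathUtils.murmurHash(value) % parallelism;
			System.out.println((channel + 1) + "> " + value);
		}
	}
}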
@@ -462,6 +462,15 @@ public String map2(String value) throws Exception {
assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
}

+/**
+* This test relies on the hash function used by {@link DataStream#keyBy}, which is
+* assumed to be {@link MathUtils#murmurHash}.
+*
+* For the test to pass, all FlatMappers must see at least two records in the iteration,
+* which can only be achieved if the hashed values of the input keys cover every residue
+* class modulo the parallelism, i.e. form a complete residue system. Given that the test
+* is designed for 3 parallel FlatMapper instances, keys chosen from the range [1, 3] are
+* a suitable choice.
+*/
@Test
public void testGroupByFeedback() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
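The Javadoc's claim about keys 1 to 3 can be checked outside the test. Here is a small sketch (again not part of the commit, and assuming the same murmurHash(key.hashCode()) % parallelism routing) that prints which of the 3 FlatMapper instances each key lands on:

import org.apache.flink.runtime.util.MathUtils;

public class ResidueCheck {
	public static void main(String[] args) {
		// The test's premise holds if keys 1..3 jointly hit residues 0, 1 and 2.
		int parallelism = 3;
		boolean[] hit = new boolean[parallelism];
		for (int key = 1; key <= 3; key++) {
			int partition = MathUtils.murmurHash(key) % parallelism;
			hit[partition] = true;
			System.out.println("key " + key + " -> instance " + partition);
		}
		System.out.println("complete residue system: " + (hit[0] && hit[1] && hit[2]));
	}
}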
@@ -23,6 +23,7 @@
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
@@ -67,12 +68,13 @@ public void after() throws Exception {
* of Tuple2<Integer, Integer> is created. The stream is grouped according to the first tuple
* value. Each group is folded where the second tuple value is summed up.
*
-* @throws Exception
+* This test relies on the hash function used by {@link DataStream#keyBy}, which is
+* assumed to be {@link MathUtils#murmurHash}.
*/
@Test
-public void testFoldOperation() throws Exception {
+public void testGroupedFoldOperation() throws Exception {
int numElements = 10;
-int numKeys = 2;
+final int numKeys = 2;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));
@@ -85,17 +87,20 @@ public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws
return accumulator + value.f1;
}
}).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
+int key = -1;
@Override
public Tuple2<Integer, Integer> map(Integer value) throws Exception {
-return new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), value);
+if (key == -1) {
+	key = MathUtils.murmurHash(value) % numKeys;
+}
+return new Tuple2<>(key, value);
}
}).split(new OutputSelector<Tuple2<Integer, Integer>>() {
@Override
public Iterable<String> select(Tuple2<Integer, Integer> value) {
List<String> output = new ArrayList<>();

output.add(value.f0 + "");

return output;
}
});
@@ -120,7 +125,7 @@ public Integer map(Tuple2<Integer, Integer> value) throws Exception {
int counter2 = 0;

for (int i = 0; i < numElements; i++) {
-if (i % 2 == 0) {
+if (MathUtils.murmurHash(i) % numKeys == 0) {
counter1 += i;
builder1.append(counter1 + "\n");
} else {
@@ -196,7 +201,7 @@ public NonSerializableTupleSource(int numElements) {
@Override
public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception {
for (int i = 0; i < numElements; i++) {
-ctx.collect(new Tuple2<Integer, NonSerializable>(i, new NonSerializable(i)));
+ctx.collect(new Tuple2<>(i, new NonSerializable(i)));
}
}

@@ -217,14 +222,13 @@ public TupleSource(int numElements, int numKeys) {
@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
for (int i = 0; i < numElements; i++) {
-Tuple2<Integer, Integer> result = new Tuple2<>(i % numKeys, i);
+Tuple2<Integer, Integer> result = new Tuple2<>(MathUtils.murmurHash(i) % numKeys, i);
ctx.collect(result);
}
}

@Override
public void cancel() {

}
}
}
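The changes in this file all follow one rule: once the source assigns keys via murmurHash(i) % numKeys, every precomputed expectation must bucket values with exactly the same expression, otherwise the assertions diverge from the actual grouping. A condensed sketch of that expected-result computation, mirroring the loop in the diff above (class and variable names are illustrative):

import org.apache.flink.runtime.util.MathUtils;

public class ExpectedFoldResults {
	public static void main(String[] args) {
		// Element i belongs to group murmurHash(i) % numKeys, matching TupleSource.
		int numElements = 10;
		int numKeys = 2;
		int counter0 = 0;
		int counter1 = 0;
		for (int i = 0; i < numElements; i++) {
			if (MathUtils.murmurHash(i) % numKeys == 0) {
				counter0 += i;
				System.out.println("group 0 running sum: " + counter0);
			} else {
				counter1 += i;
				System.out.println("group 1 running sum: " + counter1);
			}
		}
	}
}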
@@ -58,9 +58,11 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
* The stream is grouped by the first field. For each group, the resulting stream is folded by
* summing up the second tuple field.
*
+* This test relies on the hash function used by {@link DataStream#keyBy}, which is
+* assumed to be {@link MathUtils#murmurHash}.
*/
@Test
-def testFoldOperator(): Unit = {
+def testGroupedFoldOperator(): Unit = {
val numElements = 10
val numKeys = 2

