[FLINK-8972] [e2eTests] Add DataSetAllroundTestProgram and e2e test script.
fhueske authored and twalthr committed Mar 26, 2018
1 parent d8a376a commit 5803950
Showing 5 changed files with 434 additions and 0 deletions.
104 changes: 104 additions & 0 deletions flink-end-to-end-tests/flink-dataset-allround-test/pom.xml
@@ -0,0 +1,104 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-end-to-end-tests</artifactId>
<version>1.6-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<artifactId>flink-dataset-allround-test</artifactId>
<name>flink-dataset-allround-test</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<!-- DataSetAllroundTestProgram -->
<execution>
<id>DataSetAllroundTestProgram</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>DataSetAllroundTestProgram</classifier>

<archive>
<manifestEntries>
<program-class>org.apache.flink.batch.tests.DataSetAllroundTestProgram</program-class>
</manifestEntries>
</archive>

<includes>
<include>org/apache/flink/batch/tests/DataSetAllroundTestProgram.class</include>
<include>org/apache/flink/batch/tests/DataSetAllroundTestProgram$*.class</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>

<!-- Simplify the name of the testing JARs so that they can be referred to in the end-to-end test scripts. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>rename</id>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<copy file="${project.basedir}/target/flink-dataset-allround-test-${project.version}-DataSetAllroundTestProgram.jar" tofile="${project.basedir}/target/DataSetAllroundTestProgram.jar" />
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
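For context, a minimal sketch of how an end-to-end test script could submit the renamed JAR produced above; the actual script is not part of this excerpt, and the Flink distribution path, load factor value, and output directory below are assumptions:

$FLINK_DIR/bin/flink run flink-end-to-end-tests/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar --loadFactor 4 --outputPath $TEST_DATA_DIR/dataset-allround-out

Since the manifest carries the program-class entry, the entry class does not have to be passed explicitly to flink run.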
@@ -0,0 +1,285 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.batch.tests;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.util.Preconditions;

import java.io.IOException;

/**
* Program to test a large chunk of DataSet API operators and primitives:
* <ul>
* <li>Map, FlatMap, Filter</li>
* <li>GroupReduce, Reduce</li>
* <li>Join</li>
* <li>CoGroup</li>
* <li>BulkIteration</li>
* <li>Different key definitions (position, name, KeySelector)</li>
* </ul>
*
* <p>Program parameters:
* <ul>
* <li>loadFactor (int): controls generated data volume. Does not affect result.</li>
* <li>outputPath (String): path to write the result</li>
* </ul>
*/
public class DataSetAllroundTestProgram {

public static void main(String[] args) throws Exception {

// get parameters
ParameterTool params = ParameterTool.fromArgs(args);
int loadFactor = Integer.parseInt(params.getRequired("loadFactor"));
String outputPath = params.getRequired("outputPath");

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

int numKeys = loadFactor * 128 * 1024;
DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4);
DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4);
DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4);
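// Generator(numKeys, recordsPerKey) emits numKeys distinct keys with recordsPerKey records per key, so:
// x1Keys: numKeys keys, 1 record each; x2Keys: 32 * numKeys keys, 2 records each; x8Keys: numKeys keys, 8 records each.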

DataSet<Tuple2<String, Integer>> joined = x2Keys
// shift keys (check for correct handling of key positions)
.map(x -> Tuple4.of("0-0", 0L, 1, x.f0))
.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.INT, Types.STRING))
// join datasets on non-unique fields (m-n join)
// Result: (key, 1) 16 * #keys records, all keys are preserved
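// (each x8Keys key occurs 8 times in x8Keys and 2 times in x2Keys, hence 8 * 2 = 16 joined records per key)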
.join(x8Keys).where(3).equalTo(0).with((l, r) -> Tuple2.of(l.f3, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
// key definition with key selector function
.groupBy(
new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}
)
// reduce
// Result: (key, cnt), #keys records with unique keys, cnt = 16
.reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1));

// co-group two datasets on their primary keys.
// we filter both inputs such that only 6.25% of the keys overlap.
// result: (key, cnt), #keys records with unique keys, cnt = (6.25%: 2, 93.75%: 1)
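// (values are uniform in [0, 127]: f1 > 59 keeps 60..127, f1 < 68 keeps 0..67, so only the
// 8 values 60..67 out of 128, i.e. 6.25% of the keys, pass both filters and get cnt = 2)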
DataSet<Tuple2<String, Integer>> coGrouped = x1Keys
.filter(x -> x.f1 > 59)
.coGroup(x1Keys.filter(x -> x.f1 < 68)).where("f0").equalTo("f0").with(
(CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>)
(l, r, out) -> {
int cnt = 0;
String key = "";
for (Tuple2<String, Integer> t : l) {
cnt++;
key = t.f0;
}
for (Tuple2<String, Integer> t : r) {
cnt++;
key = t.f0;
}
out.collect(Tuple2.of(key, cnt));
}
)
.returns(Types.TUPLE(Types.STRING, Types.INT));

// join datasets on keys (1-1 join) and replicate by 16 (previously computed count)
// result: (key, cnt), 16 * #keys records, all keys preserved, cnt = (6.25%: 2, 93.75%: 1)
DataSet<Tuple2<String, Integer>> joined2 = joined.join(coGrouped, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
.where(0).equalTo("f0")
.flatMap(
(FlatMapFunction<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>>)
(p, out) -> {
for (int i = 0; i < p.f0.f1; i++) {
out.collect(Tuple2.of(p.f0.f0, p.f1.f1));
}
}
)
.returns(Types.TUPLE(Types.STRING, Types.INT));

// iteration. double the count field until all counts are at 32 or more
// result: (key, cnt), 16 * #keys records, all keys preserved, cnt = (6.25%: 64, 93.75%: 32)
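// (the loop runs until no count is below 32: counts starting at 1 need five doublings to reach 32,
// and within those same supersteps counts starting at 2 reach 64)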
IterativeDataSet<Tuple2<String, Integer>> initial = joined2.iterate(16);
DataSet<Tuple2<String, Integer>> iteration = initial
.map(x -> Tuple2.of(x.f0, x.f1 * 2))
.returns(Types.TUPLE(Types.STRING, Types.INT));
DataSet<Boolean> termination = iteration
// stop iteration if all values are larger/equal 32
.flatMap(
(FlatMapFunction<Tuple2<String, Integer>, Boolean>)
(x, out) -> {
if (x.f1 < 32) {
out.collect(false);
}
}
)
.returns(Types.BOOLEAN);
DataSet result = initial.closeWith(iteration, termination)
// group on the count field and count records
// result: two records: (32, cnt1) and (64, cnt2) where cnt1 = x * 15/16, cnt2 = x * 1/16
.groupBy(1)
.reduceGroup(
(GroupReduceFunction<Tuple2<String, Integer>, Tuple2<Integer, Integer>>)
(g, out) -> {
int key = 0;
int cnt = 0;
for (Tuple2<String, Integer> r : g) {
key = r.f1;
cnt++;
}
out.collect(Tuple2.of(key, cnt));
}
)
.returns(Types.TUPLE(Types.INT, Types.INT))
// normalize result by load factor
// result: two records: (32, 15360) and (64, 1024). (x = 16384)
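// (joined2 holds 16 * numKeys = 16 * loadFactor * 128 * 1024 records; dividing the grouped counts
// by loadFactor * 128 gives 15/16 * 16384 = 15360 for cnt = 32 and 1/16 * 16384 = 1024 for cnt = 64)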
.map(x -> Tuple2.of(x.f0, x.f1 / (loadFactor * 128)))
.returns(Types.TUPLE(Types.INT, Types.INT));

// sort and emit result
result
.sortPartition(0, Order.ASCENDING).setParallelism(1)
.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE).setParallelism(1);

env.execute();

}

/**
* InputFormat that generates a deterministic DataSet of Tuple2(String, Integer)
* <ul>
* <li>String: key, can be repeated.</li>
* <li>Integer: uniformly distributed int between 0 and 127</li>
* </ul>
*/
public static class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {

// total number of records
private final long numRecords;
// total number of keys
private final long numKeys;

// records emitted per partition
private long recordsPerPartition;
// number of keys per partition
private long keysPerPartition;

// number of currently emitted records
private long recordCnt;

// id of current partition
private int partitionId;
// total number of partitions
private int numPartitions;

public Generator(long numKeys, int recordsPerKey) {
this.numKeys = numKeys;
this.numRecords = numKeys * recordsPerKey;
}

@Override
public void configure(Configuration parameters) { }

@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
return null;
}

@Override
public GenericInputSplit[] createInputSplits(int minNumSplits) throws IOException {

GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
for (int i = 0; i < minNumSplits; i++) {
splits[i] = new GenericInputSplit(i, minNumSplits);
}
return splits;
}

@Override
public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) {
return new DefaultInputSplitAssigner(inputSplits);
}

@Override
public void open(GenericInputSplit split) throws IOException {
this.partitionId = split.getSplitNumber();
this.numPartitions = split.getTotalNumberOfSplits();

// ensure even distribution of records and keys
Preconditions.checkArgument(
numRecords % numPartitions == 0,
"Records cannot be evenly distributed among partitions");
Preconditions.checkArgument(
numKeys % numPartitions == 0,
"Keys cannot be evenly distributed among partitions");

this.recordsPerPartition = numRecords / numPartitions;
this.keysPerPartition = numKeys / numPartitions;

this.recordCnt = 0;
}

@Override
public boolean reachedEnd() throws IOException {
return this.recordCnt >= this.recordsPerPartition;
}

@Override
public Tuple2<String, Integer> nextRecord(Tuple2<String, Integer> reuse) throws IOException {

// build key from partition id and count per partition
String key = String.format(
"%d-%d",
this.partitionId,
this.recordCnt % this.keysPerPartition);
// 128 values to filter on
int filterVal = (int) this.recordCnt % 128;

this.recordCnt++;

reuse.f0 = key;
reuse.f1 = filterVal;
return reuse;
}

@Override
public void close() throws IOException { }
}

}
1 change: 1 addition & 0 deletions flink-end-to-end-tests/pom.xml
@@ -36,6 +36,7 @@ under the License.

<modules>
<module>flink-parent-child-classloading-test</module>
<module>flink-dataset-allround-test</module>
</modules>

</project>