Skip to content

Commit

Permalink
[FLINK-3656] [table] Consolidate ITCases
Browse files Browse the repository at this point in the history
Merge FilterIT/SelectIT to CalcITCases
Merge FromDataSet/ToTable to TableEnvironmentITCases
Merge aggregating ITCases
All batch ITCases use TableProgramsTestBase

This closes apache#2566.
  • Loading branch information
twalthr committed Sep 29, 2016
1 parent 8243138 commit 7758571
Show file tree
Hide file tree
Showing 26 changed files with 1,716 additions and 2,133 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,40 @@
*/
package org.apache.flink.api.java.batch.table;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.TableEnvironment;
import org.apache.flink.api.table.ValidationException;
import org.apache.flink.examples.java.WordCountTable.WC;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.flink.examples.java.WordCountTable.WC;

import java.util.List;

@RunWith(Parameterized.class)
public class AggregationsITCase extends MultipleProgramsTestBase {
public class AggregationsITCase extends TableProgramsTestBase {

public AggregationsITCase(TestExecutionMode mode){
super(mode);
public AggregationsITCase(TestExecutionMode mode, TableConfigMode configMode){
super(mode, configMode);
}

@Test
public void testAggregationTypes() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));

Expand All @@ -62,7 +65,7 @@ public void testAggregationTypes() throws Exception {
@Test(expected = ValidationException.class)
public void testAggregationOnNonExistingField() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

Table table =
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
Expand All @@ -79,7 +82,7 @@ public void testAggregationOnNonExistingField() throws Exception {
@Test
public void testWorkingAggregationDataTypes() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
env.fromElements(
Expand All @@ -100,7 +103,7 @@ public void testWorkingAggregationDataTypes() throws Exception {
@Test
public void testAggregationWithArithmetic() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

DataSource<Tuple2<Float, String>> input =
env.fromElements(
Expand All @@ -122,7 +125,7 @@ public void testAggregationWithArithmetic() throws Exception {
@Test
public void testAggregationWithTwoCount() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

DataSource<Tuple2<Float, String>> input =
env.fromElements(
Expand All @@ -144,7 +147,7 @@ public void testAggregationWithTwoCount() throws Exception {
@Test(expected = ValidationException.class)
public void testNonWorkingDataTypes() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));

Expand All @@ -164,7 +167,7 @@ public void testNonWorkingDataTypes() throws Exception {
@Test(expected = ValidationException.class)
public void testNoNestedAggregation() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));

Expand All @@ -181,10 +184,90 @@ public void testNoNestedAggregation() throws Exception {
compareResultAsText(results, expected);
}

/**
 * Grouping on a field name that was never declared for the table must be
 * rejected with a {@link ValidationException}.
 */
@Test(expected = ValidationException.class)
public void testGroupingOnNonExistentField() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

	DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
	Table source = tableEnv.fromDataSet(tuples, "a, b, c");

	// Expected to fail: the table only declares a, b and c, so "foo" is unknown.
	source.groupBy("foo").select("a.avg");
}

/**
 * Selecting a plain field that is neither a grouping key nor wrapped in an
 * aggregation must be rejected with a {@link ValidationException}.
 */
@Test(expected = ValidationException.class)
public void testGroupingInvalidSelection() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

	DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
	Table source = tableEnv.fromDataSet(tuples, "a, b, c");

	// Expected to fail: the grouping keys are a and b, and c is not aggregated.
	source.groupBy("a, b").select("c");
}

/**
 * Groups the 3-tuple data set by field b and checks the per-group sum of
 * field a, emitted together with the grouping key.
 */
@Test
public void testGroupedAggregate() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

	DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
	Table source = tableEnv.fromDataSet(tuples, "a, b, c");

	// One output row per distinct b: "<b>,<sum of a>".
	Table sumPerGroup = source.groupBy("b").select("b, a.sum");

	List<Row> actual = tableEnv.toDataSet(sumPerGroup, Row.class).collect();
	String expected =
		"1,1\n" +
		"2,5\n" +
		"3,15\n" +
		"4,34\n" +
		"5,65\n" +
		"6,111\n";
	compareResultAsText(actual, expected);
}

/**
 * Groups by field b but selects only the aggregate, verifying that the
 * query still yields one sum per group when the key itself is not projected.
 */
@Test
public void testGroupingKeyForwardIfNotUsed() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

	DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
	Table source = tableEnv.fromDataSet(tuples, "a, b, c");

	// The grouping key b is deliberately left out of the projection.
	Table sumsOnly = source.groupBy("b").select("a.sum");

	List<Row> actual = tableEnv.toDataSet(sumsOnly, Row.class).collect();
	String expected =
		"1\n" +
		"5\n" +
		"15\n" +
		"34\n" +
		"65\n" +
		"111\n";
	compareResultAsText(actual, expected);
}

/**
 * Applies a second grouping on top of an aggregated table and selects only
 * a grouping key, i.e. a grouped query without any aggregation function.
 */
@Test
public void testGroupNoAggregation() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

	DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
	Table source = tableEnv.fromDataSet(tuples, "a, b, c");

	// First stage: sum of a per b, exposed under the alias d.
	Table summed = source.groupBy("b").select("a.sum as d, b");
	// Second stage: group by both columns and project just the key b.
	Table keysOnly = summed.groupBy("b, d").select("b");

	List<Row> actual = tableEnv.toDataSet(keysOnly, Row.class).collect();
	String expected =
		"1\n" +
		"2\n" +
		"3\n" +
		"4\n" +
		"5\n" +
		"6\n";
	compareResultAsText(actual, expected);
}

@Test
public void testPojoAggregation() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
DataSet<WC> input = env.fromElements(
new WC("Hello", 1),
new WC("Ciao", 1),
Expand All @@ -208,5 +291,90 @@ public String map(WC value) throws Exception {
String expected = "Hello\n" + "Hola";
compareResultAsText(result, expected);
}

/**
 * Converts a filtered table into a data set of {@link MyPojo} and then uses
 * the DataSet API to group by a POJO field, sort each group and keep its
 * first element.
 */
@Test
public void testPojoGrouping() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<Tuple3<String, Double, String>> tuples = env.fromElements(
		new Tuple3<>("A", 23.0, "Z"),
		new Tuple3<>("A", 24.0, "Y"),
		new Tuple3<>("B", 1.0, "Z"));

	BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

	// Table side: name the fields after the POJO members and drop group "B".
	Table named = tableEnv.fromDataSet(tuples, "groupMe, value, name");
	Table projected = named.select("groupMe, value, name");
	Table withoutB = projected.where("groupMe != 'B'");

	// DataSet side: highest "value" per "groupMe".
	List<MyPojo> topPerGroup = tableEnv.toDataSet(withoutB, MyPojo.class)
		.groupBy("groupMe")
		.sortGroup("value", Order.DESCENDING)
		.first(1)
		.collect();

	compareResultAsText(topPerGroup, "A,24.0,Y");
}

/**
 * Projects field b of the 3-tuple data set and checks that
 * {@code distinct()} yields each value exactly once.
 */
@Test
public void testDistinct() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

	DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);
	Table source = tableEnv.fromDataSet(tuples, "a, b, c");

	Table onlyB = source.select("b");
	Table uniqueB = onlyB.distinct();

	List<Row> actual = tableEnv.toDataSet(uniqueB, Row.class).collect();
	String expected =
		"1\n" +
		"2\n" +
		"3\n" +
		"4\n" +
		"5\n" +
		"6\n";
	compareResultAsText(actual, expected);
}

/**
 * Groups the 5-tuple data set by (a, e), projects e and checks that a
 * subsequent {@code distinct()} collapses the repeated values.
 */
@Test
public void testDistinctAfterAggregate() throws Exception {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

	DataSet<Tuple5<Integer, Long, Integer, String, Long>> tuples =
		CollectionDataSets.get5TupleDataSet(env);
	Table source = tableEnv.fromDataSet(tuples, "a, b, c, d, e");

	// One row per (a, e) group carrying only e, then de-duplicated.
	Table ePerGroup = source.groupBy("a, e").select("e");
	Table uniqueE = ePerGroup.distinct();

	List<Row> actual = tableEnv.toDataSet(uniqueE, Row.class).collect();
	String expected =
		"1\n" +
		"2\n" +
		"3\n";
	compareResultAsText(actual, expected);
}

// --------------------------------------------------------------------------------------------

/**
 * Simple value holder used as the target type when converting a table into
 * a typed data set. Fields are public and a no-argument constructor is
 * provided; {@link #toString()} renders the fields as a comma-separated line.
 */
public static class MyPojo implements Serializable {
	private static final long serialVersionUID = 8741918940120107213L;

	/** Key the test groups on. */
	public String groupMe;
	/** Numeric payload the test sorts on. */
	public double value;
	/** Free-form label. */
	public String name;

	/** No-argument constructor; leaves all fields at their Java defaults. */
	public MyPojo() {
		// for serialization
	}

	/**
	 * Creates a fully populated instance.
	 *
	 * @param groupMe grouping key
	 * @param value numeric payload
	 * @param name label
	 */
	public MyPojo(String groupMe, double value, String name) {
		this.name = name;
		this.value = value;
		this.groupMe = groupMe;
	}

	/** Returns {@code groupMe,value,name} with no surrounding whitespace. */
	@Override
	public String toString() {
		StringBuilder line = new StringBuilder();
		line.append(groupMe);
		line.append(",");
		line.append(value);
		line.append(",");
		line.append(name);
		return line.toString();
	}
}
}

Loading

0 comments on commit 7758571

Please sign in to comment.