Skip to content

Commit

Permalink
[tableAPI] Cleaned up tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fhueske authored and vasia committed Mar 18, 2016
1 parent b9f1ff0 commit 29fb3d2
Show file tree
Hide file tree
Showing 22 changed files with 215 additions and 336 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.flink.api.table.expressions.{ExpressionParser, Naming, Unresol
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._

import scala.collection.mutable
import scala.collection.JavaConverters._

case class BaseTable(
Expand Down Expand Up @@ -84,6 +85,8 @@ class Table(
*/
def select(fields: Expression*): Table = {

checkUniqueNames(fields)

relBuilder.push(relNode)

// separate aggregations and selection expressions
Expand Down Expand Up @@ -394,10 +397,31 @@ class Table(
}

}

LogicalProject.create(relNode, exprs.toList.asJava, names.toList.asJava)
}

private def checkUniqueNames(exprs: Seq[Expression]): Unit = {
val names: mutable.Set[String] = mutable.Set()

exprs.foreach {
case n: Naming =>
// explicit name
if (names.contains(n.name)) {
throw new IllegalArgumentException(s"Duplicate field name $n.name.")
} else {
names.add(n.name)
}
case u: UnresolvedFieldReference =>
// simple field forwarding
if (names.contains(u.name)) {
throw new IllegalArgumentException(s"Duplicate field name $u.name.")
} else {
names.add(u.name)
}
case _ => // Do nothing
}
}

}

class GroupedTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ public void testAggregationWithArithmetic() throws Exception {
Table result =
table.select("(f0 + 2).avg + 2, f1.count + 5");


DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
List<Row> results = ds.collect();
String expected = "5.5,7";
Expand Down Expand Up @@ -172,6 +171,7 @@ public void testNonWorkingDataTypes() throws Exception {
tableEnv.fromDataSet(input);

Table result =
// Must fail. Cannot compute SUM aggregate on String field.
table.select("f1.sum");

DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
Expand All @@ -191,6 +191,7 @@ public void testNoNestedAggregation() throws Exception {
tableEnv.fromDataSet(input);

Table result =
// Must fail. Aggregation on aggregation not allowed.
table.select("f0.sum.sum");

DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ public void testAsWithToFewFields() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = getJavaTableEnvironment();

// Must fail. Not enough field names specified.
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
}

Expand All @@ -232,6 +233,7 @@ public void testAsWithToManyFields() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = getJavaTableEnvironment();

// Must fail. Too many field names specified.
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
}

Expand All @@ -240,6 +242,7 @@ public void testAsWithAmbiguousFields() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = getJavaTableEnvironment();

// Must fail. Specified field names are not unique.
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
}

Expand All @@ -248,6 +251,7 @@ public void testAsWithNonFieldReference1() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = getJavaTableEnvironment();

// Must fail. as() does only allow field name expressions
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
}

Expand All @@ -256,11 +260,13 @@ public void testAsWithNonFieldReference2() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = getJavaTableEnvironment();

// Must fail. as() does only allow field name expressions
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b, c");
}

// --------------------------------------------------------------------------------------------

@SuppressWarnings("unused")
public static class SmallPojo {

public SmallPojo() { }
Expand All @@ -278,6 +284,7 @@ public SmallPojo(String name, int age, double salary, String department) {
public String department;
}

@SuppressWarnings("unused")
public static class PrivateSmallPojo {

public PrivateSmallPojo() { }
Expand Down Expand Up @@ -327,6 +334,7 @@ public void setDepartment(String department) {
}
}

@SuppressWarnings("unused")
public static class SmallPojo2 {

public SmallPojo2() { }
Expand All @@ -349,6 +357,7 @@ public String toString() {
}
}

@SuppressWarnings("unused")
public static class PrivateSmallPojo2 {

public PrivateSmallPojo2() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.codegen.CodeGenException;
import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -138,7 +139,8 @@ public void testCastFromString() throws Exception {
compareResultAsText(results, expected);
}

@Test(expected = CodeGenException.class)
@Ignore // Date type not supported yet
@Test
public void testCastDateFromString() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
Expand All @@ -160,7 +162,8 @@ public void testCastDateFromString() throws Exception {
compareResultAsText(results, expected);
}

@Test(expected = CodeGenException.class)
@Ignore // Date type not supported yet
@Test
public void testCastDateToStringAndLong() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.table.codegen.CodeGenException;
import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.junit.Test;
Expand All @@ -45,9 +46,7 @@ public void testAllRejectingFilter() throws Exception {
TableEnvironment tableEnv = getJavaTableEnvironment();

DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);

Table table =
tableEnv.fromDataSet(input, "a, b, c");
Table table = tableEnv.fromDataSet(input, "a, b, c");

Table result = table
.filter("false");
Expand All @@ -64,9 +63,7 @@ public void testAllPassingFilter() throws Exception {
TableEnvironment tableEnv = getJavaTableEnvironment();

DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);

Table table =
tableEnv.fromDataSet(input, "a, b, c");
Table table = tableEnv.fromDataSet(input, "a, b, c");

Table result = table
.filter("true");
Expand All @@ -89,9 +86,7 @@ public void testFilterOnIntegerTupleField() throws Exception {
TableEnvironment tableEnv = getJavaTableEnvironment();

DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);

Table table =
tableEnv.fromDataSet(input, "a, b, c");
Table table = tableEnv.fromDataSet(input, "a, b, c");

Table result = table
.filter(" a % 2 = 0 ");
Expand All @@ -110,9 +105,7 @@ public void testNotEquals() throws Exception {
TableEnvironment tableEnv = getJavaTableEnvironment();

DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);

Table table =
tableEnv.fromDataSet(input, "a, b, c");
Table table = tableEnv.fromDataSet(input, "a, b, c");

Table result = table
.filter("!( a % 2 <> 0 ) ");
Expand All @@ -131,9 +124,10 @@ public void testDisjunctivePreds() throws Exception {
TableEnvironment tableEnv = getJavaTableEnvironment();

DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);

Table table = tableEnv.fromDataSet(input, "a, b, c");
Table result = table.filter("a < 2 || a > 20");

Table result = table
.filter("a < 2 || a > 20");

DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
List<Row> results = ds.collect();
Expand All @@ -147,15 +141,29 @@ public void testIntegerBiggerThan128() throws Exception {
TableEnvironment tableEnv = getJavaTableEnvironment();

DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));

Table table = tableEnv.fromDataSet(input, "a, b, c");

Table result = table.filter("a = 300 ");
Table result = table
.filter("a = 300 ");

DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
List<Row> results = ds.collect();
String expected = "300,1,Hello\n";
compareResultAsText(results, expected);
}

@Test(expected = IllegalArgumentException.class)
public void testFilterInvalidField() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = getJavaTableEnvironment();

DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
Table table = tableEnv.fromDataSet(input, "a, b, c");

table
// Must fail. Field foo does not exist.
.filter("foo = 17");
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ public void testGroupedAggregate() throws Exception {
TableEnvironment tableEnv = new TableEnvironment();

DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);

Table table =
tableEnv.fromDataSet(input, "a, b, c");
Table table = tableEnv.fromDataSet(input, "a, b, c");

Table result = table
.groupBy("b").select("b, a.sum");
Expand All @@ -88,17 +86,11 @@ public void testGroupedAggregate() throws Exception {

@Test
public void testGroupingKeyForwardIfNotUsed() throws Exception {

// the grouping key needs to be forwarded to the intermediate DataSet, even
// if we don't want the key in the output

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();

DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);

Table table =
tableEnv.fromDataSet(input, "a, b, c");
Table table = tableEnv.fromDataSet(input, "a, b, c");

Table result = table
.groupBy("b").select("a.sum");
Expand All @@ -116,9 +108,7 @@ public void testGroupNoAggregation() throws Exception {
TableEnvironment tableEnv = new TableEnvironment();

DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);

Table table =
tableEnv.fromDataSet(input, "a, b, c");
Table table = tableEnv.fromDataSet(input, "a, b, c");

Table result = table
.groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,8 @@ public void testJoinNonExistingKey() throws Exception {
Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");

Table result = in1.join(in2).where("foo === e").select("c, g");

DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
List<Row> results = ds.collect();
String expected = "";
compareResultAsText(results, expected);
// Must fail. Field foo does not exist.
in1.join(in2).where("foo === e").select("c, g");
}

@Test(expected = InvalidProgramException.class)
Expand All @@ -150,13 +146,11 @@ public void testJoinWithNonMatchingKeyTypes() throws Exception {
Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");

Table result = in1
.join(in2).where("a === g").select("c, g");
Table result = in1.join(in2)
// Must fail. Types of join fields are not compatible (Integer and String)
.where("a === g").select("c, g");

DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
List<Row> results = ds.collect();
String expected = "";
compareResultAsText(results, expected);
tableEnv.toDataSet(result, Row.class).collect();
}

@Test(expected = IllegalArgumentException.class)
Expand All @@ -170,13 +164,8 @@ public void testJoinWithAmbiguousFields() throws Exception {
Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, c");

Table result = in1
.join(in2).where("a === d").select("c, g");

DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
List<Row> results = ds.collect();
String expected = "";
compareResultAsText(results, expected);
// Must fail. Join input have overlapping field names.
in1.join(in2).where("a === d").select("c, g");
}

@Test
Expand Down
Loading

0 comments on commit 29fb3d2

Please sign in to comment.