[FLINK-17252][table] Add Table#execute api and support SELECT statement in TableEnvironment#executeSql

This closes apache#12049

godfreyhe authored and KurtYoung committed May 12, 2020
1 parent 7cfcd33 commit 2160c32
Showing 44 changed files with 1,424 additions and 436 deletions.
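The change repeated throughout this diff replaces the internal TableUtils.collectToList helper with the new public Table#execute API, whose TableResult#collect() returns an iterator over the result rows. Below is a minimal sketch of the two entry points this commit adds; the batch-mode environment setup and the table name `src` are illustrative assumptions, not taken from the diff:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

import java.util.List;

public class ExecuteCollectSketch {
    public static void main(String[] args) {
        // Assumption: a table named `src` has been registered in this environment.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // New in this commit: Table#execute runs the query and returns a TableResult;
        // collect() yields an iterator over the result rows.
        Table table = tEnv.sqlQuery("select * from src");
        List<Row> rows = Lists.newArrayList(table.execute().collect());
        System.out.println(rows);

        // Also new: TableEnvironment#executeSql accepts SELECT statements directly.
        TableResult result = tEnv.executeSql("SELECT * FROM src");
        result.collect().forEachRemaining(System.out::println);
    }
}

The tests below materialize the iterator with the shaded Guava Lists.newArrayList, the same pattern the sketch uses.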
@@ -29,7 +29,6 @@
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableUtils;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
@@ -48,6 +47,8 @@
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;

+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.calcite.rel.RelNode;
@@ -127,7 +128,7 @@ public void testReadNonPartitionedTable() throws Exception {
TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
tEnv.registerCatalog(catalogName, hiveCatalog);
Table src = tEnv.sqlQuery("select * from hive.source_db.test");
-List<Row> rows = TableUtils.collectToList(src);
+List<Row> rows = Lists.newArrayList(src.execute().collect());

Assert.assertEquals(4, rows.size());
Assert.assertEquals("1,1,a,1000,1.11", rows.get(0).toString());
@@ -154,7 +155,7 @@ public void testReadComplexDataType() throws Exception {
TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
tEnv.registerCatalog(catalogName, hiveCatalog);
Table src = tEnv.sqlQuery("select * from hive.source_db.complex_test");
-List<Row> rows = TableUtils.collectToList(src);
+List<Row> rows = Lists.newArrayList(src.execute().collect());
Assert.assertEquals(1, rows.size());
assertArrayEquals(array, (Integer[]) rows.get(0).getField(0));
assertEquals(map, rows.get(0).getField(1));
@@ -183,7 +184,7 @@ public void testReadPartitionTable() throws Exception {
TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
tEnv.registerCatalog(catalogName, hiveCatalog);
Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt");
-List<Row> rows = TableUtils.collectToList(src);
+List<Row> rows = Lists.newArrayList(src.execute().collect());

assertEquals(4, rows.size());
Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
@@ -218,7 +219,7 @@ public void testPartitionPrunning() throws Exception {
assertTrue(physicalExecutionPlan, physicalExecutionPlan.contains(
"HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: true, PartitionNums: 1"));
// second check execute results
-List<Row> rows = TableUtils.collectToList(src);
+List<Row> rows = Lists.newArrayList(src.execute().collect());
assertEquals(2, rows.size());
Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
assertArrayEquals(new String[]{"2014,3,0", "2014,4,0"}, rowStrings);
@@ -248,39 +249,39 @@ public void testPartitionFilter() throws Exception {
assertFalse(catalog.fallback);
String optimizedPlan = explain[2];
assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 3"));
-List<Row> results = TableUtils.collectToList(query);
+List<Row> results = Lists.newArrayList(query.execute().collect());
assertEquals("[2, 3, 4]", results.toString());

query = tableEnv.sqlQuery("select x from db1.part where p1>2 and p2<='a' order by x");
explain = tableEnv.explain(query).split("==.*==\n");
assertFalse(catalog.fallback);
optimizedPlan = explain[2];
assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 0"));
-results = TableUtils.collectToList(query);
+results = Lists.newArrayList(query.execute().collect());
assertEquals("[]", results.toString());

query = tableEnv.sqlQuery("select x from db1.part where p1 in (1,3,5) order by x");
explain = tableEnv.explain(query).split("==.*==\n");
assertFalse(catalog.fallback);
optimizedPlan = explain[2];
assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 2"));
-results = TableUtils.collectToList(query);
+results = Lists.newArrayList(query.execute().collect());
assertEquals("[1, 3]", results.toString());

query = tableEnv.sqlQuery("select x from db1.part where (p1=1 and p2='a') or ((p1=2 and p2='b') or p2='d') order by x");
explain = tableEnv.explain(query).split("==.*==\n");
assertFalse(catalog.fallback);
optimizedPlan = explain[2];
assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 2"));
-results = TableUtils.collectToList(query);
+results = Lists.newArrayList(query.execute().collect());
assertEquals("[1, 2]", results.toString());

query = tableEnv.sqlQuery("select x from db1.part where p2 = 'c:2' order by x");
explain = tableEnv.explain(query).split("==.*==\n");
assertFalse(catalog.fallback);
optimizedPlan = explain[2];
assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 1"));
-results = TableUtils.collectToList(query);
+results = Lists.newArrayList(query.execute().collect());
assertEquals("[4]", results.toString());
} finally {
hiveShell.execute("drop database db1 cascade");
@@ -311,7 +312,7 @@ public void testPartitionFilterDateTimestamp() throws Exception {
assertTrue(catalog.fallback);
String optimizedPlan = explain[2];
assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 1"));
-List<Row> results = TableUtils.collectToList(query);
+List<Row> results = Lists.newArrayList(query.execute().collect());
assertEquals("[3]", results.toString());
System.out.println(results);
} finally {
@@ -343,7 +344,7 @@ public void testProjectionPushDown() throws Exception {
assertTrue(logicalPlan, logicalPlan.contains(expectedExplain));
assertTrue(physicalPlan, physicalPlan.contains(expectedExplain));

-List<Row> rows = TableUtils.collectToList(table);
+List<Row> rows = Lists.newArrayList(table.execute().collect());
assertEquals(2, rows.size());
Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
assertArrayEquals(new String[]{"2013,2", "2014,1"}, rowStrings);
@@ -377,7 +378,7 @@ public void testLimitPushDown() throws Exception {
assertTrue(logicalPlan.contains(expectedExplain));
assertTrue(physicalPlan.contains(expectedExplain));

-List<Row> rows = TableUtils.collectToList(table);
+List<Row> rows = Lists.newArrayList(table.execute().collect());
assertEquals(1, rows.size());
Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
assertArrayEquals(new String[]{"a"}, rowStrings);
Expand Down Expand Up @@ -487,7 +488,8 @@ private void testSourceConfig(boolean fallbackMR, boolean inferParallelism) thro
tableEnv.registerCatalog(catalogSpy.getName(), catalogSpy);
tableEnv.useCatalog(catalogSpy.getName());

-List<Row> results = TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.src order by x"));
+List<Row> results = Lists.newArrayList(
+tableEnv.sqlQuery("select * from db1.src order by x").execute().collect());
assertEquals("[1,a, 2,b]", results.toString());
}

@@ -21,7 +21,6 @@
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.TableUtils;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -34,6 +33,8 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.ArrayUtils;

+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.hadoop.fs.FileSystem;
@@ -103,7 +104,7 @@ public void testDefaultPartitionName() throws Exception {
assertTrue(fs.exists(defaultPartPath));

TableImpl flinkTable = (TableImpl) tableEnv.sqlQuery("select y, x from db1.part order by x");
-List<Row> rows = TableUtils.collectToList(flinkTable);
+List<Row> rows = Lists.newArrayList(flinkTable.execute().collect());
assertEquals(Arrays.toString(new String[]{"1,1", "null,2"}), rows.toString());

hiveShell.execute("drop database db1 cascade");
@@ -334,15 +335,15 @@ public void testDateTimestampPartitionColumns() throws Exception {
.addRow(new Object[]{3})
.commit("dt='2019-12-25',ts='2019-12-25 16:23:43.012'");
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
-List<Row> results = TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.part order by x"));
+List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.part order by x").execute().collect());
assertEquals("[1,2019-12-23,2019-12-23T00:00, 2,2019-12-23,2019-12-23T00:00, 3,2019-12-25,2019-12-25T16:23:43.012]", results.toString());

-results = TableUtils.collectToList(tableEnv.sqlQuery("select x from db1.part where dt=cast('2019-12-25' as date)"));
+results = Lists.newArrayList(tableEnv.sqlQuery("select x from db1.part where dt=cast('2019-12-25' as date)").execute().collect());
assertEquals("[3]", results.toString());

tableEnv.sqlUpdate("insert into db1.part select 4,cast('2019-12-31' as date),cast('2019-12-31 12:00:00.0' as timestamp)");
tableEnv.execute("insert");
-results = TableUtils.collectToList(tableEnv.sqlQuery("select max(dt) from db1.part"));
+results = Lists.newArrayList(tableEnv.sqlQuery("select max(dt) from db1.part").execute().collect());
assertEquals("[2019-12-31]", results.toString());
} finally {
hiveShell.execute("drop database db1 cascade");
@@ -372,19 +373,19 @@ public void testUDTF() throws Exception {
hiveShell.insertInto("db1", "nested").addRow(Arrays.asList(map1, map2)).commit();

TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
-List<Row> results = TableUtils.collectToList(
-tableEnv.sqlQuery("select x from db1.simple, lateral table(hiveudtf(a)) as T(x)"));
+List<Row> results = Lists.newArrayList(
+tableEnv.sqlQuery("select x from db1.simple, lateral table(hiveudtf(a)) as T(x)").execute().collect());
assertEquals("[1, 2, 3]", results.toString());
-results = TableUtils.collectToList(
-tableEnv.sqlQuery("select x from db1.nested, lateral table(hiveudtf(a)) as T(x)"));
+results = Lists.newArrayList(
+tableEnv.sqlQuery("select x from db1.nested, lateral table(hiveudtf(a)) as T(x)").execute().collect());
assertEquals("[{1=a, 2=b}, {3=c}]", results.toString());

hiveShell.execute("create table db1.ts (a array<timestamp>)");
HiveTestUtils.createTextTableInserter(hiveShell, "db1", "ts").addRow(new Object[]{
new Object[]{Timestamp.valueOf("2015-04-28 15:23:00"), Timestamp.valueOf("2016-06-03 17:05:52")}})
.commit();
-results = TableUtils.collectToList(
-tableEnv.sqlQuery("select x from db1.ts, lateral table(hiveudtf(a)) as T(x)"));
+results = Lists.newArrayList(
+tableEnv.sqlQuery("select x from db1.ts, lateral table(hiveudtf(a)) as T(x)").execute().collect());
assertEquals("[2015-04-28T15:23, 2016-06-03T17:05:52]", results.toString());
} finally {
hiveShell.execute("drop database db1 cascade");
@@ -455,7 +456,7 @@ public void testTimestamp() throws Exception {
.commit();
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
// test read timestamp from hive
-List<Row> results = TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.src"));
+List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect());
assertEquals(2, results.size());
assertEquals(LocalDateTime.of(2019, 11, 11, 0, 0), results.get(0).getField(0));
assertEquals(LocalDateTime.of(2019, 12, 3, 15, 43, 32, 123456789), results.get(1).getField(0));
@@ -480,7 +481,7 @@ public void testDate() throws Exception {
.commit();
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
// test read date from hive
-List<Row> results = TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.src"));
+List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect());
assertEquals(2, results.size());
assertEquals(LocalDate.of(2019, 12, 9), results.get(0).getField(0));
assertEquals(LocalDate.of(2019, 12, 12), results.get(1).getField(0));
@@ -517,11 +518,11 @@ public void testViews() throws Exception {
hiveShell.execute("create view db1.v2 as select key,count(*) from db1.src group by key having count(*)>1 order by key");
hiveShell.execute("create view db1.v3 as select k.key,k.name,count(*) from db1.src s join db1.keys k on s.key=k.key group by k.key,k.name order by k.key");
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
-List<Row> results = TableUtils.collectToList(tableEnv.sqlQuery("select count(v) from db1.v1"));
+List<Row> results = Lists.newArrayList(tableEnv.sqlQuery("select count(v) from db1.v1").execute().collect());
assertEquals("[2]", results.toString());
-results = TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.v2"));
+results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.v2").execute().collect());
assertEquals("[1,3, 3,2]", results.toString());
-results = TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.v3"));
+results = Lists.newArrayList(tableEnv.sqlQuery("select * from db1.v3").execute().collect());
assertEquals("[1,key1,3, 2,key2,1, 3,key3,2]", results.toString());
} finally {
hiveShell.execute("drop database db1 cascade");
@@ -577,7 +578,7 @@ public void testRegexSerDe() throws Exception {
.addRow(new Object[]{2, "ab"})
.commit();
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
assertEquals("[1,a, 2,ab]", TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.src order by x")).toString());
assertEquals("[1,a, 2,ab]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src order by x").execute().collect()).toString());
} finally {
hiveShell.execute("drop database db1 cascade");
}
@@ -594,7 +595,7 @@ public void testUpdatePartitionSD() throws Exception {
hiveShell.execute("alter table db1.dest set fileformat sequencefile");
tableEnv.sqlUpdate("insert overwrite db1.dest partition (p='1') select 1");
tableEnv.execute(null);
assertEquals("[1,1]", TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.dest")).toString());
assertEquals("[1,1]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.dest").execute().collect()).toString());
} finally {
hiveShell.execute("drop database db1 cascade");
}
@@ -611,8 +612,8 @@ public void testParquetNameMapping() throws Exception {
hiveShell.execute(String.format("create table db1.t2 (y int,x int) stored as parquet location '%s'", location));
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
tableEnv.getConfig().getConfiguration().setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
assertEquals("[1, 2]", TableUtils.collectToList(tableEnv.sqlQuery("select x from db1.t1")).toString());
assertEquals("[1, 2]", TableUtils.collectToList(tableEnv.sqlQuery("select x from db1.t2")).toString());
assertEquals("[1, 2]", Lists.newArrayList(tableEnv.sqlQuery("select x from db1.t1").execute().collect()).toString());
assertEquals("[1, 2]", Lists.newArrayList(tableEnv.sqlQuery("select x from db1.t2").execute().collect()).toString());
} finally {
hiveShell.execute("drop database db1 cascade");
}
@@ -632,10 +633,10 @@ public void testOrcSchemaEvol() throws Exception {
tableEnv.getConfig().getConfiguration().setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);

hiveShell.execute("alter table db1.src change x x int");
assertEquals("[1,100, 2,200]", TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.src")).toString());
assertEquals("[1,100, 2,200]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect()).toString());

hiveShell.execute("alter table db1.src change y y string");
assertEquals("[1,100, 2,200]", TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.src")).toString());
assertEquals("[1,100, 2,200]", Lists.newArrayList(tableEnv.sqlQuery("select * from db1.src").execute().collect()).toString());
} finally {
hiveShell.execute("drop database db1 cascade");
}
@@ -655,7 +656,7 @@ private void verifyHiveQueryResult(String query, List<String> expected) {
}

private void verifyFlinkQueryResult(org.apache.flink.table.api.Table table, List<String> expected) throws Exception {
-List<Row> rows = TableUtils.collectToList(table);
+List<Row> rows = Lists.newArrayList(table.execute().collect());
List<String> results = rows.stream().map(row ->
IntStream.range(0, row.getArity())
.mapToObj(row::getField)
@@ -26,7 +26,6 @@
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.TableUtils;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
@@ -38,6 +37,8 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.FileUtils;

+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -113,7 +114,7 @@ public void testCsvTableViaSQL() throws Exception {

Table t = tableEnv.sqlQuery("SELECT * FROM myhive.`default`.test2");

-List<Row> result = TableUtils.collectToList(t);
+List<Row> result = Lists.newArrayList(t.execute().collect());

// assert query result
assertEquals(
@@ -128,7 +129,7 @@ public void testCsvTableViaSQL() throws Exception {

t = tableEnv.sqlQuery("SELECT * FROM myhive.`default`.newtable");

-result = TableUtils.collectToList(t);
+result = Lists.newArrayList(t.execute().collect());

// assert query result
assertEquals(
@@ -195,7 +196,7 @@ public void testCsvTableViaAPI() throws Exception {
Table t = tableEnv.sqlQuery(
String.format("select * from myhive.`default`.%s", sourceTableName));

-List<Row> result = TableUtils.collectToList(t);
+List<Row> result = Lists.newArrayList(t.execute().collect());
result.sort(Comparator.comparing(String::valueOf));

// assert query result