[FLINK-18529][hive] Query Hive table and filter by timestamp partition can fail

This closes apache#12856
lirui-apache authored Jul 14, 2020
1 parent d677f02 commit 81f41a6
Showing 2 changed files with 21 additions and 14 deletions.
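
The fix moves the datetime check ahead of the conversion of the literal to a Hive object, so partition filters on DATE/TIMESTAMP columns are now skipped for push-down rather than attempted, presumably because that conversion could fail for those types before the old check was reached. Below is a minimal, self-contained sketch (not part of the commit; the class and method names are invented for illustration) of which LogicalTypeRoots the new LogicalTypeFamily.DATETIME guard rejects:

// Hypothetical sketch, not part of the commit: classifies partition column types
// the way the new guard does. Requires flink-table-common on the classpath.
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

public class DatetimeFamilyCheckSketch {

    // Mirrors the new guard in the visitor: datetime-family literals are not pushed down.
    static boolean supportsPushDown(LogicalTypeRoot typeRoot) {
        return !typeRoot.getFamilies().contains(LogicalTypeFamily.DATETIME);
    }

    public static void main(String[] args) {
        System.out.println(supportsPushDown(LogicalTypeRoot.INTEGER));                     // expected: true
        System.out.println(supportsPushDown(LogicalTypeRoot.DATE));                        // expected: false
        System.out.println(supportsPushDown(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)); // expected: false
    }
}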
@@ -46,6 +46,7 @@
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;

 import org.apache.hadoop.hive.conf.HiveConf;
@@ -453,22 +454,16 @@ public String visit(ValueLiteralExpression valueLiteral) {
         if (value == null) {
             return "null";
         }
+        LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
+        if (typeRoot.getFamilies().contains(LogicalTypeFamily.DATETIME)) {
+            // hive not support partition filter push down with these types.
+            return null;
+        }
         value = HiveInspectors.getConversion(HiveInspectors.getObjectInspector(dataType), dataType.getLogicalType(), hiveShim)
                 .toHiveObject(value);
         String res = value.toString();
-        LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
-        switch (typeRoot) {
-            case CHAR:
-            case VARCHAR:
-                res = "'" + res.replace("'", "''") + "'";
-                break;
-            case DATE:
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                // hive not support partition filter push down with these types.
-                return null;
-            default:
-                break;
+        if (typeRoot == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR) {
+            res = "'" + res.replace("'", "''") + "'";
         }
         return res;
     }
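
The CHAR/VARCHAR branch that survives the rewrite wraps the literal in single quotes and doubles any embedded quote for Hive's partition filter syntax. A tiny standalone illustration of that escaping (assumed example, not taken from the commit):

// Standalone illustration of the quoting applied to string-typed partition literals.
public class QuoteEscapingSketch {
    public static void main(String[] args) {
        String raw = "it's";
        String quoted = "'" + raw.replace("'", "''") + "'";
        System.out.println(quoted); // prints: 'it''s'
    }
}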
@@ -289,6 +289,14 @@ public void testPartitionFilter() throws Exception {
assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 1"));
results = Lists.newArrayList(query.execute().collect());
assertEquals("[4]", results.toString());

query = tableEnv.sqlQuery("select x from db1.part where '' = p2");
explain = query.explain().split("==.*==\n");
assertFalse(catalog.fallback);
optimizedPlan = explain[2];
assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 0"));
results = Lists.newArrayList(query.execute().collect());
assertEquals("[]", results.toString());
} finally {
tableEnv.executeSql("drop database db1 cascade");
}
@@ -319,7 +327,11 @@ public void testPartitionFilterDateTimestamp() throws Exception {
assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 1"));
List<Row> results = Lists.newArrayList(query.execute().collect());
assertEquals("[3]", results.toString());
System.out.println(results);

// filter by timestamp partition
query = tableEnv.sqlQuery("select x from db1.part where timestamp '2018-08-08 08:08:09' = p2");
results = Lists.newArrayList(query.execute().collect());
assertEquals("[2]", results.toString());
} finally {
tableEnv.executeSql("drop database db1 cascade");
}
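
For context, a minimal sketch of issuing the timestamp-partition query exercised above through the Table API. It assumes a Flink 1.11-era setup with a HiveCatalog already registered under the name "hive" and a table db1.part partitioned on a TIMESTAMP column p2, as in the test fixture; the class name and catalog wiring are assumptions, not taken from the commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class TimestampPartitionFilterSketch {
    public static void main(String[] args) throws Exception {
        // Assumes the Hive catalog has been registered elsewhere under the name "hive".
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        tableEnv.useCatalog("hive");

        // With this fix, the timestamp literal is not converted for metastore push-down;
        // the predicate is evaluated on the Flink side, so the query no longer fails.
        Table query = tableEnv.sqlQuery(
                "select x from db1.part where timestamp '2018-08-08 08:08:09' = p2");

        try (CloseableIterator<Row> it = query.execute().collect()) {
            it.forEachRemaining(System.out::println);
        }
    }
}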