Skip to content

Commit

Permalink
[FLINK-29152][hive] Fix inconsistent behavior with Hive for `desc tab…
Browse files Browse the repository at this point in the history
…le` in Hive dialect

This closes apache#20789
  • Loading branch information
luoyuxia authored Sep 21, 2022
1 parent 7ddf059 commit b5cd9f3
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,3 @@ Method <org.apache.flink.streaming.api.operators.SourceOperator$1$1.registerRele
Method <org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.getTransactionCoordinatorId()> calls method <org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.getTransactionCoordinatorId()> in (FlinkKafkaProducer.java:1327)
Method <org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init()> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getSourceReader()> in (SourceOperatorStreamTask.java:75)
Method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.isIdle()> calls method <org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.isDefaultActionAvailable()> in (MailboxExecutorImpl.java:63)
Method <org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer.getTable(org.apache.flink.table.catalog.ObjectPath)> calls method <org.apache.flink.table.catalog.hive.HiveCatalog.getHiveTable(org.apache.flink.table.catalog.ObjectPath)> in (HiveParserDDLSemanticAnalyzer.java:273)
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
}
}

@VisibleForTesting
@Internal
public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
try {
Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,31 @@
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.HiveSetOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.delegation.PlannerContext;
import org.apache.flink.table.planner.delegation.hive.copy.HiveSetProcessor;
import org.apache.flink.table.planner.delegation.hive.operations.HiveLoadDataOperation;
import org.apache.flink.table.planner.delegation.hive.operations.HiveShowCreateTableOperation;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -76,6 +83,8 @@ public Optional<TableResultInternal> executeOperation(Operation operation) {
return executeHiveLoadDataOperation((HiveLoadDataOperation) operation);
} else if (operation instanceof HiveShowCreateTableOperation) {
return executeShowCreateTableOperation((HiveShowCreateTableOperation) operation);
} else if (operation instanceof DescribeTableOperation) {
return executeDescribeTableOperation((DescribeTableOperation) operation);
} else if (operation instanceof ExplainOperation) {
ExplainOperation explainOperation = (ExplainOperation) operation;
if (explainOperation.getChild() instanceof HiveLoadDataOperation) {
Expand Down Expand Up @@ -264,4 +273,77 @@ private Optional<TableResultInternal> executeShowCreateTableOperation(
.build();
return Optional.of(resultInternal);
}

private Optional<TableResultInternal> executeDescribeTableOperation(
DescribeTableOperation describeTableOperation) {
// currently, if it's 'describe extended', we still delegate to Flink's own implementation
if (describeTableOperation.isExtended()) {
return Optional.empty();
} else {
ObjectIdentifier tableIdentifier = describeTableOperation.getSqlIdentifier();
Catalog currentCatalog =
catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null);
if (!(currentCatalog instanceof HiveCatalog)) {
// delegate to Flink's own implementation
return Optional.empty();
}
HiveCatalog hiveCatalog = (HiveCatalog) currentCatalog;
ObjectPath tablePath =
new ObjectPath(
tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName());
org.apache.hadoop.hive.metastore.api.Table table;
try {
table = hiveCatalog.getHiveTable(tablePath);
} catch (TableNotExistException e) {
throw new FlinkHiveException(
String.format(
"The table or view %s doesn't exist in catalog %s.",
tablePath, catalogManager.getCurrentCatalog()),
e);
}
if (!HiveCatalog.isHiveTable(table.getParameters())) {
// if it's not a Hive table, delegate to Flink's own implementation
return Optional.empty();
}
List<Row> result = new ArrayList<>();
// describe table's columns
List<FieldSchema> columns = table.getSd().getCols();
List<FieldSchema> partitionColumns = table.getPartitionKeys();
for (FieldSchema fieldSchema : columns) {
result.add(describeColumn(fieldSchema));
}
for (FieldSchema fieldSchema : partitionColumns) {
result.add(describeColumn(fieldSchema));
}

// table's partition information
if (!partitionColumns.isEmpty()) {
result.add(Row.of("# Partition Information", "", ""));
for (FieldSchema fieldSchema : partitionColumns) {
result.add(describeColumn(fieldSchema));
}
}
TableResultInternal tableResultInternal =
TableResultImpl.builder()
.resultKind(ResultKind.SUCCESS)
.schema(
ResolvedSchema.physical(
new String[] {"col_name", "data_type", "comment"},
new DataType[] {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING()
}))
.data(result)
.build();
return Optional.of(tableResultInternal);
}
}

private Row describeColumn(FieldSchema fieldSchema) {
return Row.of(
fieldSchema.getName(),
fieldSchema.getType(),
fieldSchema.getComment() == null ? StringUtils.EMPTY : fieldSchema.getComment());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,35 @@ public void testShowCreateTable() throws Exception {
assertThat(actualResult).isEqualTo(expectedResult);
}

@Test
public void testDescribeTable() {
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql(
"create table t1(id BIGINT,\n"
+ " name STRING) WITH (\n"
+ " 'connector' = 'datagen' "
+ ")");
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("create table t2(a int, b string, c boolean)");
tableEnv.executeSql(
"create table t3(a decimal(10, 2), b double, c float) partitioned by (d date)");

// desc non-hive table
List<Row> result = CollectionUtil.iteratorToList(tableEnv.executeSql("desc t1").collect());
assertThat(result.toString())
.isEqualTo(
"[+I[id, BIGINT, true, null, null, null], +I[name, STRING, true, null, null, null]]");
// desc hive table
result = CollectionUtil.iteratorToList(tableEnv.executeSql("desc t2").collect());
assertThat(result.toString())
.isEqualTo("[+I[a, int, ], +I[b, string, ], +I[c, boolean, ]]");
result = CollectionUtil.iteratorToList(tableEnv.executeSql("desc default.t3").collect());
assertThat(result.toString())
.isEqualTo(
"[+I[a, decimal(10,2), ], +I[b, double, ], +I[c, float, ], +I[d, date, ],"
+ " +I[# Partition Information, , ], +I[d, date, ]]");
}

@Test
public void testUnsupportedOperation() {
List<String> statements =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ show current database;
# test hive table with parameterized types
# ==========================================================================

describe hive.additional_test_database.param_types_table;
describe additional_test_database.param_types_table;
!output
+------+-----------------+------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------+-----------------+------+-----+--------+-----------+
| dec | DECIMAL(10, 10) | TRUE | | | |
| ch | CHAR(5) | TRUE | | | |
| vch | VARCHAR(15) | TRUE | | | |
+------+-----------------+------+-----+--------+-----------+
+----------+----------------+---------+
| col_name | data_type | comment |
+----------+----------------+---------+
| dec | decimal(10,10) | |
| ch | char(5) | |
| vch | varchar(15) | |
+----------+----------------+---------+
3 rows in set
!ok

Expand Down

0 comments on commit b5cd9f3

Please sign in to comment.