[FLINK-22884][hive] HiveCatalog should mark views as generic and store schema in properties

This closes apache#16149
lirui-apache committed Jul 6, 2021
1 parent 92d9a46 commit 0b68682
Showing 8 changed files with 180 additions and 14 deletions.
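In short: views written through HiveCatalog are now always marked generic (is_generic=true) and carry their schema in table properties rather than in the Hive storage descriptor. A minimal end-to-end sketch of the intended behavior — the catalog wiring and the names "demo" and "v" are illustrative assumptions, not part of this commit:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;

class ViewRoundTripSketch {
    // Hedged sketch: demonstrates the intended round trip, assuming a
    // pre-configured HiveCatalog instance; table/view names are hypothetical.
    static void run(HiveCatalog hiveCatalog) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        tEnv.registerCatalog("hive", hiveCatalog);
        tEnv.useCatalog("hive");
        tEnv.executeSql("create table demo(x int) with ('connector'='datagen')");
        tEnv.executeSql("create view v as select x from demo");

        // The view comes back as a generic CatalogView; its schema is read
        // from the masked flink.* properties, not from Hive's SD columns.
        CatalogBaseTable base = hiveCatalog.getTable(new ObjectPath("default", "v"));
        assert base instanceof CatalogView;
    }
}
```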
@@ -705,7 +705,6 @@ public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
boolean isHiveTable;
if (table.getParameters().containsKey(CatalogPropertiesUtil.IS_GENERIC)) {
// check is_generic to be backward compatible
isHiveTable =
!Boolean.parseBoolean(
table.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC));
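For context, the classification this hunk belongs to works roughly as follows — the is_generic branch appears in the diff above; the property-based fallback is an assumption inferred from the surrounding code, not shown in this commit:

```java
// Sketch of how a retrieved metastore Table is classified. The legacy
// is_generic flag, when present, wins; otherwise (assumption) the table
// counts as generic iff a Flink connector property was persisted.
Map<String, String> params = table.getParameters();
boolean isHiveTable;
if (params.containsKey(CatalogPropertiesUtil.IS_GENERIC)) {
    // flag written by older Flink versions; removed after reading
    isHiveTable =
            !Boolean.parseBoolean(params.remove(CatalogPropertiesUtil.IS_GENERIC));
} else {
    isHiveTable =
            !params.containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR.key())
                    && !params.containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR_TYPE);
}
```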
@@ -78,6 +78,9 @@
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_IS_EXTERNAL;
import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_LOCATION_URI;
import static org.apache.flink.table.catalog.CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX;
import static org.apache.flink.table.catalog.CatalogPropertiesUtil.IS_GENERIC;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.util.Preconditions.checkArgument;

/** Utils for Hive-backed tables. */
@@ -340,6 +343,7 @@ public static Table alterTableViaCatalogBaseTable(

public static Table instantiateHiveTable(
ObjectPath tablePath, CatalogBaseTable table, HiveConf hiveConf) {
final boolean isView = table instanceof CatalogView;
// let Hive set default parameters for us, e.g. serialization.format
Table hiveTable =
org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(
@@ -358,7 +362,10 @@ public static Table instantiateHiveTable(
StorageDescriptor sd = hiveTable.getSd();
HiveTableUtil.setDefaultStorageFormat(sd, hiveConf);

if (isHiveTable) {
// We always store the schema as properties for views, because a view's schema may not map
// to a Hive schema. This also means views created by Flink cannot be used in Hive, which is
// fine because Hive cannot understand the expanded query anyway.
if (isHiveTable && !isView) {
HiveTableUtil.initiateTableFromProperties(hiveTable, properties, hiveConf);
List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
// Table columns and partition keys
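The new comment above is the crux of the change: for views, the TableSchema is serialized into string properties (later masked with the flink. prefix) rather than into Hive FieldSchema columns. A sketch of that serialization path, assuming the DescriptorProperties helper this code base uses for generic tables:

```java
// Sketch (assumed API, matching how generic tables are handled): the
// view's schema becomes plain key/value parameters on the Hive table.
DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
tableSchemaProps.putTableSchema(
        org.apache.flink.table.descriptors.Schema.SCHEMA, table.getSchema());
properties.putAll(tableSchemaProps.asMap());
// e.g. "schema.0.name" -> "vx", "schema.0.data-type" -> "INT"
// maskFlinkProperties(properties) then prefixes each key with "flink."
```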
@@ -394,10 +401,19 @@

properties.putAll(tableSchemaProps.asMap());
properties = maskFlinkProperties(properties);
// we may need to explicitly set the is_generic flag in the following cases:
// 1. the user doesn't specify 'connector' or 'connector.type' when creating a table; without
// 'is_generic', such a table would be considered a Hive table upon retrieval
// 2. when creating views, which don't have connector properties
if (isView
|| (!properties.containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR.key())
&& !properties.containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR_TYPE))) {
properties.put(IS_GENERIC, "true");
}
hiveTable.setParameters(properties);
}

if (table instanceof CatalogView) {
if (isView) {
// TODO: [FLINK-12398] Support partitioned view in catalog API
hiveTable.setPartitionKeys(new ArrayList<>());

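Pulled out of the hunk above for clarity, the new flag logic amounts to the following standalone sketch; the static imports mirror the ones added at the top of this file:

```java
import static org.apache.flink.table.catalog.CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX;
import static org.apache.flink.table.catalog.CatalogPropertiesUtil.IS_GENERIC;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;

import java.util.Map;

final class IsGenericFlagSketch {
    /** Sketch only: marks a table's parameters as generic when needed. */
    static void markGenericIfNeeded(Map<String, String> properties, boolean isView) {
        boolean hasConnector =
                properties.containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR.key())
                        || properties.containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR_TYPE);
        // Views never carry connector options, and a table created without
        // 'connector'/'connector.type' would otherwise be read back as a
        // Hive table, so both cases get is_generic=true explicitly.
        if (isView || !hasConnector) {
            properties.put(IS_GENERIC, "true");
        }
    }
}
```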
@@ -309,7 +309,6 @@ private Operation convertCreateAlterView(HiveParserCreateViewDesc desc) {
props.putAll(baseTable.getOptions());
comment = baseTable.getComment();
} else {
markHiveConnector(props);
comment = desc.getComment();
if (desc.getTblProps() != null) {
props.putAll(desc.getTblProps());
@@ -26,8 +26,10 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
@@ -559,20 +561,20 @@ public void testView() throws Exception {
tableEnv.executeSql(
"create view v(vx) comment 'v comment' tblproperties ('k1'='v1') as select x from tbl");
ObjectPath viewPath = new ObjectPath("default", "v");
Table hiveView = hiveCatalog.getHiveTable(viewPath);
assertEquals(TableType.VIRTUAL_VIEW.name(), hiveView.getTableType());
assertEquals("vx", hiveView.getSd().getCols().get(0).getName());
assertEquals("v1", hiveView.getParameters().get("k1"));
CatalogBaseTable catalogBaseTable = hiveCatalog.getTable(viewPath);
assertTrue(catalogBaseTable instanceof CatalogView);
assertEquals("vx", catalogBaseTable.getUnresolvedSchema().getColumns().get(0).getName());
assertEquals("v1", catalogBaseTable.getOptions().get("k1"));

// change properties
tableEnv.executeSql("alter view v set tblproperties ('k1'='v11')");
hiveView = hiveCatalog.getHiveTable(viewPath);
assertEquals("v11", hiveView.getParameters().get("k1"));
catalogBaseTable = hiveCatalog.getTable(viewPath);
assertEquals("v11", catalogBaseTable.getOptions().get("k1"));

// change query
tableEnv.executeSql("alter view v as select y from tbl");
hiveView = hiveCatalog.getHiveTable(viewPath);
assertEquals("y", hiveView.getSd().getCols().get(0).getName());
catalogBaseTable = hiveCatalog.getTable(viewPath);
assertEquals("y", catalogBaseTable.getUnresolvedSchema().getColumns().get(0).getName());

// rename
tableEnv.executeSql("alter view v rename to v1");
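The updated assertions reflect an API shift: view metadata is now read through the catalog's CatalogView instead of the raw Hive Table. A hedged sketch of the read path, with catalog and path wiring assumed:

```java
// Sketch: reading view metadata after this change
// (checked exceptions from getTable elided in this sketch).
CatalogBaseTable base = hiveCatalog.getTable(new ObjectPath("default", "v"));
CatalogView view = (CatalogView) base;
String firstColumn = view.getUnresolvedSchema().getColumns().get(0).getName(); // "vx"
String k1 = view.getOptions().get("k1"); // "v1"
```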
@@ -25,6 +25,7 @@
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectPath;
@@ -40,6 +41,7 @@
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;

import static org.junit.Assert.assertEquals;
@@ -317,6 +319,23 @@ public void testFunctionCompatibility() throws Exception {
assertEquals(FunctionLanguage.JAVA, catalogFunction.getFunctionLanguage());
}

@Test
public void testGenericTableWithoutConnectorProp() throws Exception {
catalog.createDatabase(db1, createDb(), false);
TableSchema tableSchema =
TableSchema.builder()
.fields(
new String[] {"s", "ts"},
new DataType[] {DataTypes.STRING(), DataTypes.TIMESTAMP_LTZ(3)})
.watermark("ts", "ts-INTERVAL '1' SECOND", DataTypes.TIMESTAMP_LTZ(3))
.build();
CatalogTable catalogTable = new CatalogTableImpl(tableSchema, Collections.emptyMap(), null);
catalog.createTable(path1, catalogTable, false);
CatalogTable retrievedTable = (CatalogTable) catalog.getTable(path1);
assertEquals(tableSchema, retrievedTable.getSchema());
assertEquals(Collections.emptyMap(), retrievedTable.getOptions());
}

// ------ functions ------

@Test
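The new test above pins down case 1 from the HiveTableUtil comment: a generic table persisted without any connector option. A condensed sketch of the failure mode it guards against — catalog, path, and schema wiring are assumed:

```java
// Sketch: a connector-less generic table. Before this fix nothing marked
// it generic in the metastore, so it was read back as a Hive table;
// now is_generic=true is written explicitly and the round trip holds.
TableSchema schema = TableSchema.builder().field("s", DataTypes.STRING()).build();
CatalogTable table = new CatalogTableImpl(schema, Collections.emptyMap(), null);
catalog.createTable(path, table, false); // checked exceptions elided
CatalogTable back = (CatalogTable) catalog.getTable(path);
// back is generic again, with empty options, matching what was stored
```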
@@ -22,16 +22,21 @@
import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogTestUtil;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
@@ -43,21 +48,28 @@
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.catalog.stats.Date;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.StringUtils;

import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.udf.UDFRand;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.IDENTIFIER;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -94,6 +106,67 @@ public void testCreateTable_StorageFormatSet() throws Exception {
}

// ------ table and column stats ------

@Test
public void testViewCompatibility() throws Exception {
// we always store the view schema via properties now
// make sure non-generic views created by previous versions can still be used
catalog.createDatabase(db1, createDb(), false);
Table hiveView =
org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(
path1.getDatabaseName(), path1.getObjectName());
// mark as a view
hiveView.setTableType(TableType.VIRTUAL_VIEW.name());
final String originQuery = "view origin query";
final String expandedQuery = "view expanded query";
hiveView.setViewOriginalText(originQuery);
hiveView.setViewExpandedText(expandedQuery);
// set schema in SD
Schema schema =
Schema.newBuilder()
.fromFields(
new String[] {"i", "s"},
new AbstractDataType[] {DataTypes.INT(), DataTypes.STRING()})
.build();
List<FieldSchema> fields = new ArrayList<>();
for (Schema.UnresolvedColumn column : schema.getColumns()) {
String name = column.getName();
DataType type = (DataType) ((Schema.UnresolvedPhysicalColumn) column).getDataType();
fields.add(
new FieldSchema(
name, HiveTypeUtil.toHiveTypeInfo(type, true).getTypeName(), null));
}
hiveView.getSd().setCols(fields);
// test mark as non-generic with is_generic
hiveView.getParameters().put(CatalogPropertiesUtil.IS_GENERIC, "false");
// add some other properties
hiveView.getParameters().put("k1", "v1");

((HiveCatalog) catalog).client.createTable(hiveView);
CatalogBaseTable baseTable = catalog.getTable(path1);
assertTrue(baseTable instanceof CatalogView);
CatalogView catalogView = (CatalogView) baseTable;
assertEquals(schema, catalogView.getUnresolvedSchema());
assertEquals(originQuery, catalogView.getOriginalQuery());
assertEquals(expandedQuery, catalogView.getExpandedQuery());
assertEquals("v1", catalogView.getOptions().get("k1"));

// test mark as non-generic with connector
hiveView.setDbName(path3.getDatabaseName());
hiveView.setTableName(path3.getObjectName());
hiveView.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC);
hiveView.getParameters().put(CONNECTOR.key(), IDENTIFIER);

((HiveCatalog) catalog).client.createTable(hiveView);
baseTable = catalog.getTable(path3);
assertTrue(baseTable instanceof CatalogView);
catalogView = (CatalogView) baseTable;
assertEquals(schema, catalogView.getUnresolvedSchema());
assertEquals(originQuery, catalogView.getOriginalQuery());
assertEquals(expandedQuery, catalogView.getExpandedQuery());
assertEquals("v1", catalogView.getOptions().get("k1"));
}

@Test
public void testAlterTableColumnStatistics() throws Exception {
String hiveVersion = ((HiveCatalog) catalog).getHiveVersion();
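testViewCompatibility above rebuilds the pre-change metastore layout by hand; condensed, the two legacy markers the catalog must keep honoring when classifying a view are:

```java
// Sketch: legacy non-generic view markers that getTable must keep
// supporting; the schema of such views still lives in the SD columns.
Table legacyView =
        org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable("db", "v");
legacyView.setTableType(TableType.VIRTUAL_VIEW.name());
legacyView.getSd().setCols(fields); // fields: pre-built List<FieldSchema>
// variant 1: explicit flag written by older Flink versions
legacyView.getParameters().put(CatalogPropertiesUtil.IS_GENERIC, "false");
// variant 2 (alternative): a Hive connector property instead of the flag
// legacyView.getParameters().put(CONNECTOR.key(), IDENTIFIER);
```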
@@ -22,6 +22,8 @@
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
@@ -31,13 +33,15 @@
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableBuilder;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FileUtils;
@@ -497,4 +501,58 @@ public void testCreateTableLike() throws Exception {
assertEquals("x", catalogTable.getSchema().getFieldNames()[0]);
assertEquals(DataTypes.INT(), catalogTable.getSchema().getFieldDataTypes()[0]);
}

@Test
public void testViewSchema() throws Exception {
TableEnvironment tableEnv =
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.DEFAULT);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());

tableEnv.executeSql("create database db1");
try {
tableEnv.useDatabase("db1");
tableEnv.executeSql(
"create table src(x int,ts timestamp(3)) with ('connector'='datagen','number-of-rows'='10')");
tableEnv.executeSql("create view v1 as select x,ts from src order by x limit 3");

CatalogView catalogView =
(CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v1"));
Schema viewSchema = catalogView.getUnresolvedSchema();
assertEquals(
Schema.newBuilder()
.fromFields(
new String[] {"x", "ts"},
new AbstractDataType[] {
DataTypes.INT(), DataTypes.TIMESTAMP(3)
})
.build(),
viewSchema);

List<Row> results =
CollectionUtil.iteratorToList(
tableEnv.executeSql("select x from v1").collect());
assertEquals(3, results.size());

tableEnv.executeSql(
"create view v2 (v2_x,v2_ts) comment 'v2 comment' as select x,cast(ts as timestamp_ltz(3)) from v1");
catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v2"));
assertEquals(
Schema.newBuilder()
.fromFields(
new String[] {"v2_x", "v2_ts"},
new AbstractDataType[] {
DataTypes.INT(), DataTypes.TIMESTAMP_LTZ(3)
})
.build(),
catalogView.getUnresolvedSchema());
assertEquals("v2 comment", catalogView.getComment());
results =
CollectionUtil.iteratorToList(
tableEnv.executeSql("select * from v2").collect());
assertEquals(3, results.size());
} finally {
tableEnv.executeSql("drop database db1 cascade");
}
}
}
@@ -125,7 +125,7 @@ public CatalogView createView() {
String.format("select * from %s", t1),
String.format(
"select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
getBatchTableProperties());
Collections.emptyMap());
return new ResolvedCatalogView(origin, resolvedSchema);
}

Expand All @@ -139,7 +139,7 @@ public CatalogView createAnotherView() {
String.format("select * from %s", t2),
String.format(
"select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()),
getBatchTableProperties());
Collections.emptyMap());
return new ResolvedCatalogView(origin, resolvedSchema);
}
