Skip to content

Commit

Permalink
[FLINK-18867][hive] Generic table stored in Hive catalog is incompati…
Browse files Browse the repository at this point in the history
…ble for 1.10

This closes apache#13101
  • Loading branch information
lirui-apache authored Aug 13, 2020
1 parent 7574952 commit 47bf0aa
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ public class HiveCatalog extends AbstractCatalog {
private final String hiveVersion;
private final HiveShim hiveShim;

private HiveMetastoreClientWrapper client;
@VisibleForTesting
HiveMetastoreClientWrapper client;

public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir) {
this(catalogName, defaultDatabase, hiveConfDir, HiveShimLoader.getHiveVersion());
Expand Down Expand Up @@ -614,8 +615,10 @@ private CatalogBaseTable instantiateCatalogTable(Table hiveTable, HiveConf hiveC
DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
tableSchemaProps.putProperties(properties);
ObjectPath tablePath = new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName());
// try to get table schema with both new and old (1.10) key, in order to support tables created in old version
tableSchema = tableSchemaProps.getOptionalTableSchema(Schema.SCHEMA)
.orElseThrow(() -> new CatalogException("Failed to get table schema from properties for generic table " + tablePath));
.orElseGet(() -> tableSchemaProps.getOptionalTableSchema("generic.table.schema")
.orElseThrow(() -> new CatalogException("Failed to get table schema from properties for generic table " + tablePath)));
partitionKeys = tableSchemaProps.getPartitionKeys();
// remove the schema from properties
properties = CatalogTableImpl.removeRedundant(properties, tableSchema, partitionKeys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.types.DataType;

import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* Test for HiveCatalog on generic metadata.
Expand Down Expand Up @@ -65,6 +69,147 @@ public void testGenericTableSchema() throws Exception {
}
}

/**
 * Verifies that generic tables whose schema is stored under the legacy
 * {@code flink.generic.table.schema.*} property keys can still be loaded through the catalog
 * and yield the expected {@link TableSchema}.
 *
 * <p>Each table is written directly through the catalog's metastore client rather than through
 * {@code catalog.createTable}, so the properties reach the metastore exactly as spelled here.
 *
 * <p>NOTE: Be careful when modifying this test; it is important for backward compatibility.
 * The property keys and values below must stay exactly as they are.
 */
@Test
public void testTableSchemaCompatibility() throws Exception {
    catalog.createDatabase(db1, createDb(), false);
    try {
        // table with numeric types
        ObjectPath tablePath = new ObjectPath(db1, "generic1");
        Table hiveTable = newLegacyGenericHiveTable(tablePath);
        hiveTable.getParameters().put("flink.generic.table.schema.0.name", "ti");
        hiveTable.getParameters().put("flink.generic.table.schema.0.data-type", "TINYINT");
        hiveTable.getParameters().put("flink.generic.table.schema.1.name", "si");
        hiveTable.getParameters().put("flink.generic.table.schema.1.data-type", "SMALLINT");
        hiveTable.getParameters().put("flink.generic.table.schema.2.name", "i");
        hiveTable.getParameters().put("flink.generic.table.schema.2.data-type", "INT");
        hiveTable.getParameters().put("flink.generic.table.schema.3.name", "bi");
        hiveTable.getParameters().put("flink.generic.table.schema.3.data-type", "BIGINT");
        hiveTable.getParameters().put("flink.generic.table.schema.4.name", "f");
        hiveTable.getParameters().put("flink.generic.table.schema.4.data-type", "FLOAT");
        hiveTable.getParameters().put("flink.generic.table.schema.5.name", "d");
        hiveTable.getParameters().put("flink.generic.table.schema.5.data-type", "DOUBLE");
        hiveTable.getParameters().put("flink.generic.table.schema.6.name", "de");
        hiveTable.getParameters().put("flink.generic.table.schema.6.data-type", "DECIMAL(10, 5)");
        // computed column: carries an expression in addition to name and data type
        hiveTable.getParameters().put("flink.generic.table.schema.7.name", "cost");
        hiveTable.getParameters().put("flink.generic.table.schema.7.expr", "`d` * `bi`");
        hiveTable.getParameters().put("flink.generic.table.schema.7.data-type", "DOUBLE");
        ((HiveCatalog) catalog).client.createTable(hiveTable);
        CatalogBaseTable catalogBaseTable = catalog.getTable(tablePath);
        assertTrue(Boolean.parseBoolean(catalogBaseTable.getOptions().get(CatalogConfig.IS_GENERIC)));
        TableSchema expectedSchema = TableSchema.builder()
                .fields(new String[]{"ti", "si", "i", "bi", "f", "d", "de"},
                        new DataType[]{DataTypes.TINYINT(), DataTypes.SMALLINT(), DataTypes.INT(), DataTypes.BIGINT(),
                                DataTypes.FLOAT(), DataTypes.DOUBLE(), DataTypes.DECIMAL(10, 5)})
                .field("cost", DataTypes.DOUBLE(), "`d` * `bi`")
                .build();
        assertEquals(expectedSchema, catalogBaseTable.getSchema());

        // table with character types
        tablePath = new ObjectPath(db1, "generic2");
        hiveTable = newLegacyGenericHiveTable(tablePath);
        hiveTable.getParameters().put("flink.generic.table.schema.0.name", "c");
        hiveTable.getParameters().put("flink.generic.table.schema.0.data-type", "CHAR(265)");
        hiveTable.getParameters().put("flink.generic.table.schema.1.name", "vc");
        hiveTable.getParameters().put("flink.generic.table.schema.1.data-type", "VARCHAR(65536)");
        hiveTable.getParameters().put("flink.generic.table.schema.2.name", "s");
        hiveTable.getParameters().put("flink.generic.table.schema.2.data-type", "VARCHAR(2147483647)");
        hiveTable.getParameters().put("flink.generic.table.schema.3.name", "b");
        hiveTable.getParameters().put("flink.generic.table.schema.3.data-type", "BINARY(1)");
        hiveTable.getParameters().put("flink.generic.table.schema.4.name", "vb");
        hiveTable.getParameters().put("flink.generic.table.schema.4.data-type", "VARBINARY(255)");
        hiveTable.getParameters().put("flink.generic.table.schema.5.name", "bs");
        hiveTable.getParameters().put("flink.generic.table.schema.5.data-type", "VARBINARY(2147483647)");
        // computed column
        hiveTable.getParameters().put("flink.generic.table.schema.6.name", "len");
        hiveTable.getParameters().put("flink.generic.table.schema.6.expr", "CHAR_LENGTH(`s`)");
        hiveTable.getParameters().put("flink.generic.table.schema.6.data-type", "INT");
        ((HiveCatalog) catalog).client.createTable(hiveTable);
        catalogBaseTable = catalog.getTable(tablePath);
        expectedSchema = TableSchema.builder()
                .fields(new String[]{"c", "vc", "s", "b", "vb", "bs"},
                        new DataType[]{DataTypes.CHAR(265), DataTypes.VARCHAR(65536), DataTypes.STRING(), DataTypes.BINARY(1),
                                DataTypes.VARBINARY(255), DataTypes.BYTES()})
                .field("len", DataTypes.INT(), "CHAR_LENGTH(`s`)")
                .build();
        assertEquals(expectedSchema, catalogBaseTable.getSchema());

        // table with date/time types
        tablePath = new ObjectPath(db1, "generic3");
        hiveTable = newLegacyGenericHiveTable(tablePath);
        hiveTable.getParameters().put("flink.generic.table.schema.0.name", "dt");
        hiveTable.getParameters().put("flink.generic.table.schema.0.data-type", "DATE");
        hiveTable.getParameters().put("flink.generic.table.schema.1.name", "t");
        hiveTable.getParameters().put("flink.generic.table.schema.1.data-type", "TIME(0)");
        hiveTable.getParameters().put("flink.generic.table.schema.2.name", "ts");
        hiveTable.getParameters().put("flink.generic.table.schema.2.data-type", "TIMESTAMP(3)");
        hiveTable.getParameters().put("flink.generic.table.schema.3.name", "tstz");
        hiveTable.getParameters().put("flink.generic.table.schema.3.data-type", "TIMESTAMP(6) WITH LOCAL TIME ZONE");
        // watermark definition on the "ts" column
        hiveTable.getParameters().put("flink.generic.table.schema.watermark.0.rowtime", "ts");
        hiveTable.getParameters().put("flink.generic.table.schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
        hiveTable.getParameters().put("flink.generic.table.schema.watermark.0.strategy.expr", "ts");
        ((HiveCatalog) catalog).client.createTable(hiveTable);
        catalogBaseTable = catalog.getTable(tablePath);
        expectedSchema = TableSchema.builder()
                .fields(new String[]{"dt", "t", "ts", "tstz"},
                        new DataType[]{DataTypes.DATE(), DataTypes.TIME(), DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()})
                .watermark("ts", "ts", DataTypes.TIMESTAMP(3))
                .build();
        assertEquals(expectedSchema, catalogBaseTable.getSchema());

        // table with complex/misc types
        tablePath = new ObjectPath(db1, "generic4");
        hiveTable = newLegacyGenericHiveTable(tablePath);
        hiveTable.getParameters().put("flink.generic.table.schema.0.name", "a");
        hiveTable.getParameters().put("flink.generic.table.schema.0.data-type", "ARRAY<INT>");
        hiveTable.getParameters().put("flink.generic.table.schema.1.name", "m");
        hiveTable.getParameters().put("flink.generic.table.schema.1.data-type", "MAP<BIGINT, TIMESTAMP(6)>");
        hiveTable.getParameters().put("flink.generic.table.schema.2.name", "mul");
        hiveTable.getParameters().put("flink.generic.table.schema.2.data-type", "MULTISET<DOUBLE>");
        hiveTable.getParameters().put("flink.generic.table.schema.3.name", "r");
        hiveTable.getParameters().put("flink.generic.table.schema.3.data-type", "ROW<`f1` INT, `f2` VARCHAR(2147483647)>");
        hiveTable.getParameters().put("flink.generic.table.schema.4.name", "b");
        hiveTable.getParameters().put("flink.generic.table.schema.4.data-type", "BOOLEAN");
        hiveTable.getParameters().put("flink.generic.table.schema.5.name", "ts");
        hiveTable.getParameters().put("flink.generic.table.schema.5.data-type", "TIMESTAMP(3)");
        // watermark definition with a non-trivial strategy expression
        hiveTable.getParameters().put("flink.generic.table.schema.watermark.0.rowtime", "ts");
        hiveTable.getParameters().put("flink.generic.table.schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
        hiveTable.getParameters().put("flink.generic.table.schema.watermark.0.strategy.expr", "`ts` - INTERVAL '5' SECOND");
        ((HiveCatalog) catalog).client.createTable(hiveTable);
        catalogBaseTable = catalog.getTable(tablePath);
        expectedSchema = TableSchema.builder()
                .fields(new String[]{"a", "m", "mul", "r", "b", "ts"},
                        new DataType[]{DataTypes.ARRAY(DataTypes.INT()),
                                DataTypes.MAP(DataTypes.BIGINT(), DataTypes.TIMESTAMP()),
                                DataTypes.MULTISET(DataTypes.DOUBLE()),
                                DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING())),
                                DataTypes.BOOLEAN(), DataTypes.TIMESTAMP(3)})
                .watermark("ts", "`ts` - INTERVAL '5' SECOND", DataTypes.TIMESTAMP(3))
                .build();
        assertEquals(expectedSchema, catalogBaseTable.getSchema());
    } finally {
        // always clean up, cascading to the tables created above
        catalog.dropDatabase(db1, true, true);
    }
}

/**
 * Builds an empty Hive metastore table for {@code tablePath}, pre-populated with the batch
 * table properties, ready to receive legacy schema properties.
 *
 * @param tablePath database and table name to assign to the Hive table
 * @return the Hive table; it has not been registered in the metastore yet
 */
private Table newLegacyGenericHiveTable(ObjectPath tablePath) throws Exception {
    Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(
            tablePath.getDatabaseName(), tablePath.getObjectName());
    hiveTable.setDbName(tablePath.getDatabaseName());
    hiveTable.setTableName(tablePath.getObjectName());
    hiveTable.getParameters().putAll(getBatchTableProperties());
    return hiveTable;
}

// ------ partitions ------

@Test
Expand Down

0 comments on commit 47bf0aa

Please sign in to comment.