[FLINK-12756][hive] migrate HiveCatalog from TypeInformation-based old type system to DataType-based new type system

This PR migrates HiveCatalog from the TypeInformation-based old type system to the DataType-based new type system.

This closes apache#8639.
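
At a glance, the migration changes how a schema's column types are expressed. A minimal before/after sketch (assuming the Flink 1.9-era table API; the two-column schema is illustrative, not taken from the diff):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

// Old type system: TypeInformation-based constructor
TableSchema oldStyle = new TableSchema(
        new String[] {"id", "name"},
        new TypeInformation[] {BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO});

// New type system: DataType-based builder, adopted throughout this commit
TableSchema newStyle = TableSchema.builder()
        .field("id", DataTypes.INT())
        .field("name", DataTypes.STRING())
        .build();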
bowenli86 committed Jun 6, 2019
1 parent c9a5af3 commit baaea36
Showing 7 changed files with 106 additions and 98 deletions.
File 1 of 7
@@ -30,6 +30,7 @@
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -246,7 +247,7 @@ public void open(int taskNumber, int numTasks) throws IOException {

List<ObjectInspector> objectInspectors = new ArrayList<>();
for (int i = 0; i < rowTypeInfo.getArity() - partitionCols.size(); i++) {
-objectInspectors.add(HiveTableUtil.getObjectInspector(rowTypeInfo.getTypeAt(i)));
+objectInspectors.add(HiveTableUtil.getObjectInspector(LegacyTypeInfoDataTypeConverter.toDataType(rowTypeInfo.getTypeAt(i))));
}

if (!isPartitioned) {
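
The newly imported LegacyTypeInfoDataTypeConverter is the bridge between the two systems: it turns a legacy TypeInformation into its DataType counterpart before the Hive ObjectInspector lookup. A minimal sketch (the STRING mapping noted in the comment is an assumption about the converter's behavior, not taken from this diff):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;

// Same conversion as in the open() method above, for a single field type;
// BasicTypeInfo.STRING_TYPE_INFO should map to DataTypes.STRING().
DataType dataType = LegacyTypeInfoDataTypeConverter.toDataType(BasicTypeInfo.STRING_TYPE_INFO);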
File 2 of 7
@@ -20,6 +20,7 @@

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.types.DataType;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -39,7 +40,7 @@ private HiveTableUtil() {
/**
* Get Hive {@link ObjectInspector} for a Flink {@link DataType}.
*/
-public static ObjectInspector getObjectInspector(TypeInformation flinkType) throws IOException {
+public static ObjectInspector getObjectInspector(DataType flinkType) throws IOException {
return getObjectInspector(HiveTypeUtil.toHiveTypeInfo(flinkType));
}

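
A usage sketch for the updated signature (hedged: the enclosing class's package is collapsed in this view, so no import for HiveTableUtil is shown; the expected type name follows Hive's serde constants):

import java.io.IOException;

import org.apache.flink.table.api.DataTypes;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

// DataType -> Hive TypeInfo -> ObjectInspector, via the chain above
ObjectInspector inspector = HiveTableUtil.getObjectInspector(DataTypes.STRING()); // throws IOException
// inspector.getTypeName() should be "string"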
File 3 of 7
@@ -18,8 +18,8 @@

package org.apache.flink.table.catalog.hive.util;

-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.DataType;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -52,7 +52,7 @@ public static TableSchema createTableSchema(List<FieldSchema> cols, List<FieldSchema> partitionKeys) {
allCols.addAll(partitionKeys);

String[] colNames = new String[allCols.size()];
-TypeInformation[] colTypes = new TypeInformation[allCols.size()];
+DataType[] colTypes = new DataType[allCols.size()];

for (int i = 0; i < allCols.size(); i++) {
FieldSchema fs = allCols.get(i);
@@ -61,21 +61,23 @@ public static TableSchema createTableSchema(List<FieldSchema> cols, List<FieldSchema> partitionKeys) {
colTypes[i] = HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
}

-return new TableSchema(colNames, colTypes);
+return TableSchema.builder()
+.fields(colNames, colTypes)
+.build();
}

/**
* Create Hive columns from Flink TableSchema.
*/
public static List<FieldSchema> createHiveColumns(TableSchema schema) {
String[] fieldNames = schema.getFieldNames();
-TypeInformation[] fieldTypes = schema.getFieldTypes();
+DataType[] fieldTypes = schema.getFieldDataTypes();

List<FieldSchema> columns = new ArrayList<>(fieldNames.length);

for (int i = 0; i < fieldNames.length; i++) {
columns.add(
-new FieldSchema(fieldNames[i], HiveTypeUtil.toHiveType(fieldTypes[i]), null));
+new FieldSchema(fieldNames[i], HiveTypeUtil.toHiveTypeName(fieldTypes[i]), null));
}

return columns;
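
Together these helpers now round-trip between Flink and Hive schemas through DataType. A sketch (hedged: the enclosing utility class is collapsed in this view, so the helpers are called unqualified as if from within it; column names are made up):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;

import org.apache.hadoop.hive.metastore.api.FieldSchema;

// Flink schema -> Hive columns
TableSchema schema = TableSchema.builder()
        .fields(new String[] {"user_id", "ts"},
                new DataType[] {DataTypes.BIGINT(), DataTypes.TIMESTAMP()})
        .build();
List<FieldSchema> hiveColumns = createHiveColumns(schema);   // types "bigint", "timestamp"

// Hive columns -> Flink schema (no partition keys in this example)
TableSchema restored = createTableSchema(hiveColumns, new ArrayList<>());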
File 4 of 7
@@ -18,13 +18,9 @@

package org.apache.flink.table.catalog.hive.util;

-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;

import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -35,43 +31,50 @@
*/
public class HiveTypeUtil {

+// Note: Need to keep this in sync with BaseSemanticAnalyzer::getTypeStringFromAST
+private static final String HIVE_ARRAY_TYPE_NAME_FORMAT = serdeConstants.LIST_TYPE_NAME + "<%s>";

private HiveTypeUtil() {
}

/**
-* Convert Flink data type to Hive data type.
+* Convert Flink data type to Hive data type name.
* TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
* [FLINK-12386] Support complete mapping between Flink and Hive data types
+*
+* @param type a Flink data type
+* @return the corresponding Hive data type name
+*/
+public static String toHiveTypeName(DataType type) {
+return toHiveTypeInfo(type).getTypeName();
+}
+
+/**
+* Convert Flink data type to Hive data type.
*
* @param type a Flink data type
* @return the corresponding Hive data type
*/
-public static String toHiveType(TypeInformation type) {
-if (type.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
-return serdeConstants.BOOLEAN_TYPE_NAME;
-} else if (type.equals(BasicTypeInfo.BYTE_TYPE_INFO)) {
-return serdeConstants.TINYINT_TYPE_NAME;
-} else if (type.equals(BasicTypeInfo.SHORT_TYPE_INFO)) {
-return serdeConstants.SMALLINT_TYPE_NAME;
-} else if (type.equals(BasicTypeInfo.INT_TYPE_INFO)) {
-return serdeConstants.INT_TYPE_NAME;
-} else if (type.equals(BasicTypeInfo.LONG_TYPE_INFO)) {
-return serdeConstants.BIGINT_TYPE_NAME;
-} else if (type.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) {
-return serdeConstants.FLOAT_TYPE_NAME;
-} else if (type.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) {
-return serdeConstants.DOUBLE_TYPE_NAME;
-} else if (type.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
-return serdeConstants.STRING_TYPE_NAME;
-} else if (type.equals(SqlTimeTypeInfo.DATE)) {
-return serdeConstants.DATE_TYPE_NAME;
-} else if (type.equals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
-return serdeConstants.BINARY_TYPE_NAME;
-} else if (type.equals(SqlTimeTypeInfo.TIMESTAMP)) {
-return serdeConstants.TIMESTAMP_TYPE_NAME;
+public static TypeInfo toHiveTypeInfo(DataType type) {
+if (type.equals(DataTypes.BOOLEAN())) {
+return TypeInfoFactory.booleanTypeInfo;
+} else if (type.equals(DataTypes.TINYINT())) {
+return TypeInfoFactory.byteTypeInfo;
+} else if (type.equals(DataTypes.SMALLINT())) {
+return TypeInfoFactory.shortTypeInfo;
+} else if (type.equals(DataTypes.INT())) {
+return TypeInfoFactory.intTypeInfo;
+} else if (type.equals(DataTypes.BIGINT())) {
+return TypeInfoFactory.longTypeInfo;
+} else if (type.equals(DataTypes.FLOAT())) {
+return TypeInfoFactory.floatTypeInfo;
+} else if (type.equals(DataTypes.DOUBLE())) {
+return TypeInfoFactory.doubleTypeInfo;
+} else if (type.equals(DataTypes.STRING())) {
+return TypeInfoFactory.stringTypeInfo;
+} else if (type.equals(DataTypes.DATE())) {
+return TypeInfoFactory.dateTypeInfo;
+} else if (type.equals(DataTypes.BYTES())) {
+return TypeInfoFactory.binaryTypeInfo;
+} else if (type.equals(DataTypes.TIMESTAMP())) {
+return TypeInfoFactory.timestampTypeInfo;
} else {
throw new UnsupportedOperationException(
String.format("Flink doesn't support converting type %s to Hive type yet.", type.toString()));
@@ -86,13 +89,13 @@ public static String toHiveType(TypeInformation type) {
* @param hiveType a Hive data type
* @return the corresponding Flink data type
*/
-public static TypeInformation toFlinkType(TypeInfo hiveType) {
+public static DataType toFlinkType(TypeInfo hiveType) {
switch (hiveType.getCategory()) {
case PRIMITIVE:
return toFlinkPrimitiveType((PrimitiveTypeInfo) hiveType);
case LIST:
ListTypeInfo listTypeInfo = (ListTypeInfo) hiveType;
-return BasicArrayTypeInfo.getInfoFor(toFlinkType(listTypeInfo.getListElementTypeInfo()).getTypeClass());
+return DataTypes.ARRAY(toFlinkType(listTypeInfo.getListElementTypeInfo()));
default:
throw new UnsupportedOperationException(
String.format("Flink doesn't support Hive data type %s yet.", hiveType));
@@ -101,44 +104,36 @@ public static TypeInformation toFlinkType(TypeInfo hiveType) {

// TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
// [FLINK-12386] Support complete mapping between Flink and Hive data types
-private static TypeInformation toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
+private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
switch (hiveType.getPrimitiveCategory()) {
// For CHAR(p) and VARCHAR(p) types, map them to String for now because Flink doesn't yet support them.
case CHAR:
case VARCHAR:
case STRING:
-return BasicTypeInfo.STRING_TYPE_INFO;
+return DataTypes.STRING();
case BOOLEAN:
-return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+return DataTypes.BOOLEAN();
case BYTE:
-return BasicTypeInfo.BYTE_TYPE_INFO;
+return DataTypes.TINYINT();
case SHORT:
-return BasicTypeInfo.SHORT_TYPE_INFO;
+return DataTypes.SMALLINT();
case INT:
-return BasicTypeInfo.INT_TYPE_INFO;
+return DataTypes.INT();
case LONG:
-return BasicTypeInfo.LONG_TYPE_INFO;
+return DataTypes.BIGINT();
case FLOAT:
-return BasicTypeInfo.FLOAT_TYPE_INFO;
+return DataTypes.FLOAT();
case DOUBLE:
-return BasicTypeInfo.DOUBLE_TYPE_INFO;
+return DataTypes.DOUBLE();
case DATE:
-return SqlTimeTypeInfo.DATE;
+return DataTypes.DATE();
case TIMESTAMP:
-return SqlTimeTypeInfo.TIMESTAMP;
+return DataTypes.TIMESTAMP();
case BINARY:
-return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+return DataTypes.BYTES();
default:
throw new UnsupportedOperationException(
String.format("Flink doesn't support Hive primitive type %s yet", hiveType));
}
}

-/**
-* Converts a Flink {@link TypeInformation} to corresponding Hive {@link TypeInfo}.
-*/
-public static TypeInfo toHiveTypeInfo(TypeInformation flinkType) {
-// TODO: support complex data types
-return TypeInfoFactory.getPrimitiveTypeInfo(toHiveType(flinkType));
-}
}
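
A round-trip sketch over the mappings above (the commented results follow the case arms shown in this diff; anything outside them still throws UnsupportedOperationException):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.types.DataType;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

// Flink -> Hive
TypeInfo hiveInfo = HiveTypeUtil.toHiveTypeInfo(DataTypes.BIGINT());   // TypeInfoFactory.longTypeInfo
String hiveName = HiveTypeUtil.toHiveTypeName(DataTypes.BIGINT());     // "bigint"

// Hive -> Flink: PRIMITIVE and LIST categories are covered
DataType primitive = HiveTypeUtil.toFlinkType(TypeInfoFactory.longTypeInfo);                              // DataTypes.BIGINT()
DataType array = HiveTypeUtil.toFlinkType(TypeInfoFactory.getListTypeInfo(TypeInfoFactory.intTypeInfo));  // ARRAY<INT>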
File 5 of 7
@@ -18,10 +18,7 @@

package org.apache.flink.table.catalog.hive;

-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
@@ -33,6 +30,7 @@
import org.apache.flink.table.catalog.GenericCatalogFunction;
import org.apache.flink.table.catalog.GenericCatalogTable;
import org.apache.flink.table.catalog.GenericCatalogView;
+import org.apache.flink.table.types.DataType;

import org.junit.BeforeClass;
import org.junit.Test;
@@ -57,32 +55,36 @@ public static void init() throws IOException {
public void testDataTypes() throws Exception {
// TODO: the following Hive types are not supported in Flink yet, including CHAR, VARCHAR, DECIMAL, MAP, STRUCT
// [FLINK-12386] Support complete mapping between Flink and Hive data types
-TypeInformation[] types = new TypeInformation[] {
-BasicTypeInfo.BYTE_TYPE_INFO,
-BasicTypeInfo.SHORT_TYPE_INFO,
-BasicTypeInfo.INT_TYPE_INFO,
-BasicTypeInfo.LONG_TYPE_INFO,
-BasicTypeInfo.FLOAT_TYPE_INFO,
-BasicTypeInfo.DOUBLE_TYPE_INFO,
-BasicTypeInfo.BOOLEAN_TYPE_INFO,
-BasicTypeInfo.STRING_TYPE_INFO,
-PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
-SqlTimeTypeInfo.DATE,
-SqlTimeTypeInfo.TIMESTAMP
+DataType[] types = new DataType[] {
+DataTypes.TINYINT(),
+DataTypes.SMALLINT(),
+DataTypes.INT(),
+DataTypes.BIGINT(),
+DataTypes.FLOAT(),
+DataTypes.DOUBLE(),
+DataTypes.BOOLEAN(),
+DataTypes.STRING(),
+DataTypes.BYTES(),
+DataTypes.DATE(),
+DataTypes.TIMESTAMP()
};

verifyDataTypes(types);
}

-private void verifyDataTypes(TypeInformation[] types) throws Exception {
+private void verifyDataTypes(DataType[] types) throws Exception {
String[] colNames = new String[types.length];

for (int i = 0; i < types.length; i++) {
colNames[i] = types[i].toString().toLowerCase() + "_col";
}

+TableSchema schema = TableSchema.builder()
+.fields(colNames, types)
+.build();
+
CatalogTable table = new GenericCatalogTable(
-new TableSchema(colNames, types),
+schema,
getBatchTableProperties(),
TEST_COMMENT
);
File 6 of 7
@@ -317,6 +317,20 @@ public Builder field(String name, DataType dataType) {
return this;
}

+/**
+* Add an array of fields with names and data types.
+*
+* <p>The call order of this method determines the order of fields in the schema.
+*/
+public Builder fields(String[] names, DataType[] dataTypes) {
+Preconditions.checkNotNull(names);
+Preconditions.checkNotNull(dataTypes);
+
+fieldNames.addAll(Arrays.asList(names));
+fieldDataTypes.addAll(Arrays.asList(dataTypes));
+return this;
+}
+
/**
* @deprecated This method will be removed in future versions as it uses the old type system. It
* is recommended to use {@link #field(String, DataType)} instead which uses the new type
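
A usage sketch of the new bulk variant (this mirrors how createTableSchema in the Hive catalog utilities above uses it; the field names are illustrative):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;

// Equivalent to calling .field(...) once per column, in array order
TableSchema schema = TableSchema.builder()
        .fields(new String[] {"name", "age"},
                new DataType[] {DataTypes.STRING(), DataTypes.INT()})
        .build();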
File 7 of 7
@@ -18,8 +18,7 @@

package org.apache.flink.table.catalog;

-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
Expand Down Expand Up @@ -1170,25 +1169,19 @@ public void testListPartitionPartialSpec() throws Exception {
public abstract CatalogPartition createPartition();

protected TableSchema createTableSchema() {
-return new TableSchema(
-new String[] {"first", "second", "third"},
-new TypeInformation[] {
-BasicTypeInfo.STRING_TYPE_INFO,
-BasicTypeInfo.INT_TYPE_INFO,
-BasicTypeInfo.STRING_TYPE_INFO,
-}
-);
+return TableSchema.builder()
+.field("first", DataTypes.STRING())
+.field("second", DataTypes.INT())
+.field("third", DataTypes.STRING())
+.build();
}

protected TableSchema createAnotherTableSchema() {
-return new TableSchema(
-new String[] {"first2", "second", "third"},
-new TypeInformation[] {
-BasicTypeInfo.STRING_TYPE_INFO,
-BasicTypeInfo.STRING_TYPE_INFO,
-BasicTypeInfo.STRING_TYPE_INFO
-}
-);
+return TableSchema.builder()
+.field("first", DataTypes.STRING())
+.field("second", DataTypes.STRING())
+.field("third", DataTypes.STRING())
+.build();
}

protected List<String> createPartitionKeys() {
