Skip to content

Commit

Permalink
[FLINK-19786][avro] Fix the nullability and precision for Avro format…
Browse files Browse the repository at this point in the history
… deserialization

* Fix the TIME schema precision as 3
* Fix the nullability of type: TIMESTAMP_WITHOUT_TIME_ZONE, DATE, TIME_WITHOUT_TIME_ZONE,
  DECIMAL, MAP, ARRAY
* The table schema row type should be always non-nullable
  • Loading branch information
danny0405 authored and dawidwys committed Oct 29, 2020
1 parent f570d04 commit 146269d
Show file tree
Hide file tree
Showing 18 changed files with 414 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
Expand Down Expand Up @@ -315,10 +316,9 @@ public TableSchema convertsToTableSchema() {
int familyIndex = i < rowKeyInfo.rowKeyIndex ? i : i - 1;
String family = familyNames[familyIndex];
fieldNames[i] = family;
fieldTypes[i] = TableSchema.builder()
.fields(getQualifierNames(family), getQualifierDataTypes(family))
.build()
.toRowDataType();
fieldTypes[i] = getRowDataType(
getQualifierNames(family),
getQualifierDataTypes(family));
}
}
return TableSchema.builder().fields(fieldNames, fieldTypes).build();
Expand All @@ -328,15 +328,30 @@ public TableSchema convertsToTableSchema() {
for (int i = 0; i < fieldNames.length; i++) {
String family = familyNames[i];
fieldNames[i] = family;
fieldTypes[i] = TableSchema.builder()
.fields(getQualifierNames(family), getQualifierDataTypes(family))
.build()
.toRowDataType();
fieldTypes[i] = getRowDataType(
getQualifierNames(family),
getQualifierDataTypes(family));
}
return TableSchema.builder().fields(fieldNames, fieldTypes).build();
}
}

/**
* Returns row data type with given field names {@code fieldNames}
* and data types {@code fieldTypes}.
*
* @param fieldNames the field names
* @param fieldTypes the field types
* @return nullable row type
*/
private static DataType getRowDataType(String[] fieldNames, DataType[] fieldTypes) {
final DataTypes.Field[] fields = new DataTypes.Field[fieldNames.length];
for (int j = 0; j < fieldNames.length; j++) {
fields[j] = DataTypes.FIELD(fieldNames[j], fieldTypes[j]);
}
return DataTypes.ROW(fields);
}

/**
* Construct a {@link HBaseTableSchema} from a {@link TableSchema}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ private static class CollectionTableSource extends InputFormatTableSource<Row> {

@Override
public DataType getProducedDataType() {
return TypeConversions.fromLegacyInfoToDataType(rowTypeInfo);
return TypeConversions.fromLegacyInfoToDataType(rowTypeInfo).notNull();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ private void testRowDataWriteReadWithSchema(Schema schema) throws Exception {
RowType rowType = (RowType) dataType.getLogicalType();

AvroRowDataSerializationSchema serializer = getSerializationSchema(rowType, schema);
Schema writeSchema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
Schema writeSchema = AvroSchemaConverter
.convertToSchema(dataType.getLogicalType());
AvroRowDataDeserializationSchema deserializer =
getDeserializationSchema(rowType, writeSchema);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ private static DataType convertToDataType(Schema schema) {
if (logicalType == LogicalTypes.date()) {
return DataTypes.DATE().notNull();
} else if (logicalType == LogicalTypes.timeMillis()) {
return DataTypes.TIME().notNull();
return DataTypes.TIME(3).notNull();
}
return DataTypes.INT().notNull();
case LONG:
Expand All @@ -274,6 +274,8 @@ private static DataType convertToDataType(Schema schema) {
return DataTypes.TIMESTAMP(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
return DataTypes.TIMESTAMP(6).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
return DataTypes.TIME(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
return DataTypes.TIME(6).notNull();
}
Expand All @@ -293,16 +295,23 @@ private static DataType convertToDataType(Schema schema) {
/**
* Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
*
* @param logicalType logical type
* <p>Use "record" as the type name.
*
* @param schema the schema type, usually it should be the top level record type,
* e.g. not a nested type
* @return Avro's {@link Schema} matching this logical type.
*/
public static Schema convertToSchema(LogicalType logicalType) {
return convertToSchema(logicalType, "record");
public static Schema convertToSchema(LogicalType schema) {
return convertToSchema(schema, "record");
}

/**
* Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
*
* <p>The "{rowName}_" is used as the nested row type name prefix in order to
* generate the right schema. Nested record type that only differs with type name
* is still compatible.
*
* @param logicalType logical type
* @param rowName the record name
* @return Avro's {@link Schema} matching this logical type.
Expand Down Expand Up @@ -349,10 +358,12 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
throw new IllegalArgumentException("Avro does not support TIMESTAMP type " +
"with precision: " + precision + ", it only supports precision less than 3.");
}
return avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(timestamp) : timestamp;
case DATE:
// use int to represents Date
return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
return nullable ? nullableSchema(date) : date;
case TIME_WITHOUT_TIME_ZONE:
precision = ((TimeType) logicalType).getPrecision();
if (precision > 3) {
Expand All @@ -361,13 +372,16 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
", it only supports precision less than 3.");
}
// use int to represents Time, we only support millisecond when deserialization
return LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
Schema time = LogicalTypes.timeMillis()
.addToSchema(SchemaBuilder.builder().intType());
return nullable ? nullableSchema(time) : time;
case DECIMAL:
DecimalType decimalType = (DecimalType) logicalType;
// store BigDecimal as byte[]
return LogicalTypes
.decimal(decimalType.getPrecision(), decimalType.getScale())
.addToSchema(SchemaBuilder.builder().bytesType());
Schema decimal = LogicalTypes
.decimal(decimalType.getPrecision(), decimalType.getScale())
.addToSchema(SchemaBuilder.builder().bytesType());
return nullable ? nullableSchema(decimal) : decimal;
case ROW:
RowType rowType = (RowType) logicalType;
List<String> fieldNames = rowType.getFieldNames();
Expand All @@ -386,18 +400,16 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
return nullable ? nullableSchema(record) : record;
case MULTISET:
case MAP:
return SchemaBuilder
.builder()
.nullable()
.map()
.values(convertToSchema(extractValueTypeToAvroMap(logicalType), rowName));
Schema map = SchemaBuilder.builder()
.map()
.values(convertToSchema(extractValueTypeToAvroMap(logicalType), rowName));
return nullable ? nullableSchema(map) : map;
case ARRAY:
ArrayType arrayType = (ArrayType) logicalType;
return SchemaBuilder
.builder()
.nullable()
.array()
.items(convertToSchema(arrayType.getElementType(), rowName));
Schema array = SchemaBuilder.builder()
.array()
.items(convertToSchema(arrayType.getElementType(), rowName));
return nullable ? nullableSchema(array) : array;
case RAW:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public void testSerializeDeserialize() throws Exception {
FIELD("map", MAP(STRING(), BIGINT())),
FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))),
FIELD("map2array", MAP(STRING(), ARRAY(INT()))),
FIELD("nullEntryMap", MAP(STRING(), STRING())));
FIELD("nullEntryMap", MAP(STRING(), STRING())))
.notNull();
final RowType rowType = (RowType) dataType.getLogicalType();
final TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);

Expand Down Expand Up @@ -182,9 +183,10 @@ public void testSpecificType() throws Exception {
byte[] input = byteArrayOutputStream.toByteArray();

DataType dataType = ROW(
FIELD("type_timestamp_millis", TIMESTAMP(3)),
FIELD("type_date", DATE()),
FIELD("type_time_millis", TIME(3)));
FIELD("type_timestamp_millis", TIMESTAMP(3).notNull()),
FIELD("type_date", DATE().notNull()),
FIELD("type_time_millis", TIME(3).notNull()))
.notNull();
final RowType rowType = (RowType) dataType.getLogicalType();
final TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);
AvroRowDataSerializationSchema serializationSchema = new AvroRowDataSerializationSchema(rowType);
Expand Down
Loading

0 comments on commit 146269d

Please sign in to comment.