Skip to content

Commit

Permalink
[FLINK-23479][table-planner] Fix the unstable test cases about json plan
Browse files Browse the repository at this point in the history
The reason about the unstable test cases is the inner serializer instance in RawType is serialized directly as String to json which is mutable. While for a RawType representing DataView, the inner logical type is stable and accessible, we should serialize the inner logical type to make the json plan stable. For other case, we just keep the previous behavior.

This closes apache#16599
  • Loading branch information
godfreyhe committed Aug 9, 2021
1 parent 3d664a9 commit 785c72a
Show file tree
Hide file tree
Showing 14 changed files with 863 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.CharType;
Expand All @@ -30,6 +34,7 @@
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RawType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.StructuredType;
import org.apache.flink.table.types.logical.SymbolType;
Expand All @@ -40,6 +45,8 @@
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.ZonedTimestampType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
import org.apache.flink.table.utils.EncodingUtils;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
Expand All @@ -55,13 +62,15 @@

import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_ATTRIBUTES;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_COMPARISON;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_DATA_VIEW_CLASS;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_DESCRIPTION;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_ELEMENT_TYPE;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_FIELDS;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_FINAL;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_IDENTIFIER;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_IMPLEMENTATION_CLASS;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_INSTANTIABLE;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_IS_INTERNAL_TYPE;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_KEY_TYPE;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_LENGTH;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer.FIELD_NAME_LOGICAL_TYPE;
Expand Down Expand Up @@ -130,6 +139,11 @@ public LogicalType deserialize(JsonNode logicalTypeNode, SerdeContext serdeCtx)
return deserializeArrayType(logicalTypeNode, serdeCtx);
case MULTISET:
return deserializeMultisetType(logicalTypeNode, serdeCtx);
case RAW:
// This method only deserializes the LogicalType with `type='RAW'`,
// otherwise it will fallback to
// `LogicalTypeParser.parse(logicalTypeNode.asText())`
return deserializeRawType(logicalTypeNode, serdeCtx);
default:
throw new TableException("Unsupported type name:" + typeName);
}
Expand Down Expand Up @@ -355,4 +369,45 @@ private LocalZonedTimestampType deserializeLocalZonedTimestampType(JsonNode logi
TimestampKind.valueOf(logicalTypeNode.get(FIELD_NAME_TIMESTAMP_KIND).asText());
return new LocalZonedTimestampType(nullable, timestampKind, precision);
}

private RawType<?> deserializeRawType(JsonNode logicalTypeNode, SerdeContext serdeCtx) {
if (!logicalTypeNode.has(FIELD_NAME_DATA_VIEW_CLASS)) {
throw new TableException("Only RowType for DataView class is supported now");
}
boolean nullable = logicalTypeNode.get(FIELD_NAME_NULLABLE).asBoolean();
String dataViewClass = logicalTypeNode.get(FIELD_NAME_DATA_VIEW_CLASS).asText();
final DataType dataViewDataType;
if (MapView.class.getName().equals(dataViewClass)) {
DataType keyDataType =
deserializeDataTypeForDataView(logicalTypeNode, FIELD_NAME_KEY_TYPE, serdeCtx);
DataType valueDataType =
deserializeDataTypeForDataView(
logicalTypeNode, FIELD_NAME_VALUE_TYPE, serdeCtx);
dataViewDataType = MapView.newMapViewDataType(keyDataType, valueDataType);
} else if (ListView.class.getName().equals(dataViewClass)) {
DataType elementDataType =
deserializeDataTypeForDataView(
logicalTypeNode, FIELD_NAME_ELEMENT_TYPE, serdeCtx);
dataViewDataType = ListView.newListViewDataType(elementDataType);
} else {
throw new TableException("Only MapView and ListView are supported now");
}
// we only Use DataViewUtils.adjustDataViews method to create the DataType for DataView,
// so the hasStateBackedDataViews is always false
DataType dataType = DataViewUtils.adjustDataViews(dataViewDataType, false);
RawType<?> rawType = (RawType<?>) LogicalTypeDataTypeConverter.toLogicalType(dataType);
return (RawType<?>) rawType.copy(nullable);
}

private DataType deserializeDataTypeForDataView(
JsonNode logicalTypeNode, String key, SerdeContext serdeCtx) {
JsonNode jsonNode = logicalTypeNode.get(key);
LogicalType logicalType = deserialize(jsonNode.get(FIELD_NAME_TYPE_NAME), serdeCtx);
DataType dataType = LogicalTypeDataTypeConverter.toDataType(logicalType);
boolean isInternalType = jsonNode.get(FIELD_NAME_IS_INTERNAL_TYPE).asBoolean();
if (isInternalType) {
dataType = DataTypeUtils.toInternalDataType(dataType);
}
return dataType;
}
}
Loading

0 comments on commit 785c72a

Please sign in to comment.