Skip to content

Commit

Permalink
[FLINK-30613][serializer] Migrate CompositeTypeSerializerSnapshot to …
Browse files Browse the repository at this point in the history
…implement new method of resolving schema compatibility
  • Loading branch information
masteryhx committed Jan 15, 2024
1 parent 13921a0 commit e93b55a
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.util.Arrays;

import static org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -47,10 +48,10 @@
*
* <p>Serializers that do have some outer snapshot needs to make sure to implement the methods
* {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView,
* ClassLoader)}, and {@link #resolveOuterSchemaCompatibility(TypeSerializer)} when using this class
* as the base for its serializer snapshot class. By default, the base implementations of these
* methods are empty, i.e. this class assumes that subclasses do not have any outer snapshot that
* needs to be persisted.
* ClassLoader)}, and {@link #resolveOuterSchemaCompatibility(TypeSerializerSnapshot)} when using
* this class as the base for its serializer snapshot class. By default, the base implementations of
* these methods are empty, i.e. this class assumes that subclasses do not have any outer snapshot
* that needs to be persisted.
*
* <h2>Snapshot Versioning</h2>
*
Expand Down Expand Up @@ -177,32 +178,36 @@ public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
}

@Override
public final TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializer<T> newSerializer) {
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializerSnapshot<T> oldSerializerSnapshot) {
if (!(oldSerializerSnapshot instanceof CompositeTypeSerializerSnapshot)) {
return TypeSerializerSchemaCompatibility.incompatible();
}

TypeSerializerSnapshot<?>[] oldNestedSerializerSnapshots =
((CompositeTypeSerializerSnapshot<?, ?>) oldSerializerSnapshot)
.getNestedSerializerSnapshots();

return internalResolveSchemaCompatibility(
newSerializer, nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots());
oldSerializerSnapshot, oldNestedSerializerSnapshots);
}

@Internal
TypeSerializerSchemaCompatibility<T> internalResolveSchemaCompatibility(
TypeSerializer<T> newSerializer, TypeSerializerSnapshot<?>[] snapshots) {
if (newSerializer.getClass() != correspondingSerializerClass) {
return TypeSerializerSchemaCompatibility.incompatible();
}

S castedNewSerializer = correspondingSerializerClass.cast(newSerializer);

final OuterSchemaCompatibility outerSchemaCompatibility =
resolveOuterSchemaCompatibility(castedNewSerializer);
TypeSerializerSnapshot<T> oldSnapshot, TypeSerializerSnapshot<?>[] oldNestedSnapshots) {
TypeSerializerSnapshot<?>[] newNestedSerializerSnapshots =
nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots();

final TypeSerializer<?>[] newNestedSerializers = getNestedSerializers(castedNewSerializer);
// check that nested serializer arity remains identical; if not, short circuit result
if (newNestedSerializers.length != snapshots.length) {
if (newNestedSerializerSnapshots.length != oldNestedSnapshots.length) {
return TypeSerializerSchemaCompatibility.incompatible();
}

final OuterSchemaCompatibility outerSchemaCompatibility =
resolveOuterSchemaCompatibility(oldSnapshot);

return constructFinalSchemaCompatibilityResult(
newNestedSerializers, snapshots, outerSchemaCompatibility);
newNestedSerializerSnapshots, oldNestedSnapshots, outerSchemaCompatibility);
}

@Internal
Expand Down Expand Up @@ -264,7 +269,7 @@ protected abstract S createOuterSerializerWithNestedSerializers(
* serializer contains some extra information that needs to be persisted as part of the
* serializer snapshot, this must be overridden. Note that this method and the corresponding
* methods {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link
* #resolveOuterSchemaCompatibility(TypeSerializer)} needs to be implemented.
* #resolveOuterSchemaCompatibility(TypeSerializerSnapshot)} needs to be implemented.
*
* @param out the {@link DataOutputView} to write the outer snapshot to.
*/
Expand All @@ -279,7 +284,7 @@ protected void writeOuterSnapshot(DataOutputView out) throws IOException {}
* serializer contains some extra information that has been persisted as part of the serializer
* snapshot, this must be overridden. Note that this method and the corresponding methods {@link
* #writeOuterSnapshot(DataOutputView)}, {@link
* #resolveOuterSchemaCompatibility(TypeSerializer)} needs to be implemented.
* #resolveOuterSchemaCompatibility(TypeSerializerSnapshot)} needs to be implemented.
*
* @param readOuterSnapshotVersion the read version of the outer snapshot.
* @param in the {@link DataInputView} to read the outer snapshot from.
Expand All @@ -289,6 +294,40 @@ protected void readOuterSnapshot(
int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader)
throws IOException {}

/**
* Checks the schema compatibility of the given old serializer snapshot based on the outer
* snapshot.
*
* <p>The base implementation of this method assumes that the outer serializer only has nested
* serializers and no extra information, and therefore the result of the check is {@link
* OuterSchemaCompatibility#COMPATIBLE_AS_IS}. Otherwise, if the outer serializer contains some
* extra information that has been persisted as part of the serializer snapshot, this must be
* overridden. Note that this method and the corresponding methods {@link
* #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView,
* ClassLoader)} needs to be implemented.
*
* @param oldSerializerSnapshot the old serializer snapshot, which contains the old outer
* information to check against.
* @return a {@link OuterSchemaCompatibility} indicating whether the new serializer's outer
* information is compatible, requires migration, or incompatible with the one written in
* this snapshot.
*/
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
TypeSerializerSnapshot<T> oldSerializerSnapshot) {
// Call deprecated methods as default, which will be removed after removing these deprecated
// methods
@SuppressWarnings("unchecked")
S newSerializer = (S) this.restoreSerializer();
if (isOuterSnapshotCompatible(newSerializer)) {
if (oldSerializerSnapshot instanceof CompositeTypeSerializerSnapshot) {
return ((CompositeTypeSerializerSnapshot<T, S>) oldSerializerSnapshot)
.resolveOuterSchemaCompatibility(newSerializer);
}
return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
}
return OuterSchemaCompatibility.INCOMPATIBLE;
}

/**
* Checks whether the outer snapshot is compatible with a given new serializer.
*
Expand All @@ -305,7 +344,7 @@ protected void readOuterSnapshot(
* @return a flag indicating whether or not the new serializer's outer information is compatible
* with the one written in this snapshot.
* @deprecated this method is deprecated, and will be removed in the future. Please implement
* {@link #resolveOuterSchemaCompatibility(TypeSerializer)} instead.
* {@link #resolveOuterSchemaCompatibility(TypeSerializerSnapshot)} instead.
*/
@Deprecated
protected boolean isOuterSnapshotCompatible(S newSerializer) {
Expand All @@ -323,16 +362,17 @@ protected boolean isOuterSnapshotCompatible(S newSerializer) {
* #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView,
* ClassLoader)} needs to be implemented.
*
* @deprecated this method is deprecated, and will be removed in the future. Please implement
* {@link #resolveOuterSchemaCompatibility(TypeSerializerSnapshot)} instead.
* @param newSerializer the new serializer, which contains the new outer information to check
* against.
* @return a {@link OuterSchemaCompatibility} indicating whether or the new serializer's outer
* information is compatible, requires migration, or incompatible with the one written in
* this snapshot.
*/
@Deprecated
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(S newSerializer) {
return (isOuterSnapshotCompatible(newSerializer))
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
}

// ------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -369,13 +409,13 @@ private void legacyInternalReadOuterSnapshot(
}

private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityResult(
TypeSerializer<?>[] newNestedSerializers,
TypeSerializerSnapshot<?>[] nestedSerializerSnapshots,
TypeSerializerSnapshot<?>[] newNestedSerializerSnapshots,
TypeSerializerSnapshot<?>[] oldNestedSerializerSnapshots,
OuterSchemaCompatibility outerSchemaCompatibility) {

IntermediateCompatibilityResult<T> nestedSerializersCompatibilityResult =
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(
newNestedSerializers, nestedSerializerSnapshots);
newNestedSerializerSnapshots, oldNestedSerializerSnapshots);

if (outerSchemaCompatibility == OuterSchemaCompatibility.INCOMPATIBLE
|| nestedSerializersCompatibilityResult.isIncompatible()) {
Expand All @@ -388,7 +428,6 @@ private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityRe
}

if (nestedSerializersCompatibilityResult.isCompatibleWithReconfiguredSerializer()) {
@SuppressWarnings("unchecked")
TypeSerializer<T> reconfiguredCompositeSerializer =
createOuterSerializerWithNestedSerializers(
nestedSerializersCompatibilityResult.getNestedSerializers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ public class CompositeTypeSerializerUtil {
* can be used by legacy snapshot classes, which have a newer implementation implemented as a
* {@link CompositeTypeSerializerSnapshot}.
*
* @param newSerializer the new serializer to check for compatibility.
* @param legacySerializerSnapshot the legacy serializer snapshot to check for compatibility.
* @param newCompositeSnapshot an instance of the new snapshot class to delegate compatibility
* checks to. This instance should already contain the outer snapshot information.
* @param legacyNestedSnapshots the nested serializer snapshots of the legacy composite
* snapshot.
* @return the result compatibility.
*/
public static <T> TypeSerializerSchemaCompatibility<T> delegateCompatibilityCheckToNewSnapshot(
TypeSerializer<T> newSerializer,
CompositeTypeSerializerSnapshot<T, ? extends TypeSerializer> newCompositeSnapshot,
TypeSerializerSnapshot<T> legacySerializerSnapshot,
CompositeTypeSerializerSnapshot<T, ? extends TypeSerializer<T>> newCompositeSnapshot,
TypeSerializerSnapshot<?>... legacyNestedSnapshots) {

checkArgument(legacyNestedSnapshots.length > 0);
return newCompositeSnapshot.internalResolveSchemaCompatibility(
newSerializer, legacyNestedSnapshots);
legacySerializerSnapshot, legacyNestedSnapshots);
}

/**
Expand Down Expand Up @@ -80,26 +80,29 @@ public static void setNestedSerializersSnapshots(
* TypeSerializerSchemaCompatibility#compatibleAfterMigration()}, and {@link
* TypeSerializerSchemaCompatibility#incompatible()}, these results are considered final.
*
* @param newNestedSerializers the new nested serializers to check for compatibility.
* @param nestedSerializerSnapshots the associated nested serializers' snapshots.
* @return the intermediate compatibility result of the new nested serializers.
* @param newNestedSerializerSnapshots the new nested serializer snapshots to check for
* compatibility.
* @param oldNestedSerializerSnapshots the associated previous nested serializers' snapshots.
* @return the intermediate compatibility result of the new nested serializer snapshots.
*/
public static <T> IntermediateCompatibilityResult<T> constructIntermediateCompatibilityResult(
TypeSerializer<?>[] newNestedSerializers,
TypeSerializerSnapshot<?>[] nestedSerializerSnapshots) {
TypeSerializerSnapshot<?>[] newNestedSerializerSnapshots,
TypeSerializerSnapshot<?>[] oldNestedSerializerSnapshots) {

Preconditions.checkArgument(
newNestedSerializers.length == nestedSerializerSnapshots.length,
"Different number of new serializers and existing serializer snapshots.");
newNestedSerializerSnapshots.length == oldNestedSerializerSnapshots.length,
"Different number of new serializer snapshots and existing serializer snapshots.");

TypeSerializer<?>[] nestedSerializers = new TypeSerializer[newNestedSerializers.length];
TypeSerializer<?>[] nestedSerializers =
new TypeSerializer[newNestedSerializerSnapshots.length];

// check nested serializers for compatibility
boolean nestedSerializerRequiresMigration = false;
boolean hasReconfiguredNestedSerializers = false;
for (int i = 0; i < nestedSerializerSnapshots.length; i++) {
for (int i = 0; i < oldNestedSerializerSnapshots.length; i++) {
TypeSerializerSchemaCompatibility<?> compatibility =
resolveCompatibility(newNestedSerializers[i], nestedSerializerSnapshots[i]);
resolveCompatibility(
newNestedSerializerSnapshots[i], oldNestedSerializerSnapshots[i]);

// if any one of the new nested serializers is incompatible, we can just short circuit
// the result
Expand All @@ -113,7 +116,7 @@ public static <T> IntermediateCompatibilityResult<T> constructIntermediateCompat
hasReconfiguredNestedSerializers = true;
nestedSerializers[i] = compatibility.getReconfiguredSerializer();
} else if (compatibility.isCompatibleAsIs()) {
nestedSerializers[i] = newNestedSerializers[i];
nestedSerializers[i] = newNestedSerializerSnapshots[i].restoreSerializer();
} else {
throw new IllegalStateException("Undefined compatibility type.");
}
Expand Down Expand Up @@ -216,11 +219,11 @@ public TypeSerializer<?>[] getNestedSerializers() {

@SuppressWarnings("unchecked")
private static <E> TypeSerializerSchemaCompatibility<E> resolveCompatibility(
TypeSerializer<?> serializer, TypeSerializerSnapshot<?> snapshot) {
TypeSerializerSnapshot<?> newSnapshot, TypeSerializerSnapshot<?> oldSnapshot) {

TypeSerializer<E> typedSerializer = (TypeSerializer<E>) serializer;
TypeSerializerSnapshot<E> typedSnapshot = (TypeSerializerSnapshot<E>) snapshot;
TypeSerializerSnapshot<E> typedNewSnapshot = (TypeSerializerSnapshot<E>) newSnapshot;
TypeSerializerSnapshot<E> typedOldSnapshot = (TypeSerializerSnapshot<E>) oldSnapshot;

return typedSnapshot.resolveSchemaCompatibility(typedSerializer);
return typedNewSnapshot.resolveSchemaCompatibility(typedOldSnapshot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,46 +97,6 @@ public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
return nestedSnapshots;
}

/**
* Resolves the compatibility of the nested serializer snapshots with the nested serializers of
* the new outer serializer.
*
* @deprecated this no method will be removed in the future. Resolving compatibility for nested
* serializers is now handled by {@link CompositeTypeSerializerSnapshot}.
*/
@Deprecated
public <T> TypeSerializerSchemaCompatibility<T> resolveCompatibilityWithNested(
TypeSerializerSchemaCompatibility<?> outerCompatibility,
TypeSerializer<?>... newNestedSerializers) {

checkArgument(
newNestedSerializers.length == nestedSnapshots.length,
"Different number of new serializers and existing serializer configuration snapshots");

// compatibility of the outer serializer's format
if (outerCompatibility.isIncompatible()) {
return TypeSerializerSchemaCompatibility.incompatible();
}

// check nested serializers for compatibility
boolean nestedSerializerRequiresMigration = false;
for (int i = 0; i < nestedSnapshots.length; i++) {
TypeSerializerSchemaCompatibility<?> compatibility =
resolveCompatibility(newNestedSerializers[i], nestedSnapshots[i]);

if (compatibility.isIncompatible()) {
return TypeSerializerSchemaCompatibility.incompatible();
}
if (compatibility.isCompatibleAfterMigration()) {
nestedSerializerRequiresMigration = true;
}
}

return (nestedSerializerRequiresMigration || !outerCompatibility.isCompatibleAsIs())
? TypeSerializerSchemaCompatibility.compatibleAfterMigration()
: TypeSerializerSchemaCompatibility.compatibleAsIs();
}

// ------------------------------------------------------------------------
// Serialization
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -183,17 +143,6 @@ public static NestedSerializersSnapshotDelegate readNestedSerializerSnapshots(
// Utilities
// ------------------------------------------------------------------------

/** Utility method to conjure up a new scope for the generic parameters. */
@SuppressWarnings("unchecked")
private static <E> TypeSerializerSchemaCompatibility<E> resolveCompatibility(
TypeSerializer<?> serializer, TypeSerializerSnapshot<?> snapshot) {

TypeSerializer<E> typedSerializer = (TypeSerializer<E>) serializer;
TypeSerializerSnapshot<E> typedSnapshot = (TypeSerializerSnapshot<E>) snapshot;

return typedSnapshot.resolveSchemaCompatibility(typedSerializer);
}

private static TypeSerializer<?>[] snapshotsToRestoreSerializers(
TypeSerializerSnapshot<?>... snapshots) {
return Arrays.stream(snapshots)
Expand Down
Loading

0 comments on commit e93b55a

Please sign in to comment.