Skip to content

Commit

Permalink
[FLINK-30613][serializer] Introduce new method to reverse the directi…
Browse files Browse the repository at this point in the history
…on of resolving schema compatibility
  • Loading branch information
masteryhx committed Jan 15, 2024
1 parent 5d9d874 commit 13921a0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public abstract class TypeSerializer<T> implements Serializable {
* evolution and be future-proof. See the class-level comments, section "Upgrading
* TypeSerializers to the new TypeSerializerSnapshot model" for details.
*
* @see TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)
* @see TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot)
* @return snapshot of the serializer's current configuration (cannot be {@code null}).
*/
public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*
* @param <T> the type of data serialized by the serializer that was being checked.
* @see TypeSerializer
* @see TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)
* @see TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot)
*/
@PublicEvolving
public class TypeSerializerSchemaCompatibility<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,40 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
* program's serializer re-serializes the data, thus converting the format during the restore
* operation.
*
* @deprecated This method has been replaced by {@link TypeSerializerSnapshot
* #resolveSchemaCompatibility(TypeSerializerSnapshot)}.
* @param newSerializer the new serializer to check.
* @return the serializer compatibility result.
*/
TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializer<T> newSerializer);
@Deprecated
default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializer<T> newSerializer) {
return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
}

/**
* Checks current serializer's compatibility to read data written by the prior serializer.
*
* <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
* format of the data in the checkpoint/savepoint is compatible for the format of the serializer
* used by the program that restores the checkpoint/savepoint. The outcome can be that the
* serialization format is compatible, that the program's serializer needs to reconfigure itself
* (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
* that the format is outright incompatible, or that a migration needed. In the latter case, the
* TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
* program's serializer re-serializes the data, thus converting the format during the restore
* operation.
*
* <p>This method must be implemented to clarify the compatibility. See FLIP-263 for more
* details.
*
* @param oldSerializerSnapshot the old serializer snapshot to check.
* @return the serializer compatibility result.
*/
default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializerSnapshot<T> oldSerializerSnapshot) {
return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());
}

// ------------------------------------------------------------------------
// read / write utilities
Expand Down

0 comments on commit 13921a0

Please sign in to comment.