Skip to content

Commit

Permalink
[FLINK-30613][serializer] Add doc about migrating TypeSerializerSnaps…
Browse files Browse the repository at this point in the history
…hot#resolveSchemaCompatibility
  • Loading branch information
masteryhx committed Jan 15, 2024
1 parent 6fb65a0 commit 909e06e
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public interface TypeSerializerSnapshot<T> {
int getCurrentVersion();
void writeSnapshot(DataOuputView out) throws IOException;
void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer);
TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot);
TypeSerializer<T> restoreSerializer();
}
```
Expand All @@ -125,7 +125,7 @@ the read implementation can handle different versions.

At restore time, the logic that detects whether or not the new serializer's schema has changed should be implemented in
the `resolveSchemaCompatibility` method. When previous registered state is registered again with new serializers in the
restored execution of an operator, the new serializer is provided to the previous serializer's snapshot via this method.
restored execution of an operator, the old serializer snapshot is provided to the new serializer's snapshot via this method.
This method returns a `TypeSerializerSchemaCompatibility` representing the result of the compatibility resolution,
which can be one of the following:

Expand Down Expand Up @@ -162,7 +162,7 @@ to the implementation of state serializers and their serializer snapshots.
3. **Restored execution re-accesses restored state bytes with new state serializer that has schema _B_**
- The previous state serializer's snapshot is restored.
- State bytes are not deserialized on restore, only loaded back to the state backends (therefore, still in schema *A*).
- Upon receiving the new serializer, it is provided to the restored previous serializer's snapshot via the
- Upon receiving the new serializer, the previous serializer's snapshot is provided to the new serializer's snapshot via the
`TypeSerializer#resolveSchemaCompatibility` to check for schema compatibility.
4. **Migrate state bytes in backend from schema _A_ to schema _B_**
- If the compatibility resolution reflects that the schema has changed and migration is possible, schema migration is
Expand All @@ -186,7 +186,7 @@ to the implementation of state serializers and their serializer snapshots.
`TypeSerializerSnapshot#restoreSerializer()`, and is used to deserialize state bytes to objects.
- From now on, all of the state is already deserialized.
4. **Restored execution re-accesses previous state with new state serializer that has schema _B_**
- Upon receiving the new serializer, it is provided to the restored previous serializer's snapshot via the
- Upon receiving the new serializer, the previous serializer's snapshot is provided to the new serializer's snapshot via the
`TypeSerializer#resolveSchemaCompatibility` to check for schema compatibility.
- If the compatibility check signals that migration is required, nothing happens in this case since for
heap backends, all state is already deserialized into objects.
Expand Down Expand Up @@ -302,7 +302,7 @@ the nested element serializer.
In these cases, an additional three methods need to be implemented on the `CompositeTypeSerializerSnapshot`:
* `#writeOuterSnapshot(DataOutputView)`: defines how the outer snapshot information is written.
* `#readOuterSnapshot(int, DataInputView, ClassLoader)`: defines how the outer snapshot information is read.
* `#resolveOuterSchemaCompatibility(TypeSerializer)`: checks the compatibility based on the outer snapshot information.
* `#resolveOuterSchemaCompatibility(TypeSerializerSnapshot)`: checks the compatibility based on the outer snapshot information.

By default, the `CompositeTypeSerializerSnapshot` assumes that there isn't any outer snapshot information to
read / write, and therefore have empty default implementations for the above methods. If the subclass
Expand Down Expand Up @@ -341,12 +341,15 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
}

@Override
protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
return (this.componentClass == newSerializer.getComponentClass())
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;

@Override
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
TypeSerializerSnapshot<C[]> oldSerializerSnapshot) {
GenericArraySerializerSnapshot<C[]> oldGenericArraySerializerSnapshot =
(GenericArraySerializerSnapshot<C[]>) oldSerializerSnapshot;
return (this.componentClass == oldGenericArraySerializerSnapshot.componentClass)
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
}

@Override
Expand Down Expand Up @@ -442,4 +445,23 @@ migrate from the old abstractions. The steps to do this is as follows:
`TypeSerializerConfigSnapshot` implementation as will as the
`TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer).

## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.19

This section is a guide for a method migration from the serializer snapshots that existed before Flink 1.19.

Before Flink 1.19, when using a customized serializer to process data, the schema compatibility in the old serializer
(maybe in Flink library) has to meet the future need.
Or else TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) of the old serializer has to be modified.
There are no ways to specify the compatibility with the old serializer in the new serializer, which also makes scheme evolution
not supported in some scenarios.

So from Flink 1.19, the direction of resolving schema compatibility has been reversed. The old method
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` has been marked as deprecated
and will be removed in the future. it is highly recommended to migrate from the old one to
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. The steps to do this are as follows:

1. Implement the `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)` whose logic
should be same as the original `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`.
2. Remove the old method `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`.

{{< top >}}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public interface TypeSerializerSnapshot<T> {
int getCurrentVersion();
void writeSnapshot(DataOuputView out) throws IOException;
void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer);
TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot);
TypeSerializer<T> restoreSerializer();
}
```
Expand All @@ -126,7 +126,7 @@ the read implementation can handle different versions.

At restore time, the logic that detects whether or not the new serializer's schema has changed should be implemented in
the `resolveSchemaCompatibility` method. When previous registered state is registered again with new serializers in the
restored execution of an operator, the new serializer is provided to the previous serializer's snapshot via this method.
restored execution of an operator, the old serializer snapshot is provided to the new serializer's snapshot via this method.
This method returns a `TypeSerializerSchemaCompatibility` representing the result of the compatibility resolution,
which can be one of the following:

Expand Down Expand Up @@ -163,7 +163,7 @@ to the implementation of state serializers and their serializer snapshots.
3. **Restored execution re-accesses restored state bytes with new state serializer that has schema _B_**
- The previous state serializer's snapshot is restored.
- State bytes are not deserialized on restore, only loaded back to the state backends (therefore, still in schema *A*).
- Upon receiving the new serializer, it is provided to the restored previous serializer's snapshot via the
- Upon receiving the new serializer, the previous serializer's snapshot is provided to the new serializer's snapshot via the
`TypeSerializer#resolveSchemaCompatibility` to check for schema compatibility.
4. **Migrate state bytes in backend from schema _A_ to schema _B_**
- If the compatibility resolution reflects that the schema has changed and migration is possible, schema migration is
Expand All @@ -187,7 +187,7 @@ to the implementation of state serializers and their serializer snapshots.
`TypeSerializerSnapshot#restoreSerializer()`, and is used to deserialize state bytes to objects.
- From now on, all of the state is already deserialized.
4. **Restored execution re-accesses previous state with new state serializer that has schema _B_**
- Upon receiving the new serializer, it is provided to the restored previous serializer's snapshot via the
- Upon receiving the new serializer, the previous serializer's snapshot is provided to the new serializer's snapshot via the
`TypeSerializer#resolveSchemaCompatibility` to check for schema compatibility.
- If the compatibility check signals that migration is required, nothing happens in this case since for
heap backends, all state is already deserialized into objects.
Expand Down Expand Up @@ -304,7 +304,7 @@ the nested element serializer.
In these cases, an additional three methods need to be implemented on the `CompositeTypeSerializerSnapshot`:
* `#writeOuterSnapshot(DataOutputView)`: defines how the outer snapshot information is written.
* `#readOuterSnapshot(int, DataInputView, ClassLoader)`: defines how the outer snapshot information is read.
* `#resolveOuterSchemaCompatibility(TypeSerializer)`: checks the compatibility based on the outer snapshot information.
* `#resolveOuterSchemaCompatibility(TypeSerializerSnapshot)`: checks the compatibility based on the outer snapshot information.

By default, the `CompositeTypeSerializerSnapshot` assumes that there isn't any outer snapshot information to
read / write, and therefore have empty default implementations for the above methods. If the subclass
Expand Down Expand Up @@ -344,12 +344,15 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
}

@Override
protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
return (this.componentClass == newSerializer.getComponentClass())
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;

@Override
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
TypeSerializerSnapshot<C[]> oldSerializerSnapshot) {
GenericArraySerializerSnapshot<C[]> oldGenericArraySerializerSnapshot =
(GenericArraySerializerSnapshot<C[]>) oldSerializerSnapshot;
return (this.componentClass == oldGenericArraySerializerSnapshot.componentClass)
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
}

@Override
Expand Down Expand Up @@ -446,4 +449,23 @@ migrate from the old abstractions. The steps to do this is as follows:
`TypeSerializerConfigSnapshot` implementation as will as the
`TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer).

## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.19

This section is a guide for a method migration from the serializer snapshots that existed before Flink 1.19.

Before Flink 1.19, when using a customized serializer to process data, the schema compatibility in the old serializer
(maybe in Flink library) has to meet the future need.
Or else TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) of the old serializer has to be modified.
There are no ways to specify the compatibility with the old serializer in the new serializer, which also makes scheme evolution
not supported in some scenarios.

So from Flink 1.19, the direction of resolving schema compatibility has been reversed. The old method
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` has been marked as deprecated
and will be removed in the future. it is highly recommended to migrate from the old one to
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. The steps to do this are as follows:

1. Implement the `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)` whose logic
should be same as the original `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`.
2. Remove the old method `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`.

{{< top >}}

0 comments on commit 909e06e

Please sign in to comment.