Skip to content

Commit

Permalink
[FLINK-30613][serializer] Migrate GenericArraySerializerSnapshot to i…
Browse files Browse the repository at this point in the history
…mplement new method of resolving schema compatibility
  • Loading branch information
masteryhx committed Jan 15, 2024
1 parent 909e06e commit a3e0650
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
package org.apache.flink.api.common.typeutils.base;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
Expand Down Expand Up @@ -108,19 +105,13 @@ public GenericArraySerializer<C> restoreSerializer() {
componentClass, nestedSnapshot.getRestoredNestedSerializer(0));
}

@Override
public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(
TypeSerializer<C[]> newSerializer) {
checkState(nestedSnapshot != null);

if (!(newSerializer instanceof GenericArraySerializer)) {
return TypeSerializerSchemaCompatibility.incompatible();
}
@Nullable
public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
return nestedSnapshot == null ? null : nestedSnapshot.getNestedSerializerSnapshots();
}

// delegate to the new snapshot class
return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
newSerializer,
new GenericArraySerializerSnapshot<>(componentClass),
nestedSnapshot.getNestedSerializerSnapshots());
@Nullable
public Class<C> getComponentClass() {
return componentClass;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
package org.apache.flink.api.common.typeutils.base;

import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;

import java.io.IOException;
import java.util.Objects;

/**
* Point-in-time configuration of a {@link GenericArraySerializer}.
Expand Down Expand Up @@ -76,10 +80,35 @@ protected void readOuterSnapshot(
this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
}

@Override
public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(
TypeSerializerSnapshot<C[]> oldSerializerSnapshot) {
if (oldSerializerSnapshot instanceof GenericArraySerializerConfigSnapshot) {
return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
oldSerializerSnapshot,
this,
Objects.requireNonNull(
((GenericArraySerializerConfigSnapshot<C>) oldSerializerSnapshot)
.getNestedSerializerSnapshots()));
}
return super.resolveSchemaCompatibility(oldSerializerSnapshot);
}

@Override
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
GenericArraySerializer<C> newSerializer) {
return (this.componentClass == newSerializer.getComponentClass())
TypeSerializerSnapshot<C[]> oldSerializerSnapshot) {
Class<C> componentClass;
if (oldSerializerSnapshot instanceof GenericArraySerializerSnapshot) {
componentClass =
((GenericArraySerializerSnapshot<C>) oldSerializerSnapshot).componentClass;
} else if (oldSerializerSnapshot instanceof GenericArraySerializerConfigSnapshot) {
componentClass =
((GenericArraySerializerConfigSnapshot<C>) oldSerializerSnapshot)
.getComponentClass();
} else {
return OuterSchemaCompatibility.INCOMPATIBLE;
}
return (this.componentClass == componentClass)
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
import org.apache.flink.types.Either;

import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collection;

import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.hamcrest.Matchers.is;

/** A {@link TypeSerializerUpgradeTestBase} for {@link GenericArraySerializer}. */
Expand Down Expand Up @@ -143,4 +146,17 @@ public Matcher<TypeSerializerSchemaCompatibility<String[]>> schemaCompatibilityM
return TypeSerializerMatchers.isCompatibleAsIs();
}
}

@Test
public void testUpgradeFromDeprecatedSnapshot() {
GenericArraySerializer<String> genericArraySerializer =
new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE);
GenericArraySerializerConfigSnapshot<String> oldSnapshot =
new GenericArraySerializerConfigSnapshot<>(genericArraySerializer);
TypeSerializerSchemaCompatibility<String[]> schemaCompatibility =
genericArraySerializer
.snapshotConfiguration()
.resolveSchemaCompatibility(oldSnapshot);
assertThat(schemaCompatibility.isCompatibleAsIs()).isTrue();
}
}

0 comments on commit a3e0650

Please sign in to comment.