Skip to content

Commit

Permalink
[FLINK-11947] Improve readability and fix raw type usage for RocksDBK…
Browse files Browse the repository at this point in the history
…eyedStateBackend#migrateStateValues

This closes apache#8565.
  • Loading branch information
tzulitai committed Jun 20, 2019
1 parent b1a2e50 commit 829146d
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

import java.util.Map;

Expand Down Expand Up @@ -64,4 +65,9 @@ protected MapSerializer<K, V> createOuterSerializerWithNestedSerializers(TypeSer
protected TypeSerializer<?>[] getNestedSerializers(MapSerializer<K, V> outerSerializer) {
return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() };
}

@SuppressWarnings("unchecked")
public TypeSerializerSnapshot<K> getKeySerializerSnapshot() {
return (TypeSerializerSnapshot<K>) getNestedSerializerSnapshots()[0];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ public TypeSerializer<S> getPreviousStateSerializer() {
return stateSerializerProvider.previousSchemaSerializer();
}

@Nonnull
public StateSerializerProvider<S> getStateSerializerProvider() {
return stateSerializerProvider;
@Nullable
public TypeSerializerSnapshot<S> getPreviousStateSerializerSnapshot() {
return stateSerializerProvider.previousSerializerSnapshot;
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,14 +548,18 @@ private <N, S extends State, SV> void migrateStateValues(
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> stateMetaInfo) throws Exception {

if (stateDesc.getType() == StateDescriptor.Type.MAP) {
TypeSerializerSnapshot previousSerializerSnapshot = stateMetaInfo.f1.getStateSerializerProvider().getPreviousSerializerSnapshot();
checkState(previousSerializerSnapshot instanceof MapSerializerSnapshot, "previous serializer have to be map serializer.");
TypeSerializerSnapshot previousKeySerializerSnapshot = ((MapSerializerSnapshot) previousSerializerSnapshot).getNestedSerializersSnapshotDelegate().getNestedSerializerSnapshots()[0];
TypeSerializer newUserKeySerializer = ((MapSerializer) stateMetaInfo.f1.getStateSerializer()).getKeySerializer();
TypeSerializerSchemaCompatibility compatibility = previousKeySerializerSnapshot.resolveSchemaCompatibility(newUserKeySerializer);
if (!compatibility.isCompatibleAsIs()) {
TypeSerializerSnapshot<SV> previousSerializerSnapshot = stateMetaInfo.f1.getPreviousStateSerializerSnapshot();
checkState(previousSerializerSnapshot != null, "the previous serializer snapshot should exist.");
checkState(previousSerializerSnapshot instanceof MapSerializerSnapshot, "previous serializer snapshot should be a MapSerializerSnapshot.");

TypeSerializer<SV> newSerializer = stateMetaInfo.f1.getStateSerializer();
checkState(newSerializer instanceof MapSerializer, "new serializer should be a MapSerializer.");

MapSerializer<?, ?> mapSerializer = (MapSerializer<?, ?>) newSerializer;
MapSerializerSnapshot<?, ?> mapSerializerSnapshot = (MapSerializerSnapshot<?, ?>) previousSerializerSnapshot;
if (!checkMapStateKeySchemaCompatibility(mapSerializerSnapshot, mapSerializer)) {
throw new StateMigrationException(
"The new serializer for a MapState requires state migration in order for the job to proceed." + " However, migration for MapState currently only supported value migration.");
"The new serializer for a MapState requires state migration in order for the job to proceed, since the key schema has changed. However, migration for MapState currently only allows value schema evolutions.");
}
}

Expand Down Expand Up @@ -613,6 +617,17 @@ private <N, S extends State, SV> void migrateStateValues(
}
}

@SuppressWarnings("unchecked")
private static <UK> boolean checkMapStateKeySchemaCompatibility(
MapSerializerSnapshot<?, ?> mapStateSerializerSnapshot,
MapSerializer<?, ?> newMapStateSerializer) {
TypeSerializerSnapshot<UK> previousKeySerializerSnapshot = (TypeSerializerSnapshot<UK>) mapStateSerializerSnapshot.getKeySerializerSnapshot();
TypeSerializer<UK> newUserKeySerializer = (TypeSerializer<UK>) newMapStateSerializer.getKeySerializer();

TypeSerializerSchemaCompatibility<UK> keyCompatibility = previousKeySerializerSnapshot.resolveSchemaCompatibility(newUserKeySerializer);
return keyCompatibility.isCompatibleAsIs();
}

@Override
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
Expand Down

0 comments on commit 829146d

Please sign in to comment.