Skip to content

Commit

Permalink
[FLINK-26853][state] Update state serializer in StateMap when metaInf…
Browse files Browse the repository at this point in the history
…o changed
  • Loading branch information
masteryhx authored and rkhachatryan committed Jul 26, 2022
1 parent 6dd75e1 commit 8df5053
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;

/** The serializer of the state. */
protected final TypeSerializer<S> stateSerializer;
protected TypeSerializer<S> stateSerializer;

/**
* An empty map shared by all zero-capacity maps (typically from default constructor). It is
Expand Down Expand Up @@ -794,6 +794,10 @@ public TypeSerializer<S> getStateSerializer() {
return stateSerializer;
}

public void setStateSerializer(TypeSerializer<S> stateSerializer) {
this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
}

// StateMapEntry
// -------------------------------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ protected CopyOnWriteStateMap<K, N, S> createStateMap() {
return new CopyOnWriteStateMap<>(getStateSerializer());
}

@Override
public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
super.setMetaInfo(metaInfo);
for (StateMap<K, N, S> keyGroupedStateMap : keyGroupedStateMaps) {
((CopyOnWriteStateMap<K, N, S>) keyGroupedStateMap)
.setStateSerializer(metaInfo.getStateSerializer());
}
}

// Snapshotting
// ----------------------------------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.util.Preconditions;

import org.junit.Assert;
import org.junit.Test;
Expand All @@ -36,6 +39,39 @@
/** Test for {@link CopyOnWriteStateTable}. */
public class CopyOnWriteStateTableTest {

/**
* This tests that Whether serializers are consistent between {@link StateTable} and {@link
* StateMap}.
*/
@Test
public void testSerializerAfterMetaInfoChanged() {
RegisteredKeyValueStateBackendMetaInfo<Integer, TestType> originalMetaInfo =
new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE,
"test",
IntSerializer.INSTANCE,
new TestType.V1TestTypeSerializer());
InternalKeyContext<Integer> mockKeyContext =
new InternalKeyContextImpl<>(KeyGroupRange.of(0, 9), 10);
CopyOnWriteStateTable<Integer, Integer, TestType> table =
new CopyOnWriteStateTable<>(
mockKeyContext, originalMetaInfo, IntSerializer.INSTANCE);

RegisteredKeyValueStateBackendMetaInfo<Integer, TestType> newMetaInfo =
new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE,
"test",
IntSerializer.INSTANCE,
new TestType.V2TestTypeSerializer());
table.setMetaInfo(newMetaInfo);
Preconditions.checkState(table.getState().length > 0);
for (StateMap<?, ?, ?> stateEntries : table.getState()) {
Assert.assertEquals(
table.getStateSerializer(),
((CopyOnWriteStateMap<?, ?, ?>) stateEntries).getStateSerializer());
}
}

/**
* This tests that serializers used for snapshots are duplicates of the ones used in processing
* to avoid race conditions in stateful serializers.
Expand Down

0 comments on commit 8df5053

Please sign in to comment.