From 3ac282322a3940da9d69f81b9bc189b6d0c0463f Mon Sep 17 00:00:00 2001 From: kkloudas Date: Tue, 24 Apr 2018 14:48:34 +0200 Subject: [PATCH] [FLINK-8841] Remove HashMapSerializer and use MapSerializer instead. This closes #5910. --- .../apache/flink/util/InstantiationUtil.java | 59 ++++- .../KVStateRequestSerializerRocksDBTest.java | 5 +- .../network/KvStateRequestSerializerTest.java | 8 +- .../state/AbstractKeyedStateBackend.java | 3 +- .../runtime/state/HashMapSerializer.java | 245 ------------------ .../state/heap/HeapKeyedStateBackend.java | 12 +- .../runtime/state/heap/HeapMapState.java | 34 +-- .../state/internal/InternalMapState.java | 2 +- ...eyedStateBackendSnapshotMigrationTest.java | 164 ++++++++++++ .../heap_keyed_statebackend_1_5_map.snapshot | Bin 0 -> 3613 bytes .../state/RocksDBKeyedStateBackend.java | 2 +- .../streaming/state/RocksDBMapState.java | 2 +- 12 files changed, 249 insertions(+), 287 deletions(-) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java create mode 100644 flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 978d270e05f97..3db0236fc9a90 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; +import org.apache.flink.api.common.typeutils.base.MapSerializer; import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; @@ -41,8 +42,10 @@ import java.io.Serializable; 
import java.lang.reflect.Constructor; import java.lang.reflect.Modifier; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; /** @@ -194,10 +197,12 @@ protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFo try { Class.forName(streamClassDescriptor.getName(), false, classLoader); } catch (ClassNotFoundException e) { - if (streamClassDescriptor.getName().equals("org.apache.avro.generic.GenericData$Array")) { - ObjectStreamClass result = ObjectStreamClass.lookup( - KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class); - return result; + + final ObjectStreamClass equivalentSerializer = + MigrationUtil.getEquivalentSerializer(streamClassDescriptor.getName()); + + if (equivalentSerializer != null) { + return equivalentSerializer; } } @@ -221,6 +226,52 @@ protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFo } } + /** + * A mapping between the full path of a deprecated serializer and its equivalent. + * These mappings are hardcoded and fixed. + * + *

<p>IMPORTANT: mappings can be removed after 1 release as there will be a "migration path". + * As an example, a serializer is removed in 1.5-SNAPSHOT, then the mapping should be added for 1.5, + * and it can be removed in 1.6, as the path would be Flink-{< 1.5} -> Flink-1.5 -> Flink-{>= 1.6}. + */ + private enum MigrationUtil { + + // To add a new mapping just pick a name and add an entry as the following: + + GENERIC_DATA_ARRAY_SERIALIZER( + "org.apache.avro.generic.GenericData$Array", + ObjectStreamClass.lookup(KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class)), + HASH_MAP_SERIALIZER( + "org.apache.flink.runtime.state.HashMapSerializer", + ObjectStreamClass.lookup(MapSerializer.class)); // added in 1.5 + + /** An internal unmodifiable map containing the mappings between deprecated and new serializers. */ + private static final Map<String, ObjectStreamClass> EQUIVALENCE_MAP = Collections.unmodifiableMap(initMap()); + + /** The full name of the class of the old serializer. */ + private final String oldSerializerName; + + /** The serialization descriptor of the class of the new serializer. */ + private final ObjectStreamClass newSerializerStreamClass; + + MigrationUtil(String oldSerializerName, ObjectStreamClass newSerializerStreamClass) { + this.oldSerializerName = oldSerializerName; + this.newSerializerStreamClass = newSerializerStreamClass; + } + + private static Map<String, ObjectStreamClass> initMap() { + final Map<String, ObjectStreamClass> init = new HashMap<>(4); + for (MigrationUtil m: MigrationUtil.values()) { + init.put(m.oldSerializerName, m.newSerializerStreamClass); + } + return init; + } + + private static ObjectStreamClass getEquivalentSerializer(String classDescriptorName) { + return EQUIVALENCE_MAP.get(classDescriptorName); + } + } + /** * Creates a new instance of the given class. 
* diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java index 4985bf35293fd..6ee7631aa4524 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java @@ -41,7 +41,6 @@ import org.rocksdb.DBOptions; import java.io.File; -import java.util.Map; import static org.mockito.Mockito.mock; @@ -160,8 +159,8 @@ public void testMapSerialization() throws Exception { longHeapKeyedStateBackend.restore(null); longHeapKeyedStateBackend.setCurrentKey(key); - final InternalMapState> mapState = - (InternalMapState>) + final InternalMapState mapState = + (InternalMapState) longHeapKeyedStateBackend.getPartitionedState( VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java index dac1b909f4c8c..1dc7186d9ba93 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java @@ -307,8 +307,8 @@ public void testMapSerialization() throws Exception { ); longHeapKeyedStateBackend.setCurrentKey(key); - final InternalMapState> mapState = - (InternalMapState>) + final 
InternalMapState mapState = + (InternalMapState) longHeapKeyedStateBackend.getPartitionedState( VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, @@ -328,9 +328,9 @@ public void testMapSerialization() throws Exception { * * @throws Exception */ - public static > void testMapSerialization( + public static void testMapSerialization( final long key, - final InternalMapState mapState) throws Exception { + final InternalMapState mapState) throws Exception { TypeSerializer userKeySerializer = LongSerializer.INSTANCE; TypeSerializer userValueSerializer = StringSerializer.INSTANCE; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 287474c05fb43..f873655ceeb6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -53,7 +53,6 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; -import java.util.Map; import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -232,7 +231,7 @@ protected abstract InternalFoldingState createFoldingS * @param Type of the keys in the state * @param Type of the values in the state * */ - protected abstract InternalMapState> createMapState( + protected abstract InternalMapState createMapState( TypeSerializer namespaceSerializer, MapStateDescriptor stateDesc) throws Exception; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java deleted file mode 100644 index c1b6346d5f153..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; -import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * A serializer for {@link HashMap}. The serializer relies on a key serializer and a value serializer - * for the serialization of the map's key-value pairs. - * - *

The serialization format for the map is as follows: four bytes for the length of the map, - * followed by the serialized representation of each key-value pair. To allow null values, each value - * is prefixed by a null marker. - * - * @param The type of the keys in the map. - * @param The type of the values in the map. - */ -@Internal -public final class HashMapSerializer extends TypeSerializer> { - - private static final long serialVersionUID = -6885593032367050078L; - - /** The serializer for the keys in the map */ - private final TypeSerializer keySerializer; - - /** The serializer for the values in the map */ - private final TypeSerializer valueSerializer; - - /** - * Creates a map serializer that uses the given serializers to serialize the key-value pairs in the map. - * - * @param keySerializer The serializer for the keys in the map - * @param valueSerializer The serializer for the values in the map - */ - public HashMapSerializer(TypeSerializer keySerializer, TypeSerializer valueSerializer) { - this.keySerializer = Preconditions.checkNotNull(keySerializer, "The key serializer cannot be null"); - this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "The value serializer cannot be null."); - } - - // ------------------------------------------------------------------------ - // HashMapSerializer specific properties - // ------------------------------------------------------------------------ - - public TypeSerializer getKeySerializer() { - return keySerializer; - } - - public TypeSerializer getValueSerializer() { - return valueSerializer; - } - - // ------------------------------------------------------------------------ - // Type Serializer implementation - // ------------------------------------------------------------------------ - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public TypeSerializer> duplicate() { - TypeSerializer duplicateKeySerializer = keySerializer.duplicate(); - TypeSerializer 
duplicateValueSerializer = valueSerializer.duplicate(); - - return (duplicateKeySerializer == keySerializer) && (duplicateValueSerializer == valueSerializer) - ? this - : new HashMapSerializer<>(duplicateKeySerializer, duplicateValueSerializer); - } - - @Override - public HashMap createInstance() { - return new HashMap<>(); - } - - @Override - public HashMap copy(HashMap from) { - HashMap newHashMap = new HashMap<>(from.size()); - - for (Map.Entry entry : from.entrySet()) { - K newKey = keySerializer.copy(entry.getKey()); - V newValue = entry.getValue() == null ? null : valueSerializer.copy(entry.getValue()); - - newHashMap.put(newKey, newValue); - } - - return newHashMap; - } - - @Override - public HashMap copy(HashMap from, HashMap reuse) { - return copy(from); - } - - @Override - public int getLength() { - return -1; // var length - } - - @Override - public void serialize(HashMap map, DataOutputView target) throws IOException { - final int size = map.size(); - target.writeInt(size); - - for (Map.Entry entry : map.entrySet()) { - keySerializer.serialize(entry.getKey(), target); - - if (entry.getValue() == null) { - target.writeBoolean(true); - } else { - target.writeBoolean(false); - valueSerializer.serialize(entry.getValue(), target); - } - } - } - - @Override - public HashMap deserialize(DataInputView source) throws IOException { - final int size = source.readInt(); - - final HashMap map = new HashMap<>(size); - for (int i = 0; i < size; ++i) { - K key = keySerializer.deserialize(source); - - boolean isNull = source.readBoolean(); - V value = isNull ? 
null : valueSerializer.deserialize(source); - - map.put(key, value); - } - - return map; - } - - @Override - public HashMap deserialize(HashMap reuse, DataInputView source) throws IOException { - return deserialize(source); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - final int size = source.readInt(); - target.writeInt(size); - - for (int i = 0; i < size; ++i) { - keySerializer.copy(source, target); - - boolean isNull = source.readBoolean(); - target.writeBoolean(isNull); - - if (!isNull) { - valueSerializer.copy(source, target); - } - } - } - - @Override - public boolean equals(Object obj) { - return obj == this || - (obj != null && obj.getClass() == getClass() && - keySerializer.equals(((HashMapSerializer) obj).getKeySerializer()) && - valueSerializer.equals(((HashMapSerializer) obj).getValueSerializer())); - } - - @Override - public boolean canEqual(Object obj) { - return (obj != null && obj.getClass() == getClass()); - } - - @Override - public int hashCode() { - return keySerializer.hashCode() * 31 + valueSerializer.hashCode(); - } - - // -------------------------------------------------------------------------------------------- - // Serializer configuration snapshotting & compatibility - // -------------------------------------------------------------------------------------------- - - @Override - public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new MapSerializerConfigSnapshot<>(keySerializer, valueSerializer); - } - - @Override - public CompatibilityResult> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof MapSerializerConfigSnapshot) { - List, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs = - ((MapSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - - CompatibilityResult keyCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousKvSerializersAndConfigs.get(0).f0, - 
UnloadableDummyTypeSerializer.class, - previousKvSerializersAndConfigs.get(0).f1, - keySerializer); - - CompatibilityResult valueCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousKvSerializersAndConfigs.get(1).f0, - UnloadableDummyTypeSerializer.class, - previousKvSerializersAndConfigs.get(1).f1, - valueSerializer); - - if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new HashMapSerializer<>( - new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()), - new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()))); - } - } - - return CompatibilityResult.requiresMigration(); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 82f883c3dea3f..10803e26bc125 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -45,7 +45,6 @@ import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.DoneFuture; -import org.apache.flink.runtime.state.HashMapSerializer; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; @@ -108,7 +107,7 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend { * but we can't put them here because different key/value states with different types and * namespace types share this central list of tables. 
*/ - private final HashMap> stateTables = new HashMap<>(); + private final Map> stateTables = new HashMap<>(); /** * Map of state names to their corresponding restored state meta info. @@ -291,16 +290,11 @@ public InternalFoldingState createFoldingState( } @Override - protected InternalMapState> createMapState( + protected InternalMapState createMapState( TypeSerializer namespaceSerializer, MapStateDescriptor stateDesc) throws Exception { - StateTable> stateTable = tryRegisterStateTable( - stateDesc.getName(), - stateDesc.getType(), - namespaceSerializer, - new HashMapSerializer<>(stateDesc.getKeySerializer(), stateDesc.getValueSerializer())); - + StateTable> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc); return new HeapMapState<>(stateDesc, stateTable, keySerializer, namespaceSerializer); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java index 7c1807101d93c..ccd017f81c1ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java @@ -21,9 +21,9 @@ import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.MapSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; -import org.apache.flink.runtime.state.HashMapSerializer; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.util.Preconditions; @@ -40,8 +40,8 @@ * @param The type of the values in the state. 
*/ public class HeapMapState - extends AbstractHeapState, MapState, MapStateDescriptor> - implements InternalMapState> { + extends AbstractHeapState, MapState, MapStateDescriptor> + implements InternalMapState { /** * Creates a new key/value state for the given hash map of key/value pairs. @@ -52,7 +52,7 @@ public class HeapMapState */ public HeapMapState( MapStateDescriptor stateDesc, - StateTable> stateTable, + StateTable> stateTable, TypeSerializer keySerializer, TypeSerializer namespaceSerializer) { super(stateDesc, stateTable, keySerializer, namespaceSerializer); @@ -69,8 +69,8 @@ public TypeSerializer getNamespaceSerializer() { } @Override - public TypeSerializer> getValueSerializer() { - return new HashMapSerializer<>( + public TypeSerializer> getValueSerializer() { + return new MapSerializer<>( stateDesc.getKeySerializer(), stateDesc.getValueSerializer() ); @@ -79,7 +79,7 @@ public TypeSerializer> getValueSerializer() { @Override public UV get(UK userKey) { - HashMap userMap = stateTable.get(currentNamespace); + Map userMap = stateTable.get(currentNamespace); if (userMap == null) { return null; @@ -91,7 +91,7 @@ public UV get(UK userKey) { @Override public void put(UK userKey, UV userValue) { - HashMap userMap = stateTable.get(currentNamespace); + Map userMap = stateTable.get(currentNamespace); if (userMap == null) { userMap = new HashMap<>(); stateTable.put(currentNamespace, userMap); @@ -103,7 +103,7 @@ public void put(UK userKey, UV userValue) { @Override public void putAll(Map value) { - HashMap userMap = stateTable.get(currentNamespace); + Map userMap = stateTable.get(currentNamespace); if (userMap == null) { userMap = new HashMap<>(); @@ -116,7 +116,7 @@ public void putAll(Map value) { @Override public void remove(UK userKey) { - HashMap userMap = stateTable.get(currentNamespace); + Map userMap = stateTable.get(currentNamespace); if (userMap == null) { return; } @@ -130,31 +130,31 @@ public void remove(UK userKey) { @Override public boolean 
contains(UK userKey) { - HashMap userMap = stateTable.get(currentNamespace); + Map userMap = stateTable.get(currentNamespace); return userMap != null && userMap.containsKey(userKey); } @Override public Iterable> entries() { - HashMap userMap = stateTable.get(currentNamespace); + Map userMap = stateTable.get(currentNamespace); return userMap == null ? null : userMap.entrySet(); } @Override public Iterable keys() { - HashMap userMap = stateTable.get(currentNamespace); + Map userMap = stateTable.get(currentNamespace); return userMap == null ? null : userMap.keySet(); } @Override public Iterable values() { - HashMap userMap = stateTable.get(currentNamespace); + Map userMap = stateTable.get(currentNamespace); return userMap == null ? null : userMap.values(); } @Override public Iterator> iterator() { - HashMap userMap = stateTable.get(currentNamespace); + Map userMap = stateTable.get(currentNamespace); return userMap == null ? null : userMap.entrySet().iterator(); } @@ -163,7 +163,7 @@ public byte[] getSerializedValue( final byte[] serializedKeyAndNamespace, final TypeSerializer safeKeySerializer, final TypeSerializer safeNamespaceSerializer, - final TypeSerializer> safeValueSerializer) throws Exception { + final TypeSerializer> safeValueSerializer) throws Exception { Preconditions.checkNotNull(serializedKeyAndNamespace); Preconditions.checkNotNull(safeKeySerializer); @@ -179,7 +179,7 @@ public byte[] getSerializedValue( return null; } - final HashMapSerializer serializer = (HashMapSerializer) safeValueSerializer; + final MapSerializer serializer = (MapSerializer) safeValueSerializer; final TypeSerializer dupUserKeySerializer = serializer.getKeySerializer(); final TypeSerializer dupUserValueSerializer = serializer.getValueSerializer(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java index 91f698c178158..f9509e1489d30 100644 
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java @@ -32,4 +32,4 @@ * @param Type of the values folded into the state * @param Type of the value in the state */ -public interface InternalMapState> extends InternalKvState, MapState {} +public interface InternalMapState extends InternalKvState>, MapState {} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java index 815ceaee2fe54..345cd4f302a02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java @@ -20,10 +20,16 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -33,6 +39,9 @@ import java.io.FileInputStream; import java.net.URL; import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import 
java.util.concurrent.RunnableFuture; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; @@ -42,6 +51,161 @@ */ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackendTestBase { + @Test + public void testMapStateMigrationAfterHashMapSerRemoval() throws Exception { + ClassLoader cl = getClass().getClassLoader(); + URL resource = cl.getResource("heap_keyed_statebackend_1_5_map.snapshot"); + + Preconditions.checkNotNull(resource, "Binary snapshot resource not found!"); + + try (final HeapKeyedStateBackend keyedBackend = createKeyedBackend()) { + final Integer namespace1 = 1; + final Integer namespace2 = 2; + final Integer namespace3 = 3; + + final SnapshotResult stateHandles; + try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) { + stateHandles = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader()); + } + + final MapStateDescriptor stateDescr = new MapStateDescriptor<>("my-map-state", Long.class, Long.class); + stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); + + keyedBackend.restore(StateObjectCollection.singleton(stateHandles.getJobManagerOwnedSnapshot())); + + InternalMapState state = keyedBackend.createMapState(IntSerializer.INSTANCE, stateDescr); + + keyedBackend.setCurrentKey("abc"); + state.setCurrentNamespace(namespace1); + assertEquals(33L, (long) state.get(33L)); + assertEquals(55L, (long) state.get(55L)); + assertEquals(2, getStateSize(state)); + + state.setCurrentNamespace(namespace2); + assertEquals(22L, (long) state.get(22L)); + assertEquals(11L, (long) state.get(11L)); + assertEquals(2, getStateSize(state)); + + state.setCurrentNamespace(namespace3); + assertEquals(44L, (long) state.get(44L)); + assertEquals(1, getStateSize(state)); + + keyedBackend.setCurrentKey("def"); + state.setCurrentNamespace(namespace1); + assertEquals(11L, (long) state.get(11L)); + assertEquals(44L, (long) state.get(44L)); + 
assertEquals(2, getStateSize(state)); + + state.setCurrentNamespace(namespace3); + assertEquals(22L, (long) state.get(22L)); + assertEquals(55L, (long) state.get(55L)); + assertEquals(33L, (long) state.get(33L)); + assertEquals(3, getStateSize(state)); + + keyedBackend.setCurrentKey("jkl"); + state.setCurrentNamespace(namespace1); + assertEquals(11L, (long) state.get(11L)); + assertEquals(22L, (long) state.get(22L)); + assertEquals(33L, (long) state.get(33L)); + assertEquals(44L, (long) state.get(44L)); + assertEquals(55L, (long) state.get(55L)); + assertEquals(5, getStateSize(state)); + + keyedBackend.setCurrentKey("mno"); + state.setCurrentNamespace(namespace3); + assertEquals(11L, (long) state.get(11L)); + assertEquals(22L, (long) state.get(22L)); + assertEquals(33L, (long) state.get(33L)); + assertEquals(44L, (long) state.get(44L)); + assertEquals(55L, (long) state.get(55L)); + assertEquals(5, getStateSize(state)); + + RunnableFuture> snapshot = keyedBackend.snapshot( + 1L, + 1L, + new MemCheckpointStreamFactory(4 * 1024 * 1024), + CheckpointOptions.forCheckpointWithDefaultLocation()); + + snapshot.run(); + } + } + + private int getStateSize(InternalMapState mapState) throws Exception { + int i = 0; + for (Iterator> itt = mapState.iterator(); itt.hasNext(); i++, itt.next()) {} + return i; + } + +// /** +// * This code was used to create the binary file of the old version's (< 1.5) snapshot used by this test. +// * If you need to recreate the binary, you can uncomment this and run it. 
+// */ +// private void createBinarySnapshotWithMap() throws Exception { +// +// final String pathToWrite = "/PATH/TO/WRITE"; +// +// final MapStateDescriptor stateDescr = new MapStateDescriptor<>("my-map-state", Long.class, Long.class); +// stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); +// +// final Integer namespace1 = 1; +// final Integer namespace2 = 2; +// final Integer namespace3 = 3; +// +// try (final HeapKeyedStateBackend keyedBackend = createKeyedBackend()) { +// InternalMapState state = keyedBackend.createMapState(IntSerializer.INSTANCE, stateDescr); +// +// keyedBackend.setCurrentKey("abc"); +// state.setCurrentNamespace(namespace1); +// state.put(33L, 33L); +// state.put(55L, 55L); +// +// state.setCurrentNamespace(namespace2); +// state.put(22L, 22L); +// state.put(11L, 11L); +// +// state.setCurrentNamespace(namespace3); +// state.put(44L, 44L); +// +// keyedBackend.setCurrentKey("def"); +// state.setCurrentNamespace(namespace1); +// state.put(11L, 11L); +// state.put(44L, 44L); +// +// state.setCurrentNamespace(namespace3); +// state.put(22L, 22L); +// state.put(55L, 55L); +// state.put(33L, 33L); +// +// keyedBackend.setCurrentKey("jkl"); +// state.setCurrentNamespace(namespace1); +// state.put(11L, 11L); +// state.put(22L, 22L); +// state.put(33L, 33L); +// state.put(44L, 44L); +// state.put(55L, 55L); +// +// keyedBackend.setCurrentKey("mno"); +// state.setCurrentNamespace(namespace3); +// state.put(11L, 11L); +// state.put(22L, 22L); +// state.put(33L, 33L); +// state.put(44L, 44L); +// state.put(55L, 55L); +// +// RunnableFuture> snapshot = keyedBackend.snapshot( +// 0L, +// 0L, +// new MemCheckpointStreamFactory(4 * 1024 * 1024), +// CheckpointOptions.forCheckpointWithDefaultLocation()); +// +// snapshot.run(); +// +// try (BufferedOutputStream bis = new BufferedOutputStream(new FileOutputStream(pathToWrite))) { +// InstantiationUtil.serializeObject(bis, snapshot.get()); +// } +// } +// } + /** * [FLINK-5979] * diff --git 
a/flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot new file mode 100644 index 0000000000000000000000000000000000000000..32c301fd0c75e142a95c96e5de824cc95efe5a38 GIT binary patch literal 3613 zcmeHJJ#14+5FR@wn*$WD0ubWFa0SF6FQ-HBXj^?O+LF;j@xg#5#}(nOs>N z_}JNX@}*8l&lx0^5aeQtA-qTRyncSOqFT0*F$qdm&Uankvv?(-WnOYZD@#IbU+FJR zS&2BM^OR{=Y+zVsFxe3Mw~f<= ziNn6eV-?%OR*hy?8_kYZ9$ph$mWWfXYZi5zF=)R#@Vpvti1FJ!oASM4EutSLny7aWv7jMC<+Q~41Nz+pF` l InternalFoldingState createFoldingState( } @Override - protected InternalMapState> createMapState( + protected InternalMapState createMapState( TypeSerializer namespaceSerializer, MapStateDescriptor stateDesc) throws Exception { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index baa90fac5d09c..fbc55dce33bae 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -60,7 +60,7 @@ */ public class RocksDBMapState extends AbstractRocksDBState, MapState, MapStateDescriptor> - implements InternalMapState> { + implements InternalMapState { private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);