From d8a467b01ab63127dbf563b6aa8c68fe5d9c85d4 Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Wed, 17 May 2017 01:15:57 +0800 Subject: [PATCH] [FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints This commit adds the config snapshot of the key serializer of keyed backends to its checkpoints. This allows the opportunity to upgrade key serializers, as well as state migration in the future in the case of incompatible old and new key serializers. This closes #3925. --- .../state/RocksDBKeyedStateBackend.java | 30 +++++++++ .../state/KeyedBackendSerializationProxy.java | 61 +++++++++++++++++-- .../state/heap/HeapKeyedStateBackend.java | 15 +++++ .../state/SerializationProxiesTest.java | 54 +++++++++++++++- .../runtime/state/StateBackendTestBase.java | 42 +++++++++++++ 5 files changed, 197 insertions(+), 5 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 4bd94fd21b4e5..ddc7e17a90418 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -1116,6 +1116,7 @@ private void restoreKeyGroupsInStateHandle() * @throws ClassNotFoundException * @throws RocksDBException */ + @SuppressWarnings("unchecked") private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException { KeyedBackendSerializationProxy serializationProxy = @@ -1123,6 +1124,20 @@ private void restoreKVStateMetaData() throws IOException, ClassNotFoundException serializationProxy.read(currentStateHandleInView); + // check for key serializer compatibility; this also reconfigures the + // key serializer to 
be compatible, if it is required and is possible + if (StateMigrationUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + (TypeSerializer) rocksDBKeyedStateBackend.keySerializer) + .isRequiresMigration()) { + + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new RuntimeException("The new key serializer is not compatible to read previous keys. " + + "Aborting now since state migration is currently not available"); + } + List> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots(); @@ -1214,6 +1229,7 @@ private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBack this.stateBackend = stateBackend; } + @SuppressWarnings("unchecked") private List> readMetaData( StreamStateHandle metaStateHandle) throws Exception { @@ -1228,6 +1244,20 @@ private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBack DataInputView in = new DataInputViewStreamWrapper(inputStream); serializationProxy.read(in); + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (StateMigrationUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + (TypeSerializer) stateBackend.keySerializer) + .isRequiresMigration()) { + + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new RuntimeException("The new key serializer is not compatible to read previous keys. 
" + + "Aborting now since state migration is currently not available"); + } + return serializationProxy.getStateMetaInfoSnapshots(); } finally { if (inputStream != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java index a20628c517fec..94fb9f12893f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java @@ -19,10 +19,16 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; +import org.apache.flink.api.common.typeutils.TypeSerializerUtil; import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; import java.io.IOException; @@ -38,6 +44,8 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable public static final int VERSION = 3; private TypeSerializer keySerializer; + private TypeSerializerConfigSnapshot keySerializerConfigSnapshot; + private List> stateMetaInfoSnapshots; private ClassLoader userCodeClassLoader; @@ -51,6 +59,7 @@ public KeyedBackendSerializationProxy( List> stateMetaInfoSnapshots) { this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.keySerializerConfigSnapshot = 
Preconditions.checkNotNull(keySerializer.snapshotConfiguration()); Preconditions.checkNotNull(stateMetaInfoSnapshots); Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE); @@ -65,6 +74,10 @@ public TypeSerializer getKeySerializer() { return keySerializer; } + public TypeSerializerConfigSnapshot getKeySerializerConfigSnapshot() { + return keySerializerConfigSnapshot; + } + @Override public int getVersion() { return VERSION; @@ -80,10 +93,24 @@ public int[] getCompatibleVersions() { public void write(DataOutputView out) throws IOException { super.write(out); - new TypeSerializerSerializationProxy<>(keySerializer).write(out); + // write in a way to be fault tolerant of read failures when deserializing the key serializer + try ( + ByteArrayOutputStreamWithPos buffer = new ByteArrayOutputStreamWithPos(); + DataOutputViewStreamWrapper bufferWrapper = new DataOutputViewStreamWrapper(buffer)){ - out.writeShort(stateMetaInfoSnapshots.size()); + new TypeSerializerSerializationProxy<>(keySerializer).write(bufferWrapper); + + // write offset of key serializer's configuration snapshot + out.writeInt(buffer.getPosition()); + TypeSerializerUtil.writeSerializerConfigSnapshot(bufferWrapper, keySerializerConfigSnapshot); + // flush buffer + out.writeInt(buffer.getPosition()); + out.write(buffer.getBuf(), 0, buffer.getPosition()); + } + + // write individual registered keyed state metainfos + out.writeShort(stateMetaInfoSnapshots.size()); for (RegisteredKeyedBackendStateMetaInfo.Snapshot metaInfo : stateMetaInfoSnapshots) { KeyedBackendStateMetaInfoSnapshotReaderWriters .getWriterForVersion(VERSION, metaInfo) @@ -97,8 +124,34 @@ public void read(DataInputView in) throws IOException { final TypeSerializerSerializationProxy keySerializerProxy = new TypeSerializerSerializationProxy<>(userCodeClassLoader); - keySerializerProxy.read(in); - this.keySerializer = keySerializerProxy.getTypeSerializer(); + + // only starting from version 3, we have the key serializer 
and its config snapshot written + if (getReadVersion() >= 3) { + int keySerializerConfigSnapshotOffset = in.readInt(); + int numBufferedBytes = in.readInt(); + byte[] keySerializerAndConfigBytes = new byte[numBufferedBytes]; + in.readFully(keySerializerAndConfigBytes); + + try ( + ByteArrayInputStreamWithPos buffer = new ByteArrayInputStreamWithPos(keySerializerAndConfigBytes); + DataInputViewStreamWrapper bufferWrapper = new DataInputViewStreamWrapper(buffer)) { + + try { + keySerializerProxy.read(bufferWrapper); + this.keySerializer = keySerializerProxy.getTypeSerializer(); + } catch (IOException e) { + this.keySerializer = null; + } + + buffer.setPosition(keySerializerConfigSnapshotOffset); + this.keySerializerConfigSnapshot = + TypeSerializerUtil.readSerializerConfigSnapshot(bufferWrapper, userCodeClassLoader); + } + } else { + keySerializerProxy.read(in); + this.keySerializer = keySerializerProxy.getTypeSerializer(); + this.keySerializerConfigSnapshot = null; + } int numKvStates = in.readShort(); stateMetaInfoSnapshots = new ArrayList<>(numKvStates); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 11e7760fa3d56..8d3d8a0784c8a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateMigrationUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.apache.flink.runtime.state.internal.InternalFoldingState; @@ -385,6 +386,20 
@@ private void restorePartitionedState(Collection state) throws serializationProxy.read(inView); + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (StateMigrationUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + (TypeSerializer) keySerializer) + .isRequiresMigration()) { + + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new RuntimeException("The new key serializer is not compatible to read previous keys. " + + "Aborting now since state migration is currently not available"); + } + List> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java index 02b4d6216d909..8bbbd5f02f30b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java @@ -44,7 +44,10 @@ import static org.mockito.Mockito.mock; @RunWith(PowerMockRunner.class) -@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, OperatorBackendStateMetaInfoSnapshotReaderWriters.class}) +@PrepareForTest({ + KeyedBackendSerializationProxy.class, + KeyedBackendStateMetaInfoSnapshotReaderWriters.class, + OperatorBackendStateMetaInfoSnapshotReaderWriters.class}) public class SerializationProxiesTest { @Test @@ -80,9 +83,58 @@ public void testKeyedBackendSerializationProxyRoundtrip() throws Exception { } Assert.assertEquals(keySerializer, serializationProxy.getKeySerializer()); + Assert.assertEquals(keySerializer.snapshotConfiguration(), 
serializationProxy.getKeySerializerConfigSnapshot()); Assert.assertEquals(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots()); } + @Test + public void testKeyedBackendSerializationProxyRoundtripWithSerializerSerializationFailures() throws Exception { + + TypeSerializer keySerializer = IntSerializer.INSTANCE; + TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; + TypeSerializer stateSerializer = DoubleSerializer.INSTANCE; + + List> stateMetaInfoList = new ArrayList<>(); + + stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>( + StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer).snapshot()); + stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>( + StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer).snapshot()); + stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>( + StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot()); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList); + + byte[] serialized; + try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { + serializationProxy.write(new DataOutputViewStreamWrapper(out)); + serialized = out.toByteArray(); + } + + serializationProxy = + new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader()); + + // mock failure when deserializing serializers + TypeSerializerSerializationProxy mockProxy = mock(TypeSerializerSerializationProxy.class); + doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); + PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + + try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { + serializationProxy.read(new DataInputViewStreamWrapper(in)); + } + + Assert.assertEquals(null, serializationProxy.getKeySerializer()); + 
Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot()); + + for (RegisteredKeyedBackendStateMetaInfo.Snapshot meta : serializationProxy.getStateMetaInfoSnapshots()) { + Assert.assertEquals(null, meta.getNamespaceSerializer()); + Assert.assertEquals(null, meta.getStateSerializer()); + Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), meta.getNamespaceSerializerConfigSnapshot()); + Assert.assertEquals(stateSerializer.snapshotConfiguration(), meta.getStateSerializerConfigSnapshot()); + } + } + @Test public void testKeyedStateMetaInfoSerialization() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index ca66ffb944ba7..b1927f18ba393 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -38,6 +38,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.DoubleSerializer; import org.apache.flink.api.common.typeutils.base.FloatSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; @@ -1804,6 +1805,47 @@ public void testKeyGroupSnapshotRestore() throws Exception { secondHalfBackend.dispose(); } + @Test + public void testRestoreWithWrongKeySerializer() { + try { + CheckpointStreamFactory streamFactory = createStreamFactory(); + + // use an IntSerializer at first + AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); + + ValueStateDescriptor kvId = new ValueStateDescriptor<>("id", String.class); + + ValueState state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + // write some state + backend.setCurrentKey(1); + state.update("1"); + backend.setCurrentKey(2); + state.update("2"); + + // draw a snapshot + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + + backend.dispose(); + + // restore with the wrong key serializer + try { + restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot1); + + fail("should recognize wrong key serializer"); + } catch (RuntimeException e) { + if (!e.getMessage().contains("The new key serializer is not compatible")) { + fail("wrong exception " + e); + } + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + @Test @SuppressWarnings("unchecked") public void testValueStateRestoreWithWrongSerializers() {