Skip to content

Commit

Permalink
[FLINK-6600] Add key serializer config snapshot to keyed backend chec…
Browse files Browse the repository at this point in the history
…kpoints

This commit adds the config snapshot of the key serializer of keyed
backends to their checkpoints. This makes it possible to upgrade key
serializers, and to migrate state in the future when the old and new
key serializers are incompatible.

This closes apache#3925.
  • Loading branch information
tzulitai committed May 17, 2017
1 parent 2bfead7 commit d8a467b
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1116,13 +1116,28 @@ private void restoreKeyGroupsInStateHandle()
* @throws ClassNotFoundException
* @throws RocksDBException
*/
@SuppressWarnings("unchecked")
private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {

KeyedBackendSerializationProxy serializationProxy =
new KeyedBackendSerializationProxy(rocksDBKeyedStateBackend.userCodeClassLoader);

serializationProxy.read(currentStateHandleInView);

// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
if (StateMigrationUtil.resolveCompatibilityResult(
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
(TypeSerializer) rocksDBKeyedStateBackend.keySerializer)
.isRequiresMigration()) {

// TODO replace with state migration; note that key hash codes need to remain the same after migration
throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
"Aborting now since state migration is currently not available");
}

List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();

Expand Down Expand Up @@ -1214,6 +1229,7 @@ private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBack
this.stateBackend = stateBackend;
}

@SuppressWarnings("unchecked")
private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(
StreamStateHandle metaStateHandle) throws Exception {

Expand All @@ -1228,6 +1244,20 @@ private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBack
DataInputView in = new DataInputViewStreamWrapper(inputStream);
serializationProxy.read(in);

// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
if (StateMigrationUtil.resolveCompatibilityResult(
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
(TypeSerializer) stateBackend.keySerializer)
.isRequiresMigration()) {

// TODO replace with state migration; note that key hash codes need to remain the same after migration
throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
"Aborting now since state migration is currently not available");
}

return serializationProxy.getStateMetaInfoSnapshots();
} finally {
if (inputStream != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@
package org.apache.flink.runtime.state;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
Expand All @@ -38,6 +44,8 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
public static final int VERSION = 3;

private TypeSerializer<?> keySerializer;
private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;

private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;

private ClassLoader userCodeClassLoader;
Expand All @@ -51,6 +59,7 @@ public KeyedBackendSerializationProxy(
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {

this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializer.snapshotConfiguration());

Preconditions.checkNotNull(stateMetaInfoSnapshots);
Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE);
Expand All @@ -65,6 +74,10 @@ public TypeSerializer<?> getKeySerializer() {
return keySerializer;
}

/**
 * Returns the configuration snapshot of the key serializer held by this proxy.
 *
 * <p>When the proxy is created for writing, the snapshot is taken from the key serializer
 * in the constructor. After {@link #read(DataInputView)}, it is the snapshot restored from
 * the checkpoint, or {@code null} if the checkpoint was written with a version older than 3
 * (such checkpoints carry no key serializer config snapshot).
 *
 * @return the key serializer's config snapshot; may be {@code null} for pre-version-3 reads
 */
public TypeSerializerConfigSnapshot getKeySerializerConfigSnapshot() {
    return this.keySerializerConfigSnapshot;
}

@Override
public int getVersion() {
return VERSION;
Expand All @@ -80,10 +93,24 @@ public int[] getCompatibleVersions() {
public void write(DataOutputView out) throws IOException {
super.write(out);

new TypeSerializerSerializationProxy<>(keySerializer).write(out);
// write in a way to be fault tolerant of read failures when deserializing the key serializer
try (
ByteArrayOutputStreamWithPos buffer = new ByteArrayOutputStreamWithPos();
DataOutputViewStreamWrapper bufferWrapper = new DataOutputViewStreamWrapper(buffer)){

out.writeShort(stateMetaInfoSnapshots.size());
new TypeSerializerSerializationProxy<>(keySerializer).write(bufferWrapper);

// write offset of key serializer's configuration snapshot
out.writeInt(buffer.getPosition());
TypeSerializerUtil.writeSerializerConfigSnapshot(bufferWrapper, keySerializerConfigSnapshot);

// flush buffer
out.writeInt(buffer.getPosition());
out.write(buffer.getBuf(), 0, buffer.getPosition());
}

// write individual registered keyed state metainfos
out.writeShort(stateMetaInfoSnapshots.size());
for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo : stateMetaInfoSnapshots) {
KeyedBackendStateMetaInfoSnapshotReaderWriters
.getWriterForVersion(VERSION, metaInfo)
Expand All @@ -97,8 +124,34 @@ public void read(DataInputView in) throws IOException {

final TypeSerializerSerializationProxy<?> keySerializerProxy =
new TypeSerializerSerializationProxy<>(userCodeClassLoader);
keySerializerProxy.read(in);
this.keySerializer = keySerializerProxy.getTypeSerializer();

// only starting from version 3, we have the key serializer and its config snapshot written
if (getReadVersion() >= 3) {
int keySerializerConfigSnapshotOffset = in.readInt();
int numBufferedBytes = in.readInt();
byte[] keySerializerAndConfigBytes = new byte[numBufferedBytes];
in.readFully(keySerializerAndConfigBytes);

try (
ByteArrayInputStreamWithPos buffer = new ByteArrayInputStreamWithPos(keySerializerAndConfigBytes);
DataInputViewStreamWrapper bufferWrapper = new DataInputViewStreamWrapper(buffer)) {

try {
keySerializerProxy.read(bufferWrapper);
this.keySerializer = keySerializerProxy.getTypeSerializer();
} catch (IOException e) {
this.keySerializer = null;
}

buffer.setPosition(keySerializerConfigSnapshotOffset);
this.keySerializerConfigSnapshot =
TypeSerializerUtil.readSerializerConfigSnapshot(bufferWrapper, userCodeClassLoader);
}
} else {
keySerializerProxy.read(in);
this.keySerializer = keySerializerProxy.getTypeSerializer();
this.keySerializerConfigSnapshot = null;
}

int numKvStates = in.readShort();
stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.StateMigrationUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
Expand Down Expand Up @@ -385,6 +386,20 @@ private void restorePartitionedState(Collection<KeyedStateHandle> state) throws

serializationProxy.read(inView);

// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
if (StateMigrationUtil.resolveCompatibilityResult(
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
(TypeSerializer) keySerializer)
.isRequiresMigration()) {

// TODO replace with state migration; note that key hash codes need to remain the same after migration
throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
"Aborting now since state migration is currently not available");
}

List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@
import static org.mockito.Mockito.mock;

@RunWith(PowerMockRunner.class)
@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
@PrepareForTest({
KeyedBackendSerializationProxy.class,
KeyedBackendStateMetaInfoSnapshotReaderWriters.class,
OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
public class SerializationProxiesTest {

@Test
Expand Down Expand Up @@ -80,9 +83,58 @@ public void testKeyedBackendSerializationProxyRoundtrip() throws Exception {
}

Assert.assertEquals(keySerializer, serializationProxy.getKeySerializer());
Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
Assert.assertEquals(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots());
}

/**
 * Round-trips a {@link KeyedBackendSerializationProxy} while every attempt to deserialize a
 * serializer fails, and verifies that the serializers are restored as {@code null} while
 * their configuration snapshots are still restored intact.
 */
@Test
public void testKeyedBackendSerializationProxyRoundtripWithSerializerSerializationFailures() throws Exception {

    TypeSerializer<?> keySerializer = IntSerializer.INSTANCE;
    TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
    TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;

    List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoList = new ArrayList<>();

    stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
        StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer).snapshot());
    stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
        StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer).snapshot());
    stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
        StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());

    KeyedBackendSerializationProxy serializationProxy =
        new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);

    byte[] serialized;
    try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
        serializationProxy.write(new DataOutputViewStreamWrapper(out));
        serialized = out.toByteArray();
    }

    serializationProxy =
        new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());

    // mock failure when deserializing serializers: every newly constructed
    // TypeSerializerSerializationProxy is replaced by a mock whose read() throws
    TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
    doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
    PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);

    try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
        serializationProxy.read(new DataInputViewStreamWrapper(in));
    }

    // the key serializer could not be read, but its config snapshot must survive
    Assert.assertNull(serializationProxy.getKeySerializer());
    Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());

    List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
        serializationProxy.getStateMetaInfoSnapshots();

    // guard against the loop below passing vacuously on an empty restored list
    Assert.assertEquals(stateMetaInfoList.size(), restoredMetaInfos.size());

    for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> meta : restoredMetaInfos) {
        Assert.assertNull(meta.getNamespaceSerializer());
        Assert.assertNull(meta.getStateSerializer());
        Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), meta.getNamespaceSerializerConfigSnapshot());
        Assert.assertEquals(stateSerializer.snapshotConfiguration(), meta.getStateSerializerConfigSnapshot());
    }
}

@Test
public void testKeyedStateMetaInfoSerialization() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
Expand Down Expand Up @@ -1804,6 +1805,47 @@ public void testKeyGroupSnapshotRestore() throws Exception {
secondHalfBackend.dispose();
}

/**
 * Verifies that restoring a keyed backend with a key serializer that is incompatible with
 * the one used when the snapshot was taken is rejected.
 *
 * <p>State is written and snapshotted with an {@link IntSerializer} as the key serializer;
 * the restore is then attempted with a {@link DoubleSerializer} and is expected to fail with
 * a {@link RuntimeException} reporting the incompatible key serializer.
 *
 * @throws Exception on any unexpected failure, so that the test framework reports the
 *                   original exception with its full stack trace
 */
@Test
public void testRestoreWithWrongKeySerializer() throws Exception {
    CheckpointStreamFactory streamFactory = createStreamFactory();

    // use an IntSerializer at first
    AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

    ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);

    ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);

    // write some state
    backend.setCurrentKey(1);
    state.update("1");
    backend.setCurrentKey(2);
    state.update("2");

    // draw a snapshot
    KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));

    backend.dispose();

    // restore with the wrong key serializer
    try {
        restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot1);

        fail("should recognize wrong key serializer");
    } catch (RuntimeException e) {
        // expected, as long as it is the incompatibility error; the message may be null
        // for unrelated runtime exceptions, so guard before inspecting it
        String message = e.getMessage();
        if (message == null || !message.contains("The new key serializer is not compatible")) {
            fail("wrong exception " + e);
        }
    }
}

@Test
@SuppressWarnings("unchecked")
public void testValueStateRestoreWithWrongSerializers() {
Expand Down

0 comments on commit d8a467b

Please sign in to comment.