Skip to content

Commit

Permalink
[FLINK-5041] Savepoint Backwards Compatibility 1.1 -> 1.2
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter authored and aljoscha committed Dec 14, 2016
1 parent 8cda6a2 commit e95fe56
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

import java.io.ByteArrayInputStream;
import java.io.IOException;

/**
Expand Down Expand Up @@ -85,7 +85,7 @@ public ACC get() {
if (valueBytes == null) {
return null;
}
return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
} catch (IOException|RocksDBException e) {
throw new RuntimeException("Error while retrieving data from RocksDB", e);
}
Expand All @@ -103,7 +103,7 @@ public void add(T value) throws IOException {
valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
} else {
ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
ACC newValue = foldFunction.fold(oldValue, value);
keySerializationStream.reset();
valueSerializer.serialize(newValue, out);
Expand Down
31 changes: 31 additions & 0 deletions flink-core/src/main/java/org/apache/flink/core/io/Versioned.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.io;

/**
* This interface is implemented by classes that provide a version number. Version numbers can be used to
* differentiate between evolving classes.
*/
public interface Versioned {

/**
* Returns the version number of the object. Version numbers can be used to differentiate evolving classes.
*
* @return the version number of this object
*/
int getVersion();
}
25 changes: 25 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/Migration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

/**
* Tagging interface for migration-related classes. It declares no methods; implementing it only marks a type.
*/
public interface Migration {
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.migration.runtime.checkpoint.KeyGroupState;
import org.apache.flink.migration.runtime.checkpoint.SubtaskState;
import org.apache.flink.migration.runtime.checkpoint.TaskState;
import org.apache.flink.migration.runtime.state.AbstractStateBackend;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
import org.apache.flink.migration.runtime.state.StateHandle;
import org.apache.flink.migration.runtime.state.filesystem.AbstractFileStateHandle;
Expand Down Expand Up @@ -266,10 +267,7 @@ private StreamStateHandle convertOperatorAndFunctionState(StreamTaskState stream
}

if (null != operatorState) {
mergeStateHandles.add(SIGNAL_1);
mergeStateHandles.add(convertStateHandle(operatorState));
} else {
mergeStateHandles.add(SIGNAL_0);
}

return new MigrationStreamStateHandle(new MultiStreamStateHandle(mergeStateHandles));
Expand Down Expand Up @@ -340,6 +338,9 @@ private static StreamStateHandle convertStateHandle(StateHandle<?> oldStateHandl
byte[] data =
((org.apache.flink.migration.runtime.state.memory.ByteStreamStateHandle) oldStateHandle).getData();
return new ByteStreamStateHandle(String.valueOf(System.identityHashCode(data)), data);
} else if (oldStateHandle instanceof AbstractStateBackend.DataInputViewHandle) {
return convertStateHandle(
((AbstractStateBackend.DataInputViewHandle) oldStateHandle).getStreamStateHandle());
}
throw new IllegalArgumentException("Unknown state handle type: " + oldStateHandle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public abstract class AbstractStateBackend implements Serializable {
/**
* Simple state handle that resolves a {@link DataInputView} from a StreamStateHandle.
*/
private static final class DataInputViewHandle implements StateHandle<DataInputView> {
public static final class DataInputViewHandle implements StateHandle<DataInputView> {

private static final long serialVersionUID = 2891559813513532079L;

Expand All @@ -45,6 +45,10 @@ private DataInputViewHandle(StreamStateHandle stream) {
this.stream = stream;
}

/**
* Returns the {@link StreamStateHandle} held in this handle's {@code stream} field.
*
* @return the wrapped stream state handle
*/
public StreamStateHandle getStreamStateHandle() {
return stream;
}

@Override
public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Migration;

@Internal
@Deprecated
/**
* This class is just a KeyGroupsStateHandle that is tagged as migration, to figure out which restore logic to apply,
* e.g. when restoring backend data from a state handle.
*/
public class MigrationKeyGroupStateHandle extends KeyGroupsStateHandle {
public class MigrationKeyGroupStateHandle extends KeyGroupsStateHandle implements Migration {

private static final long serialVersionUID = -8554427169776881697L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataInputStreamWrapper;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Migration;

import java.io.IOException;

Expand All @@ -30,7 +32,7 @@
* This class is just a StreamStateHandle that is tagged as migration, to figure out which restore logic to apply, e.g.
* when restoring backend data from a state handle.
*/
public class MigrationStreamStateHandle implements StreamStateHandle {
public class MigrationStreamStateHandle implements StreamStateHandle, Migration {

private static final long serialVersionUID = -2332113722532150112L;
private final StreamStateHandle delegate;
Expand All @@ -41,7 +43,7 @@ public MigrationStreamStateHandle(StreamStateHandle delegate) {

@Override
public FSDataInputStream openInputStream() throws IOException {
return delegate.openInputStream();
return new MigrationFSInputStream(delegate.openInputStream());
}

@Override
Expand All @@ -53,4 +55,11 @@ public void discardState() throws Exception {
public long getStateSize() {
return delegate.getStateSize();
}

/**
* {@link FSDataInputStreamWrapper} subclass that is additionally tagged with {@link Migration}. It adds no
* members beyond the constructor.
*/
static class MigrationFSInputStream extends FSDataInputStreamWrapper implements Migration {

/**
* @param inputStream the stream passed on to the {@link FSDataInputStreamWrapper} constructor
*/
public MigrationFSInputStream(FSDataInputStream inputStream) {
super(inputStream);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.checkpoint.savepoint;

import org.apache.flink.core.io.Versioned;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TaskState;

Expand All @@ -34,17 +35,7 @@
* <p>Savepoints are serialized via a {@link SavepointSerializer} and stored
* via a {@link SavepointStore}.
*/
public interface Savepoint {

/**
* Returns the savepoint version.
*
* <p>This version is independent of the Flink version, e.g. multiple Flink
* versions can work the same savepoint version.
*
* @return Savepoint version
*/
int getVersion();
public interface Savepoint extends Versioned {

/**
* Returns the checkpoint ID of the savepoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws
duration);
}

public static void serializeKeyGroupStateHandle(
private static void serializeKeyGroupStateHandle(
KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {

if (stateHandle != null) {
Expand All @@ -227,7 +227,7 @@ public static void serializeKeyGroupStateHandle(
}
}

public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
private static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
final int type = dis.readByte();
if (NULL_HANDLE == type) {
return null;
Expand All @@ -247,7 +247,7 @@ public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStrea
}
}

public static void serializeOperatorStateHandle(
private static void serializeOperatorStateHandle(
OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {

if (stateHandle != null) {
Expand All @@ -258,8 +258,8 @@ public static void serializeOperatorStateHandle(
dos.writeUTF(entry.getKey());
long[] offsets = entry.getValue();
dos.writeInt(offsets.length);
for (int i = 0; i < offsets.length; ++i) {
dos.writeLong(offsets[i]);
for (long offset : offsets) {
dos.writeLong(offset);
}
}
serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
Expand All @@ -268,7 +268,7 @@ public static void serializeOperatorStateHandle(
}
}

public static OperatorStateHandle deserializeOperatorStateHandle(
private static OperatorStateHandle deserializeOperatorStateHandle(
DataInputStream dis) throws IOException {

final int type = dis.readByte();
Expand All @@ -292,7 +292,7 @@ public static OperatorStateHandle deserializeOperatorStateHandle(
}
}

public static void serializeStreamStateHandle(
private static void serializeStreamStateHandle(
StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {

if (stateHandle == null) {
Expand All @@ -319,7 +319,7 @@ public static void serializeStreamStateHandle(
dos.flush();
}

public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
int type = dis.read();
if (NULL_HANDLE == type) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,8 @@ public void testSavepointMigrationV0ToV1() throws Exception {

//check operator state
expTestState.f0 = 1;
if (p % 3 != 0) {
assertEquals(1, is.read());
actTestState = InstantiationUtil.deserializeObject(is, cl);
assertEquals(expTestState, actTestState);
} else {
assertEquals(0, is.read());
}
actTestState = InstantiationUtil.deserializeObject(is, cl);
assertEquals(expTestState, actTestState);
}
}

Expand Down Expand Up @@ -210,9 +205,7 @@ private static Collection<org.apache.flink.migration.runtime.checkpoint.TaskStat
state.setFunctionState(new SerializedStateHandle<Serializable>(testState));
}
testState = new Tuple4<>(1, i, j, k);
if (j % 3 != 0) {
state.setOperatorState(new SerializedStateHandle<>(testState));
}
state.setOperatorState(new SerializedStateHandle<>(testState));

if ((0 == k) && (i % 3 != 0)) {
HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState = new HashMap<>(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,4 @@ public void testSnapshotRestore() throws Exception {
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
Expand All @@ -58,10 +58,10 @@
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -201,7 +201,6 @@ public final void initializeState(OperatorStateHandles stateHandles) throws Exce

if (restoring) {

// TODO check that there is EITHER old OR new state in handles!
restoreStreamCheckpointed(stateHandles);

//pass directly
Expand Down Expand Up @@ -230,18 +229,23 @@ public final void initializeState(OperatorStateHandles stateHandles) throws Exce
@Deprecated
private void restoreStreamCheckpointed(OperatorStateHandles stateHandles) throws Exception {
StreamStateHandle state = stateHandles.getLegacyOperatorState();
if (this instanceof StreamCheckpointedOperator && null != state) {
if (null != state) {
if (this instanceof StreamCheckpointedOperator) {

LOG.debug("Restore state of task {} in chain ({}).",
stateHandles.getOperatorChainIndex(), getContainingTask().getName());
LOG.debug("Restore state of task {} in chain ({}).",
stateHandles.getOperatorChainIndex(), getContainingTask().getName());

FSDataInputStream is = state.openInputStream();
try {
getContainingTask().getCancelables().registerClosable(is);
((StreamCheckpointedOperator) this).restoreState(is);
} finally {
getContainingTask().getCancelables().unregisterClosable(is);
is.close();
FSDataInputStream is = state.openInputStream();
try {
getContainingTask().getCancelables().registerClosable(is);
((StreamCheckpointedOperator) this).restoreState(is);
} finally {
getContainingTask().getCancelables().unregisterClosable(is);
is.close();
}
} else {
throw new Exception(
"Found legacy operator state for operator that does not implement StreamCheckpointedOperator.");
}
}
}
Expand Down
Loading

0 comments on commit e95fe56

Please sign in to comment.