Skip to content

Commit

Permalink
[FLINK-7362] [checkpoints] Savepoint property is lost after de/serial…
Browse files Browse the repository at this point in the history
…ization of CheckpointProperties
  • Loading branch information
StefanRRichter committed Aug 15, 2017
1 parent d29bed3 commit 843f0cb
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class CheckpointProperties implements Serializable {
private final boolean forced;

private final boolean externalize;
private final boolean savepoint;

private final boolean discardSubsumed;
private final boolean discardFinished;
Expand All @@ -49,6 +50,7 @@ public class CheckpointProperties implements Serializable {
CheckpointProperties(
boolean forced,
boolean externalize,
boolean savepoint,
boolean discardSubsumed,
boolean discardFinished,
boolean discardCancelled,
Expand All @@ -57,6 +59,7 @@ public class CheckpointProperties implements Serializable {

this.forced = forced;
this.externalize = externalize;
this.savepoint = savepoint;
this.discardSubsumed = discardSubsumed;
this.discardFinished = discardFinished;
this.discardCancelled = discardCancelled;
Expand Down Expand Up @@ -183,7 +186,7 @@ boolean discardOnJobSuspended() {
* @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise.
*/
public boolean isSavepoint() {
return this == STANDARD_SAVEPOINT;
return savepoint;
}

// ------------------------------------------------------------------------
Expand All @@ -201,6 +204,7 @@ public boolean equals(Object o) {
CheckpointProperties that = (CheckpointProperties) o;
return forced == that.forced &&
externalize == that.externalize &&
savepoint == that.savepoint &&
discardSubsumed == that.discardSubsumed &&
discardFinished == that.discardFinished &&
discardCancelled == that.discardCancelled &&
Expand All @@ -212,6 +216,7 @@ public boolean equals(Object o) {
public int hashCode() {
int result = (forced ? 1 : 0);
result = 31 * result + (externalize ? 1 : 0);
result = 31 * result + (savepoint ? 1 : 0);
result = 31 * result + (discardSubsumed ? 1 : 0);
result = 31 * result + (discardFinished ? 1 : 0);
result = 31 * result + (discardCancelled ? 1 : 0);
Expand All @@ -224,7 +229,8 @@ public int hashCode() {
public String toString() {
return "CheckpointProperties{" +
"forced=" + forced +
", externalize=" + externalizeCheckpoint() +
", externalized=" + externalizeCheckpoint() +
", savepoint=" + savepoint +
", discardSubsumed=" + discardSubsumed +
", discardFinished=" + discardFinished +
", discardCancelled=" + discardCancelled +
Expand All @@ -236,6 +242,7 @@ public String toString() {
// ------------------------------------------------------------------------

private static final CheckpointProperties STANDARD_SAVEPOINT = new CheckpointProperties(
true,
true,
true,
false,
Expand All @@ -245,6 +252,7 @@ public String toString() {
false);

private static final CheckpointProperties STANDARD_CHECKPOINT = new CheckpointProperties(
false,
false,
false,
true,
Expand All @@ -256,6 +264,7 @@ public String toString() {
private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties(
false,
true,
false,
true,
true,
false, // Retain on cancellation
Expand All @@ -265,6 +274,7 @@ public String toString() {
private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties(
false,
true,
false,
true,
true,
true, // Delete on cancellation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.util.InstantiationUtil;

import org.junit.Test;

import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -109,6 +111,12 @@ public void testIsSavepoint() throws Exception {
{
CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
assertTrue(props.isSavepoint());

CheckpointProperties deserializedCheckpointProperties =
InstantiationUtil.deserializeObject(
InstantiationUtil.serializeObject(props),
getClass().getClassLoader());
assertTrue(deserializedCheckpointProperties.isSavepoint());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testCleanUpOnSubsume() throws Exception {
operatorStates.put(new OperatorID(), state);

boolean discardSubsumed = true;
CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true, true, true, true);
CheckpointProperties props = new CheckpointProperties(false, false, false, discardSubsumed, true, true, true, true);

CompletedCheckpoint checkpoint = new CompletedCheckpoint(
new JobID(), 0, 0, 1,
Expand Down Expand Up @@ -122,7 +122,7 @@ public void testCleanUpOnShutdown() throws Exception {
Mockito.reset(state);

// Keep
CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false, false);
CompletedCheckpoint checkpoint = new CompletedCheckpoint(
new JobID(), 0, 0, 1,
new HashMap<>(operatorStates),
Expand All @@ -139,7 +139,7 @@ public void testCleanUpOnShutdown() throws Exception {
assertEquals(true, file.exists());

// Discard
props = new CheckpointProperties(false, false, true, true, true, true, true);
props = new CheckpointProperties(false, false, false, true, true, true, true, true);
checkpoint = new CompletedCheckpoint(
new JobID(), 0, 0, 1,
new HashMap<>(operatorStates),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class PendingCheckpointTest {
@Test
public void testCanBeSubsumed() throws Exception {
// Forced checkpoints cannot be subsumed
CheckpointProperties forced = new CheckpointProperties(true, true, false, false, false, false, false);
CheckpointProperties forced = new CheckpointProperties(true, true, false, false, false, false, false, false);
PendingCheckpoint pending = createPendingCheckpoint(forced, "ignored");
assertFalse(pending.canBeSubsumed());

Expand All @@ -92,7 +92,7 @@ public void testCanBeSubsumed() throws Exception {
}

// Non-forced checkpoints can be subsumed
CheckpointProperties subsumed = new CheckpointProperties(false, true, false, false, false, false, false);
CheckpointProperties subsumed = new CheckpointProperties(false, true, false, false, false, false, false, false);
pending = createPendingCheckpoint(subsumed, "ignored");
assertTrue(pending.canBeSubsumed());
}
Expand All @@ -106,7 +106,7 @@ public void testPersistExternally() throws Exception {
File tmp = tmpFolder.newFolder();

// Persisted checkpoint
CheckpointProperties persisted = new CheckpointProperties(false, true, false, false, false, false, false);
CheckpointProperties persisted = new CheckpointProperties(false, true, false, false, false, false, false, false);

PendingCheckpoint pending = createPendingCheckpoint(persisted, tmp.getAbsolutePath());
pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
Expand All @@ -115,7 +115,7 @@ public void testPersistExternally() throws Exception {
assertEquals(1, tmp.listFiles().length);

// Ephemeral checkpoint
CheckpointProperties ephemeral = new CheckpointProperties(false, false, true, true, true, true, true);
CheckpointProperties ephemeral = new CheckpointProperties(false, false, false, true, true, true, true, true);
pending = createPendingCheckpoint(ephemeral, null);
pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());

Expand All @@ -130,7 +130,7 @@ public void testPersistExternally() throws Exception {
*/
@Test
public void testCompletionFuture() throws Exception {
CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false, false);

// Abort declined
PendingCheckpoint pending = createPendingCheckpoint(props, "ignored");
Expand Down Expand Up @@ -192,7 +192,7 @@ public void testCompletionFuture() throws Exception {
@Test
@SuppressWarnings("unchecked")
public void testAbortDiscardsState() throws Exception {
CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false, false);
QueueExecutor executor = new QueueExecutor();

OperatorState state = mock(OperatorState.class);
Expand Down Expand Up @@ -330,7 +330,7 @@ public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception {

@Test
public void testSetCanceller() {
final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true);
final CheckpointProperties props = new CheckpointProperties(false, false, false, true, true, true, true, true);

PendingCheckpoint aborted = createPendingCheckpoint(props, null);
aborted.abortDeclined();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class RestoredCheckpointStatsTest {
public void testSimpleAccess() throws Exception {
long checkpointId = Integer.MAX_VALUE + 1L;
long triggerTimestamp = Integer.MAX_VALUE + 1L;
CheckpointProperties props = new CheckpointProperties(true, true, false, false, true, false, true);
CheckpointProperties props = new CheckpointProperties(true, true, false, false, false, true, false, true);
long restoreTimestamp = Integer.MAX_VALUE + 1L;
String externalPath = "external-path";

Expand Down

0 comments on commit 843f0cb

Please sign in to comment.