Skip to content

Commit

Permalink
[FLINK-25191] Skip savepoints for recovery
Browse files Browse the repository at this point in the history
This closes apache#18092
  • Loading branch information
dawidwys committed Dec 16, 2021
1 parent f696593 commit b19cc31
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1200,48 +1200,64 @@ public boolean receiveAcknowledgeMessage(
*/
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
throws CheckpointException {
final long checkpointId = pendingCheckpoint.getCheckpointId();
final long checkpointId = pendingCheckpoint.getCheckpointID();
final CompletedCheckpoint completedCheckpoint;
final CompletedCheckpoint lastSubsumed;
final CheckpointProperties props = pendingCheckpoint.getProps();

// As a first step to complete the checkpoint, we register its state with the registry
Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
SharedStateRegistry sharedStateRegistry = completedCheckpointStore.getSharedStateRegistry();
sharedStateRegistry.registerAll(operatorStates.values());
// we do not register savepoints' shared state, as Flink is not in charge of savepoints'
// lifecycle
if (!props.isSavepoint()) {
registerSharedStates(pendingCheckpoint);
}

try {
completedCheckpoint = finalizeCheckpoint(pendingCheckpoint);

// the pending checkpoint must be discarded after the finalization
Preconditions.checkState(pendingCheckpoint.isDisposed() && completedCheckpoint != null);

lastSubsumed =
addCompletedCheckpointToStoreAndSubsumeOldest(
checkpointId,
completedCheckpoint,
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo());
if (!props.isSavepoint()) {
lastSubsumed =
addCompletedCheckpointToStoreAndSubsumeOldest(
checkpointId,
completedCheckpoint,
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo());
} else {
lastSubsumed = null;
}
} finally {
pendingCheckpoints.remove(checkpointId);
scheduleTriggerRequest();
}

// remember recent checkpoint id for debugging purposes
rememberRecentCheckpointId(checkpointId);

// drop those pending checkpoints that are at prior to the completed one
dropSubsumedCheckpoints(checkpointId);

// record the time when this was completed, to calculate
// the 'min delay between checkpoints'
lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();

logCheckpointInfo(completedCheckpoint);

// send the "notify complete" call to all vertices, coordinators, etc.
sendAcknowledgeMessages(
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
checkpointId,
completedCheckpoint.getTimestamp(),
extractIdIfDiscardedOnSubsumed(lastSubsumed));
if (!props.isSavepoint() || props.isSynchronous()) {
// drop those pending checkpoints that are at prior to the completed one
dropSubsumedCheckpoints(checkpointId);

// send the "notify complete" call to all vertices, coordinators, etc.
sendAcknowledgeMessages(
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
checkpointId,
completedCheckpoint.getTimestamp(),
extractIdIfDiscardedOnSubsumed(lastSubsumed));
}
}

private void registerSharedStates(PendingCheckpoint pendingCheckpoint) {
Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
SharedStateRegistry sharedStateRegistry = completedCheckpointStore.getSharedStateRegistry();
sharedStateRegistry.registerAll(operatorStates.values());
}

private void logCheckpointInfo(CompletedCheckpoint completedCheckpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.util.Collections;
import java.util.List;
Expand All @@ -65,8 +63,6 @@
/** Tests for failure of checkpoint coordinator. */
public class CheckpointCoordinatorFailureTest extends TestLogger {

@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();

/**
* Tests that a failure while storing a completed checkpoint in the completed checkpoint store
* will properly fail the originating pending checkpoint and clean upt the completed checkpoint.
Expand Down Expand Up @@ -231,7 +227,7 @@ public void cleanCheckpointOnFailedStoring(
.setCompletedCheckpointStore(completedCheckpointStore)
.setTimer(manuallyTriggeredScheduledExecutor)
.build();
checkpointCoordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath());
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import static org.apache.flink.runtime.checkpoint.CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
Expand Down Expand Up @@ -2018,67 +2019,22 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
assertNotNull(savepointFuture.get());

// the now we should have a completed checkpoint
assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
// savepoints should not registered as retained checkpoints
assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());

// validate that the relevant tasks got a confirmation message
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
assertEquals(checkpointId, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
assertThat(gateway.getNotifiedCompletedCheckpoints(attemptId)).isEmpty();
}

// validate that the shared states are registered
{
verify(subtaskState1, times(1)).registerSharedStates(any(SharedStateRegistry.class));
verify(subtaskState2, times(1)).registerSharedStates(any(SharedStateRegistry.class));
}

CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
CompletedCheckpoint success = savepointFuture.get();
assertEquals(graph.getJobID(), success.getJobId());
assertEquals(pending.getCheckpointId(), success.getCheckpointID());
assertEquals(2, success.getOperatorStates().size());

// ---------------
// trigger another checkpoint and see that this one replaces the other checkpoint
// ---------------
gateway.resetCount();
savepointFuture = checkpointCoordinator.triggerSavepoint(savepointDir);
manuallyTriggeredScheduledExecutor.triggerAll();
assertFalse(savepointFuture.isDone());

long checkpointIdNew =
checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, checkpointIdNew),
TASK_MANAGER_LOCATION_INFO);
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointIdNew),
TASK_MANAGER_LOCATION_INFO);

assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());

CompletedCheckpoint successNew = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
assertEquals(graph.getJobID(), successNew.getJobId());
assertEquals(checkpointIdNew, successNew.getCheckpointID());
assertEquals(2, successNew.getOperatorStates().size());
assertTrue(successNew.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
assertNotNull(savepointFuture.get());

// validate that the first savepoint does not discard its private states.
verify(subtaskState1, never()).discardState();
verify(subtaskState2, never()).discardState();

// validate that the relevant tasks got a confirmation message
for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
assertEquals(
checkpointIdNew, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
assertEquals(
checkpointIdNew,
gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
}

checkpointCoordinator.shutdown();
}

Expand Down Expand Up @@ -2176,21 +2132,20 @@ public void testSavepointsAreNotSubsumed() throws Exception {
FutureUtils.throwIfCompletedExceptionally(savepointFuture2);
assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());

// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
// savepoints should not subsume checkpoints
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, savepointId2),
TASK_MANAGER_LOCATION_INFO);
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, savepointId2),
TASK_MANAGER_LOCATION_INFO);

// currently, we do not subsume a checkpoint after a savepoint completed to avoid data lost.
verify(checkpointCoordinator, times(1))
.sendAcknowledgeMessages(
anyList(), eq(savepointId2), anyLong(), eq(INVALID_CHECKPOINT_ID));
// we do not send notify checkpoint complete for savepoints
verify(checkpointCoordinator, times(0))
.sendAcknowledgeMessages(anyList(), eq(savepointId2), anyLong(), anyLong());

assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());

assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed());

Expand All @@ -2205,13 +2160,12 @@ public void testSavepointsAreNotSubsumed() throws Exception {
new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, savepointId1),
TASK_MANAGER_LOCATION_INFO);

// savepoint should not be subsumed.
verify(checkpointCoordinator, times(1))
.sendAcknowledgeMessages(
anyList(), eq(savepointId1), anyLong(), eq(INVALID_CHECKPOINT_ID));
// we do not send notify checkpoint complete for savepoints
verify(checkpointCoordinator, times(0))
.sendAcknowledgeMessages(anyList(), eq(savepointId1), anyLong(), anyLong());

assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
assertNotNull(savepointFuture1.get());

CompletableFuture<CompletedCheckpoint> checkpointFuture4 =
Expand Down
Loading

0 comments on commit b19cc31

Please sign in to comment.