Skip to content

Commit

Permalink
[FLINK-18137] Handle discarding of triggering checkpoint correctly
Browse files Browse the repository at this point in the history
Previously, discarding a triggering checkpoint could cause an NPE, which would stop the
processing of subsequent checkpoint requests. This commit changes this behaviour
by checking for this condition and instantiating a proper exception when a
triggering checkpoint is being discarded.

This closes apache#12611.
  • Loading branch information
tillrohrmann committed Jun 12, 2020
1 parent b592dd2 commit 38b2755
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -544,27 +544,39 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
final PendingCheckpoint checkpoint =
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);

if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) {
// no exception, no discarding, everything is OK
final long checkpointId = checkpoint.getCheckpointId();
snapshotTaskState(
timestamp,
checkpointId,
checkpoint.getCheckpointStorageLocation(),
request.props,
executions,
request.advanceToEndOfTime);

coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));

onTriggerSuccess();
Preconditions.checkState(
checkpoint != null || throwable != null,
"Either the pending checkpoint needs to be created or an error must have been occurred.");

if (throwable != null) {
// the initialization might not be finished yet
if (checkpoint == null) {
onTriggerFailure(request, throwable);
} else {
onTriggerFailure(checkpoint, throwable);
}
} else {
// the initialization might not be finished yet
if (checkpoint == null) {
onTriggerFailure(request, throwable);
} else {
onTriggerFailure(checkpoint, throwable);
}
if (checkpoint.isDiscarded()) {
onTriggerFailure(
checkpoint,
new CheckpointException(
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
checkpoint.getFailureCause()));
} else {
// no exception, no discarding, everything is OK
final long checkpointId = checkpoint.getCheckpointId();
snapshotTaskState(
timestamp,
checkpointId,
checkpoint.getCheckpointStorageLocation(),
request.props,
executions,
request.advanceToEndOfTime);

coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));

onTriggerSuccess();
}
}
},
timer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
Expand All @@ -31,6 +33,7 @@
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.TestLogger;

import org.junit.Before;
Expand All @@ -45,14 +48,19 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -529,6 +537,48 @@ public void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
}

/**
 * Checks that a second checkpoint request is still executed after the checkpoint that is
 * currently being triggered has been discarded.
 *
 * <p>Note: this test only fails eventually, i.e. a regression is not detected on every run.
 */
@Test
public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
    // Handles shared with the master hook: the latch tells us that the hook was invoked,
    // the future lets us decide when the hook's part of the trigger completes.
    final CompletableFuture<String> hookResult = new CompletableFuture<>();
    final OneShotLatch hookInvoked = new OneShotLatch();

    final ExecutionVertex vertex = mockExecutionVertex(new ExecutionAttemptID());
    final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();

    final CheckpointCoordinatorConfiguration coordinatorConfiguration =
        CheckpointCoordinatorConfiguration.builder().build();
    final CheckpointCoordinator coordinator =
        new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
            .setTasks(new ExecutionVertex[]{vertex})
            .setTimer(new ScheduledExecutorServiceAdapter(timerService))
            .setCheckpointCoordinatorConfiguration(coordinatorConfiguration)
            .build();

    coordinator.addMasterHook(new TestingMasterHook(hookResult, hookInvoked));

    try {
        // the first request becomes the triggering checkpoint, the second one has to follow it
        coordinator.triggerCheckpoint(false);
        final CompletableFuture<CompletedCheckpoint> followUpCheckpoint =
            coordinator.triggerCheckpoint(false);

        // wait until the master hook was called for the first checkpoint, then let it finish
        hookInvoked.await();
        hookResult.complete("Completed");

        // discard the checkpoint that is being triggered
        final CheckpointException discardCause =
            new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED);
        coordinator.abortPendingCheckpoints(discardCause);

        try {
            // the follow-up request must be executed; it is expected to eventually time out
            followUpCheckpoint.get();
            fail("Expected the second checkpoint to fail.");
        } catch (ExecutionException ee) {
            final Throwable failure = ExceptionUtils.stripExecutionException(ee);
            assertThat(failure, instanceOf(CheckpointException.class));
        }
    } finally {
        coordinator.shutdown(JobStatus.FINISHED);
        ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, timerService);
    }
}

private CheckpointCoordinator createCheckpointCoordinator() {
return new CheckpointCoordinatorBuilder()
.setTimer(manuallyTriggeredScheduledExecutor)
Expand Down Expand Up @@ -568,9 +618,15 @@ private static class TestingMasterHook implements MasterTriggerRestoreHook<Strin
new CheckpointCoordinatorTestingUtils.StringSerializer();

private final CompletableFuture<String> checkpointFuture;
private final OneShotLatch triggerCheckpointLatch;

/**
 * Creates a hook for callers that do not need to observe the trigger call.
 *
 * @param checkpointFuture future handed to the two-argument constructor
 */
private TestingMasterHook(CompletableFuture<String> checkpointFuture) {
// delegate with a newly created latch that is not exposed to the caller
this(checkpointFuture, new OneShotLatch());
}

/**
 * Creates a hook whose trigger call can be observed through the given latch.
 *
 * @param checkpointFuture future returned from {@code triggerCheckpoint}
 * @param triggerCheckpointLatch latch that is triggered when {@code triggerCheckpoint} is called
 */
private TestingMasterHook(CompletableFuture<String> checkpointFuture, OneShotLatch triggerCheckpointLatch) {
this.checkpointFuture = checkpointFuture;
this.triggerCheckpointLatch = triggerCheckpointLatch;
}

@Override
Expand All @@ -582,6 +638,7 @@ public String getIdentifier() {
/**
 * Triggers the latch and returns the future supplied at construction time.
 * The {@code checkpointId}, {@code timestamp} and {@code executor} arguments are not used.
 */
@Override
public CompletableFuture<String> triggerCheckpoint(
long checkpointId, long timestamp, Executor executor) {
// signal that the hook has been invoked before handing out the future
triggerCheckpointLatch.trigger();
return checkpointFuture;
}

Expand Down

0 comments on commit 38b2755

Please sign in to comment.