diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 5f0fd74c43174..3ce7a5a59cdc0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -609,25 +609,18 @@ else if (LOG.isDebugEnabled()) { * * @throws Exception If the checkpoint cannot be added to the completed checkpoint store. */ - public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception { + public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException { if (shutdown || message == null) { return false; } if (!job.equals(message.getJob())) { - LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message); + LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message); return false; } final long checkpointId = message.getCheckpointId(); - CompletedCheckpoint completed = null; - PendingCheckpoint checkpoint; - - // Flag indicating whether the ack message was for a known pending - // checkpoint. - boolean isPendingCheckpoint; - synchronized (lock) { // we need to check inside the lock for being shutdown as well, otherwise we // get races and invalid error log messages @@ -635,10 +628,9 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E return false; } - checkpoint = pendingCheckpoints.get(checkpointId); + final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId); if (checkpoint != null && !checkpoint.isDiscarded()) { - isPendingCheckpoint = true; switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) { case SUCCESS: @@ -646,37 +638,7 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E checkpointId, message.getTaskExecutionId(), message.getJob()); if (checkpoint.isFullyAcknowledged()) { - - // record the time when this was completed, to calculate - // the 'min delay between checkpoints' - lastCheckpointCompletionNanos = System.nanoTime(); - - // complete the checkpoint structure - completed = checkpoint.finalizeCheckpoint(); - completedCheckpointStore.addCheckpoint(completed); - - LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId, - completed.getStateSize(), completed.getDuration()); - - if (LOG.isDebugEnabled()) { - StringBuilder builder = new StringBuilder(); - builder.append("Checkpoint state: "); - for (TaskState state : completed.getTaskStates().values()) { - builder.append(state); - builder.append(", "); - } - // Remove last two chars ", " - builder.delete(builder.length() - 2, builder.length()); - - LOG.debug(builder.toString()); - } - - pendingCheckpoints.remove(checkpointId); - rememberRecentCheckpointId(checkpointId); - - dropSubsumedCheckpoints(completed.getCheckpointID()); - - triggerQueuedRequests(); + completePendingCheckpoint(checkpoint); } break; case DUPLICATE: @@ -700,6 +662,8 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState()); } + + return true; } else if (checkpoint != null) { // this should not happen @@ -707,39 +671,106 @@ else if (checkpoint != null) { "Received message for discarded but non-removed checkpoint " + checkpointId); } else { + boolean wasPendingCheckpoint; + // message is for an unknown checkpoint, or comes too late (checkpoint disposed) if (recentPendingCheckpoints.contains(checkpointId)) { - isPendingCheckpoint = true; + wasPendingCheckpoint = true; LOG.warn("Received late message for now expired checkpoint attempt {} from " + "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob()); } else { LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob()); - isPendingCheckpoint = false; + wasPendingCheckpoint = false; } // try to discard the state so that we don't have lingering state lying around discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState()); + + return wasPendingCheckpoint; } } + } - // send the confirmation messages to the necessary targets. we do this here - // to be outside the lock scope - if (completed != null) { - final long timestamp = completed.getTimestamp(); + /** + * Try to complete the given pending checkpoint. + * + * Important: This method should only be called in the checkpoint lock scope. + * + * @param pendingCheckpoint to complete + * @throws CheckpointException if the completion failed + */ + private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException { + final long checkpointId = pendingCheckpoint.getCheckpointId(); + CompletedCheckpoint completedCheckpoint = null; - for (ExecutionVertex ev : tasksToCommitTo) { - Execution ee = ev.getCurrentExecutionAttempt(); - if (ee != null) { - ee.notifyCheckpointComplete(checkpointId, timestamp); - } + try { + completedCheckpoint = pendingCheckpoint.finalizeCheckpoint(); + + completedCheckpointStore.addCheckpoint(completedCheckpoint); + + rememberRecentCheckpointId(checkpointId); + dropSubsumedCheckpoints(checkpointId); + } catch (Exception exception) { + // abort the current pending checkpoint if it has not been discarded yet + if (!pendingCheckpoint.isDiscarded()) { + pendingCheckpoint.abortError(exception); } - statsTracker.onCompletedCheckpoint(completed); + if (completedCheckpoint != null) { + // we failed to store the completed checkpoint. Let's clean up + final CompletedCheckpoint cc = completedCheckpoint; + + executor.execute(new Runnable() { + @Override + public void run() { + try { + cc.discard(); + } catch (Exception nestedException) { + LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), nestedException); + } + } + }); + } + + throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception); + } finally { + pendingCheckpoints.remove(checkpointId); + + triggerQueuedRequests(); } + + // record the time when this was completed, to calculate + // the 'min delay between checkpoints' + lastCheckpointCompletionNanos = System.nanoTime(); + + LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId, + completedCheckpoint.getStateSize(), completedCheckpoint.getDuration()); - return isPendingCheckpoint; + if (LOG.isDebugEnabled()) { + StringBuilder builder = new StringBuilder(); + builder.append("Checkpoint state: "); + for (TaskState state : completedCheckpoint.getTaskStates().values()) { + builder.append(state); + builder.append(", "); + } + // Remove last two chars ", " + builder.delete(builder.length() - 2, builder.length()); + + LOG.debug(builder.toString()); + } + + final long timestamp = completedCheckpoint.getTimestamp(); + + for (ExecutionVertex ev : tasksToCommitTo) { + Execution ee = ev.getCurrentExecutionAttempt(); + if (ee != null) { + ee.notifyCheckpointComplete(checkpointId, timestamp); + } + } + + statsTracker.onCompletedCheckpoint(completedCheckpoint); } private void rememberRecentCheckpointId(long id) { @@ -958,22 +989,34 @@ public void run() { } } + /** + * Discards the given state object asynchronously belonging to the given job, execution attempt + * id and checkpoint id. + * + * @param jobId identifying the job to which the state object belongs + * @param executionAttemptID identifying the task to which the state object belongs + * @param checkpointId of the state object + * @param stateObject to discard asynchronously + */ private void discardState( - final JobID jobId, - final ExecutionAttemptID executionAttemptID, - final long checkpointId, - final StateObject stateObject) { - executor.execute(new Runnable() { - @Override - public void run() { - try { - stateObject.discardState(); - } catch (Exception e) { + final JobID jobId, + final ExecutionAttemptID executionAttemptID, + final long checkpointId, + final StateObject stateObject) { + + if (stateObject != null) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + stateObject.discardState(); + } catch (Throwable throwable) { LOG.warn("Could not properly discard state object of checkpoint {} " + "belonging to task {} of job {}.", checkpointId, executionAttemptID, jobId, - e); + throwable); + } } - } - }); + }); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java new file mode 100644 index 0000000000000..707878cc055b8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +/** + * Base class for checkpoint related exceptions. + */ +public class CheckpointException extends Exception { + + private static final long serialVersionUID = -4341865597039002540L; + + public CheckpointException(String message, Throwable cause) { + super(message, cause); + } + + public CheckpointException(String message) { + super(message); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index ed650112f8f0e..0e70b1ae1c8dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -151,7 +151,7 @@ public boolean discard(JobStatus jobStatus) throws Exception { } } - private void discard() throws Exception { + void discard() throws Exception { try { if (externalPath != null) { SavepointStore.removeSavepoint(externalPath); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index cfb59f6dd6ca1..e7df5bc7982ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -198,47 +199,37 @@ public Future getCompletionFuture() { return onCompletionPromise; } - public CompletedCheckpoint finalizeCheckpoint() throws Exception { + public CompletedCheckpoint finalizeCheckpoint() { synchronized (lock) { - try { - if (discarded) { - throw new IllegalStateException("pending checkpoint is discarded"); + Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); + + // Persist if required + String externalPath = null; + if (props.externalizeCheckpoint()) { + try { + Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values()); + externalPath = SavepointStore.storeSavepoint( + targetDirectory, + savepoint); + } catch (IOException e) { + LOG.error("Failed to persist checkpoint {}.",checkpointId, e); } - if (notYetAcknowledgedTasks.isEmpty()) { - // Persist if required - String externalPath = null; - if (props.externalizeCheckpoint()) { - try { - Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values()); - externalPath = SavepointStore.storeSavepoint( - targetDirectory, - savepoint); - } catch (Throwable t) { - LOG.error("Failed to persist checkpoints " + checkpointId + ".", t); - } - } + } - CompletedCheckpoint completed = new CompletedCheckpoint( - jobId, - checkpointId, - checkpointTimestamp, - System.currentTimeMillis(), - new HashMap<>(taskStates), - props, - externalPath); + CompletedCheckpoint completed = new CompletedCheckpoint( + jobId, + checkpointId, + checkpointTimestamp, + System.currentTimeMillis(), + new HashMap<>(taskStates), + props, + externalPath); - onCompletionPromise.complete(completed); + onCompletionPromise.complete(completed); - dispose(false); + dispose(false); - return completed; - } else { - throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged"); - } - } catch (Throwable t) { - onCompletionPromise.completeExceptionally(t); - throw t; - } + return completed; } } @@ -378,9 +369,8 @@ public void abortError(Throwable cause) { private void dispose(boolean releaseState) { synchronized (lock) { try { - discarded = true; numAcknowledgedTasks = -1; - if (releaseState) { + if (!discarded && releaseState) { executor.execute(new Runnable() { @Override public void run() { @@ -395,6 +385,7 @@ public void run() { } } finally { + discarded = true; taskStates.clear(); notYetAcknowledgedTasks.clear(); acknowledgedTasks.clear(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java new file mode 100644 index 0000000000000..26db0129fbe57 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(PendingCheckpoint.class) +public class CheckpointCoordinatorFailureTest extends TestLogger { + + /** + * Tests that a failure while storing a completed checkpoint in the completed checkpoint store + * will properly fail the originating pending checkpoint and clean upt the completed checkpoint. + */ + @Test + public void testFailingCompletedCheckpointStoreAdd() throws Exception { + JobID jid = new JobID(); + + final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID(); + final ExecutionVertex vertex = CheckpointCoordinatorTest.mockExecutionVertex(executionAttemptId); + + final long triggerTimestamp = 1L; + + // set up the coordinator and validate the initial state + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[]{vertex}, + new ExecutionVertex[]{vertex}, + new ExecutionVertex[]{vertex}, + new StandaloneCheckpointIDCounter(), + new FailingCompletedCheckpointStore(), + null, + new DisabledCheckpointStatsTracker(), + Executors.directExecutor()); + + coord.triggerCheckpoint(triggerTimestamp, false); + + assertEquals(1, coord.getNumberOfPendingCheckpoints()); + + PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next(); + + assertFalse(pendingCheckpoint.isDiscarded()); + + final long checkpointId =coord.getPendingCheckpoints().keySet().iterator().next(); + + final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, triggerTimestamp); + AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointMetaData); + + CompletedCheckpoint completedCheckpoint = mock(CompletedCheckpoint.class); + PowerMockito.whenNew(CompletedCheckpoint.class).withAnyArguments().thenReturn(completedCheckpoint); + + try { + coord.receiveAcknowledgeMessage(acknowledgeMessage); + fail("Expected a checkpoint exception because the completed checkpoint store could not " + + "store the completed checkpoint."); + } catch (CheckpointException e) { + // ignore because we expected this exception + } + + // make sure that the pending checkpoint has been discarded after we could not complete it + assertTrue(pendingCheckpoint.isDiscarded()); + + verify(completedCheckpoint).discard(); + } + + private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore { + + @Override + public void recover() throws Exception { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { + throw new Exception("The failing completed checkpoint store failed again... :-("); + } + + @Override + public CompletedCheckpoint getLatestCheckpoint() throws Exception { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public void shutdown(JobStatus jobStatus) throws Exception { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public List getAllCheckpoints() throws Exception { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public int getNumberOfRetainedCheckpoints() { + return -1; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 73eebcfc4f902..463c2ae9121b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -2589,7 +2589,7 @@ private static ExecutionJobVertex mockExecutionJobVertex( return executionJobVertex; } - private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) { + static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) { return mockExecutionVertex( attemptID, new JobVertexID(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index 6b0d3f8852a1e..9f4bdbac53973 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -260,7 +260,7 @@ public boolean discard(JobStatus jobStatus) throws Exception { } } - private void discard() { + void discard() { if (!isDiscarded) { this.isDiscarded = true; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index e9189654638fa..be7b0330afc02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -158,7 +158,6 @@ public void testCompletionFuture() throws Exception { } catch (IllegalStateException ignored) { // Expected } - assertTrue(future.isDone()); } /**