Skip to content

Commit

Permalink
[FLINK-5158] [ckPtCoord] Handle exceptions from CompletedCheckpointStore in CheckpointCoordinator
Browse files Browse the repository at this point in the history

Handle exceptions from the CompletedCheckpointStore properly in the CheckpointCoordinator. This
means that, if the store throws an exception, the completed checkpoint is cleaned up properly and
the triggering of subsequent checkpoints is still started.

This closes apache#2872.
  • Loading branch information
tillrohrmann committed Dec 9, 2016
1 parent add3765 commit 0c42d25
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,74 +609,36 @@ else if (LOG.isDebugEnabled()) {
*
* @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
*/
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
if (shutdown || message == null) {
return false;
}

if (!job.equals(message.getJob())) {
LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message);
LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
return false;
}

final long checkpointId = message.getCheckpointId();

CompletedCheckpoint completed = null;
PendingCheckpoint checkpoint;

// Flag indicating whether the ack message was for a known pending
// checkpoint.
boolean isPendingCheckpoint;

synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return false;
}

checkpoint = pendingCheckpoints.get(checkpointId);
final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

if (checkpoint != null && !checkpoint.isDiscarded()) {
isPendingCheckpoint = true;

switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
case SUCCESS:
LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());

if (checkpoint.isFullyAcknowledged()) {

// record the time when this was completed, to calculate
// the 'min delay between checkpoints'
lastCheckpointCompletionNanos = System.nanoTime();

// complete the checkpoint structure
completed = checkpoint.finalizeCheckpoint();
completedCheckpointStore.addCheckpoint(completed);

LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
completed.getStateSize(), completed.getDuration());

if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("Checkpoint state: ");
for (TaskState state : completed.getTaskStates().values()) {
builder.append(state);
builder.append(", ");
}
// Remove last two chars ", "
builder.delete(builder.length() - 2, builder.length());

LOG.debug(builder.toString());
}

pendingCheckpoints.remove(checkpointId);
rememberRecentCheckpointId(checkpointId);

dropSubsumedCheckpoints(completed.getCheckpointID());

triggerQueuedRequests();
completePendingCheckpoint(checkpoint);
}
break;
case DUPLICATE:
Expand All @@ -700,46 +662,115 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E

discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
}

return true;
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded but non-removed checkpoint " + checkpointId);
}
else {
boolean wasPendingCheckpoint;

// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) {
isPendingCheckpoint = true;
wasPendingCheckpoint = true;
LOG.warn("Received late message for now expired checkpoint attempt {} from " +
"{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
}
else {
LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());
isPendingCheckpoint = false;
wasPendingCheckpoint = false;
}

// try to discard the state so that we don't have lingering state lying around
discardState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

return wasPendingCheckpoint;
}
}
}

// send the confirmation messages to the necessary targets. we do this here
// to be outside the lock scope
if (completed != null) {
final long timestamp = completed.getTimestamp();
/**
* Try to complete the given pending checkpoint.
*
* Important: This method should only be called in the checkpoint lock scope.
*
* @param pendingCheckpoint to complete
* @throws CheckpointException if the completion failed
*/
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
final long checkpointId = pendingCheckpoint.getCheckpointId();
CompletedCheckpoint completedCheckpoint = null;

for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ee.notifyCheckpointComplete(checkpointId, timestamp);
}
try {
completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();

completedCheckpointStore.addCheckpoint(completedCheckpoint);

rememberRecentCheckpointId(checkpointId);
dropSubsumedCheckpoints(checkpointId);
} catch (Exception exception) {
// abort the current pending checkpoint if it has not been discarded yet
if (!pendingCheckpoint.isDiscarded()) {
pendingCheckpoint.abortError(exception);
}

statsTracker.onCompletedCheckpoint(completed);
if (completedCheckpoint != null) {
// we failed to store the completed checkpoint. Let's clean up
final CompletedCheckpoint cc = completedCheckpoint;

executor.execute(new Runnable() {
@Override
public void run() {
try {
cc.discard();
} catch (Exception nestedException) {
LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), nestedException);
}
}
});
}

throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
} finally {
pendingCheckpoints.remove(checkpointId);

triggerQueuedRequests();
}

// record the time when this was completed, to calculate
// the 'min delay between checkpoints'
lastCheckpointCompletionNanos = System.nanoTime();

LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

return isPendingCheckpoint;
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("Checkpoint state: ");
for (TaskState state : completedCheckpoint.getTaskStates().values()) {
builder.append(state);
builder.append(", ");
}
// Remove last two chars ", "
builder.delete(builder.length() - 2, builder.length());

LOG.debug(builder.toString());
}

final long timestamp = completedCheckpoint.getTimestamp();

for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ee.notifyCheckpointComplete(checkpointId, timestamp);
}
}

statsTracker.onCompletedCheckpoint(completedCheckpoint);
}

private void rememberRecentCheckpointId(long id) {
Expand Down Expand Up @@ -958,22 +989,34 @@ public void run() {
}
}

/**
* Asynchronously discards the given state object, which belongs to the given job, execution
* attempt id and checkpoint id.
*
* @param jobId identifying the job to which the state object belongs
* @param executionAttemptID identifying the task to which the state object belongs
* @param checkpointId of the state object
* @param stateObject to discard asynchronously
*/
private void discardState(
final JobID jobId,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final StateObject stateObject) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
stateObject.discardState();
} catch (Exception e) {
final JobID jobId,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final StateObject stateObject) {

if (stateObject != null) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
stateObject.discardState();
} catch (Throwable throwable) {
LOG.warn("Could not properly discard state object of checkpoint {} " +
"belonging to task {} of job {}.", checkpointId, executionAttemptID, jobId,
e);
throwable);
}
}
}
});
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint;

/**
 * Common superclass of the checked exceptions raised by checkpoint-related operations.
 */
public class CheckpointException extends Exception {

    private static final long serialVersionUID = -4341865597039002540L;

    /**
     * Creates a checkpoint exception that carries only a detail message.
     *
     * @param message description of the failure
     */
    public CheckpointException(String message) {
        super(message);
    }

    /**
     * Creates a checkpoint exception with a detail message and the underlying cause.
     *
     * @param message description of the failure
     * @param cause the throwable that triggered this failure
     */
    public CheckpointException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public boolean discard(JobStatus jobStatus) throws Exception {
}
}

private void discard() throws Exception {
void discard() throws Exception {
try {
if (externalPath != null) {
SavepointStore.removeSavepoint(externalPath);
Expand Down
Loading

0 comments on commit 0c42d25

Please sign in to comment.