Skip to content

Commit

Permalink
[FLINK-18063][checkpointing] Fix the race condition of aborting check…
Browse files Browse the repository at this point in the history
…point in CheckpointBarrierUnaligner

There are three aborting scenarios which might encounter race condition:

1. CheckpointBarrierUnaligner#processCancellationBarrier
2. CheckpointBarrierUnaligner#processEndOfPartition
3. AlternatingCheckpointBarrierHandler#processBarrier

They only consider the pending checkpoint triggered by #processBarrier from task thread to abort it. Actually the checkpoint might
also be triggered by #notifyBarrierReceived from netty thread in race condition, so we should also handle properly to abort it.

This closes apache#12460.
  • Loading branch information
zhijiangW committed Jun 9, 2020
1 parent d99e1f8 commit f672ae5
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,17 @@ public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex)
if (receivedBarrier.getId() < lastSeenBarrierId) {
return;
}

lastSeenBarrierId = receivedBarrier.getId();
CheckpointBarrierHandler previousHandler = activeHandler;
activeHandler = receivedBarrier.isCheckpoint() ? unalignedHandler : alignedHandler;
abortPreviousIfNeeded(receivedBarrier, previousHandler);
activeHandler.processBarrier(receivedBarrier, channelIndex);
}

private void abortPreviousIfNeeded(CheckpointBarrier barrier, CheckpointBarrierHandler prevHandler) throws IOException {
if (prevHandler != activeHandler && prevHandler.isCheckpointPending() && prevHandler.getLatestCheckpointId() < barrier.getId()) {
prevHandler.releaseBlocksAndResetBarriers();
notifyAbort(
prevHandler.getLatestCheckpointId(),
new CheckpointException(
format("checkpoint %d subsumed by %d", prevHandler.getLatestCheckpointId(), barrier.getId()),
CHECKPOINT_DECLINED_SUBSUMED));
if (previousHandler != activeHandler) {
previousHandler.abortPendingCheckpoint(
lastSeenBarrierId,
new CheckpointException(format("checkpoint subsumed by %d", lastSeenBarrierId), CHECKPOINT_DECLINED_SUBSUMED));
}

activeHandler.processBarrier(receivedBarrier, channelIndex);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ public class CheckpointBarrierAligner extends CheckpointBarrierHandler {
this.blockedChannels = new boolean[totalNumberOfInputChannels];
}

@Override
public void abortPendingCheckpoint(long checkpointId, CheckpointException exception) throws IOException {
if (checkpointId > currentCheckpointId && isCheckpointPending()) {
releaseBlocksAndResetBarriers();
notifyAbort(currentCheckpointId, exception);
}
}

@Override
public void releaseBlocksAndResetBarriers() {
LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public CheckpointBarrierHandler(AbstractInvokable toNotifyOnCheckpoint) {
this.toNotifyOnCheckpoint = checkNotNull(toNotifyOnCheckpoint);
}

public abstract void releaseBlocksAndResetBarriers();
public void releaseBlocksAndResetBarriers() {
}

/**
* Checks whether the channel with the given index is blocked.
Expand Down Expand Up @@ -138,4 +139,7 @@ protected <E extends Exception> void executeInTaskThread(
}

protected abstract boolean isCheckpointPending();

protected void abortPendingCheckpoint(long checkpointId, CheckpointException exception) throws IOException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ public CheckpointBarrierTracker(int totalNumberOfInputChannels, AbstractInvokabl
this.pendingCheckpoints = new ArrayDeque<>();
}

@Override
public void releaseBlocksAndResetBarriers() {
}

@Override
public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
final long barrierId = receivedBarrier.getId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,6 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(checkpointCoordinator), this);
}

@Override
public void releaseBlocksAndResetBarriers() {
if (isCheckpointPending()) {
// make sure no additional data is persisted
Arrays.fill(hasInflightBuffers, false);
// the next barrier that comes must assume it is the first
numBarrierConsumed = 0;
}
threadSafeUnaligner.resetReceivedBarriers(currentConsumedCheckpointId);
}

/**
* We still need to trigger checkpoint via {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}
* while reading the first barrier from one channel, because this might happen
Expand Down Expand Up @@ -156,41 +145,46 @@ public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex)
}

@Override
public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
long cancelledId = cancelBarrier.getCheckpointId();
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, cancelledId);
}
public void abortPendingCheckpoint(long checkpointId, CheckpointException exception) throws IOException {
threadSafeUnaligner.tryAbortPendingCheckpoint(checkpointId, exception);

if (currentConsumedCheckpointId >= cancelledId && !isCheckpointPending()) {
return;
if (checkpointId > currentConsumedCheckpointId) {
resetPendingCheckpoint(checkpointId);
}
}

if (isCheckpointPending()) {
LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
taskName,
@Override
public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
final long cancelledId = cancelBarrier.getCheckpointId();
boolean shouldAbort = threadSafeUnaligner.setCancelledCheckpointId(cancelledId);
if (shouldAbort) {
notifyAbort(
cancelledId,
currentConsumedCheckpointId);
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
}

releaseBlocksAndResetBarriers();
currentConsumedCheckpointId = cancelledId;
threadSafeUnaligner.setCurrentReceivedCheckpointId(currentConsumedCheckpointId);
notifyAbortOnCancellationBarrier(cancelledId);
if (cancelledId >= currentConsumedCheckpointId) {
resetPendingCheckpoint(cancelledId);
currentConsumedCheckpointId = cancelledId;
}
}

@Override
public void processEndOfPartition() throws Exception {
threadSafeUnaligner.onChannelClosed();
resetPendingCheckpoint(-1L);
}

private void resetPendingCheckpoint(long checkpointId) {
if (isCheckpointPending()) {
// let the task know we skip a checkpoint
notifyAbort(
currentConsumedCheckpointId,
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
// no chance to complete this checkpoint
releaseBlocksAndResetBarriers();
LOG.warn("{}: Received barrier or EndOfPartition(-1) {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
taskName,
checkpointId,
currentConsumedCheckpointId);

Arrays.fill(hasInflightBuffers, false);
numBarrierConsumed = 0;
}
}

Expand Down Expand Up @@ -361,15 +355,6 @@ private synchronized void handleNewCheckpoint(CheckpointBarrier barrier) throws
checkpointCoordinator.initCheckpoint(barrierId, barrier.getCheckpointOptions());
}

synchronized void resetReceivedBarriers(long checkpointId) {
if (checkpointId >= currentReceivedCheckpointId && numBarriersReceived > 0) {
// avoid more data being serialized after abortion
Arrays.fill(storeNewBuffers, false);
// the next barrier that comes must assume it is the first
numBarriersReceived = 0;
}
}

synchronized CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
if (checkpointId < currentReceivedCheckpointId) {
return FutureUtils.completedVoidFuture();
Expand All @@ -380,12 +365,40 @@ synchronized CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpoin
return allBarriersReceivedFuture;
}

synchronized void onChannelClosed() {
synchronized void onChannelClosed() throws IOException {
numOpenChannels--;

if (resetPendingCheckpoint()) {
handler.notifyAbort(
currentReceivedCheckpointId,
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
}
}

synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointId) {
this.currentReceivedCheckpointId = Math.max(currentReceivedCheckpointId, this.currentReceivedCheckpointId);
synchronized boolean setCancelledCheckpointId(long cancelledId) {
if (currentReceivedCheckpointId > cancelledId || (currentReceivedCheckpointId == cancelledId && numBarriersReceived == 0)) {
return false;
}

resetPendingCheckpoint();
currentReceivedCheckpointId = cancelledId;
return true;
}

synchronized void tryAbortPendingCheckpoint(long checkpointId, CheckpointException exception) throws IOException {
if (checkpointId > currentReceivedCheckpointId && resetPendingCheckpoint()) {
handler.notifyAbort(currentReceivedCheckpointId, exception);
}
}

private boolean resetPendingCheckpoint() {
if (numBarriersReceived == 0) {
return false;
}

Arrays.fill(storeNewBuffers, false);
numBarriersReceived = 0;
return true;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static Object[][] parameters() {
new Object[]{true, false, Arrays.asList(checkpoint(20), checkpoint(10)), 1, 0},
new Object[]{true, true, Arrays.asList(checkpoint(10), cancel(10)), 1, 0},
new Object[]{true, true, Arrays.asList(checkpoint(10), cancel(20)), 1, 0},
new Object[]{true, true, Arrays.asList(checkpoint(20), cancel(10)), 1, 0},
new Object[]{true, false, Arrays.asList(checkpoint(20), cancel(10)), 1, 0},
};
}

Expand Down
Loading

0 comments on commit f672ae5

Please sign in to comment.