Skip to content

Commit

Permalink
[hotfix][checkpointing] Fix the formatting of CheckpointBarrierUnaligner
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijiangW committed Jun 4, 2020
1 parent 19bbd6d commit 9add433
Showing 1 changed file with 34 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,26 +130,20 @@ public void releaseBlocksAndResetBarriers() {

/**
* For unaligned checkpoint, it never blocks processing from the task aspect.
*
* <p>For PoC, we do not consider the possibility that the unaligned checkpoint would
* not perform due to the max configured unaligned checkpoint size.
*/
@Override
public boolean isBlocked(int channelIndex) {
return false;
}

/**
* We still need to trigger checkpoint while reading the first barrier from one channel, because this might happen
* earlier than the previous async trigger via mailbox by netty thread. And the {@link AbstractInvokable} has the
* deduplication logic to guarantee trigger checkpoint only once finally.
* We still need to trigger checkpoint while reading the first barrier from one channel, because
* this might happen earlier than the previous async trigger via mailbox by netty thread.
*
* <p>Note this is also suitable for the trigger case of local input channel.
*/
@Override
public void processBarrier(
CheckpointBarrier receivedBarrier,
int channelIndex) throws Exception {
public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
long barrierId = receivedBarrier.getId();
if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && !isCheckpointPending())) {
// ignore old and cancelled barriers
Expand All @@ -164,33 +158,32 @@ public void processBarrier(
hasInflightBuffers[channelIndex] = false;
numBarrierConsumed++;
}
// processBarrier is called from task thread and can actually happen before notifyBarrierReceived on empty
// buffer queues
// to avoid replicating any logic, we simply call notifyBarrierReceived here as well
threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, channelInfos[channelIndex]);
}

@Override
public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
final long barrierId = cancelBarrier.getCheckpointId();
long cancelledId = cancelBarrier.getCheckpointId();
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, cancelledId);
}

if (currentConsumedCheckpointId >= barrierId && !isCheckpointPending()) {
if (currentConsumedCheckpointId >= cancelledId && !isCheckpointPending()) {
return;
}

if (isCheckpointPending()) {
LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
taskName,
barrierId,
currentConsumedCheckpointId);
} else if (LOG.isDebugEnabled()) {
LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, barrierId);
"Skipping current checkpoint.",
taskName,
cancelledId,
currentConsumedCheckpointId);
}

releaseBlocksAndResetBarriers();
currentConsumedCheckpointId = barrierId;
currentConsumedCheckpointId = cancelledId;
threadSafeUnaligner.setCurrentReceivedCheckpointId(currentConsumedCheckpointId);
notifyAbortOnCancellationBarrier(barrierId);
notifyAbortOnCancellationBarrier(cancelledId);
}

@Override
Expand Down Expand Up @@ -258,6 +251,11 @@ private int getFlattenedChannelIndex(InputChannelInfo channelInfo) {
return gateChannelOffsets[channelInfo.getGateIdx()] + channelInfo.getInputChannelIdx();
}

@VisibleForTesting
int getNumOpenChannels() {
return threadSafeUnaligner.getNumOpenChannels();
}

@ThreadSafe
private static class ThreadSafeUnaligner implements BufferReceivedListener, Closeable {

Expand All @@ -267,9 +265,7 @@ private static class ThreadSafeUnaligner implements BufferReceivedListener, Clos
*/
private final boolean[] storeNewBuffers;

/**
* The number of input channels which has read the barrier by task.
*/
/** The number of input channels which has received or processed the barrier. */
private int numBarriersReceived;

/** A future indicating that all barriers of the a given checkpoint have been read. */
Expand All @@ -284,21 +280,18 @@ private static class ThreadSafeUnaligner implements BufferReceivedListener, Clos
*/
private long currentReceivedCheckpointId = -1L;

/** The number of opened channels. */
/** The number of open channels. */
private int numOpenChannels;

private final ChannelStateWriter channelStateWriter;

private final CheckpointBarrierUnaligner handler;

public ThreadSafeUnaligner(
int totalNumChannels,
ChannelStateWriter channelStateWriter,
CheckpointBarrierUnaligner handler) {
storeNewBuffers = new boolean[totalNumChannels];
ThreadSafeUnaligner(int totalNumChannels, ChannelStateWriter channelStateWriter, CheckpointBarrierUnaligner handler) {
this.numOpenChannels = totalNumChannels;
this.storeNewBuffers = new boolean[totalNumChannels];
this.channelStateWriter = channelStateWriter;
this.handler = handler;
numOpenChannels = totalNumChannels;
}

@Override
Expand Down Expand Up @@ -359,11 +352,10 @@ private synchronized void handleNewCheckpoint(CheckpointBarrier barrier) throws
// let the task know we are not completing this
long currentCheckpointId = currentReceivedCheckpointId;
handler.executeInTaskThread(() ->
handler.notifyAbort(currentCheckpointId,
new CheckpointException(
"Barrier id: " + barrierId,
CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)),
"notifyAbort");
handler.notifyAbort(
currentCheckpointId,
new CheckpointException("Barrier id: " + barrierId, CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)),
"notifyAbort");
}

currentReceivedCheckpointId = barrierId;
Expand All @@ -373,7 +365,7 @@ private synchronized void handleNewCheckpoint(CheckpointBarrier barrier) throws
channelStateWriter.start(barrierId, barrier.getCheckpointOptions());
}

public synchronized void resetReceivedBarriers(long checkpointId) {
synchronized void resetReceivedBarriers(long checkpointId) {
if (checkpointId >= currentReceivedCheckpointId && numBarriersReceived > 0) {
// avoid more data being serialized after abortion
Arrays.fill(storeNewBuffers, false);
Expand All @@ -382,7 +374,7 @@ public synchronized void resetReceivedBarriers(long checkpointId) {
}
}

public synchronized CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
synchronized CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
if (checkpointId < currentReceivedCheckpointId) {
return FutureUtils.completedVoidFuture();
}
Expand All @@ -392,22 +384,17 @@ public synchronized CompletableFuture<Void> getAllBarriersReceivedFuture(long ch
return allBarriersReceivedFuture;
}

public synchronized void onChannelClosed() {
synchronized void onChannelClosed() {
numOpenChannels--;
}

public synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointId) {
synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointId) {
this.currentReceivedCheckpointId = Math.max(currentReceivedCheckpointId, this.currentReceivedCheckpointId);
}

@VisibleForTesting
public synchronized int getNumOpenChannels() {
synchronized int getNumOpenChannels() {
return numOpenChannels;
}
}

@VisibleForTesting
public int getNumOpenChannels() {
return threadSafeUnaligner.getNumOpenChannels();
}
}

0 comments on commit 9add433

Please sign in to comment.