Skip to content

Commit

Permalink
[FLINK-32514][Runtime/Checkpointing] Support configuring checkpointin…
Browse files Browse the repository at this point in the history
…g interval during process backlog (apache#22931)

This closes apache#22931.
  • Loading branch information
yunfengzhou-hub authored Aug 24, 2023
1 parent d5b114f commit 615e824
Show file tree
Hide file tree
Showing 31 changed files with 953 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@
<td><h5>execution.checkpointing.interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>Gets the interval in which checkpoints are periodically scheduled.<br /><br />This setting defines the base interval. Checkpoint triggering may be delayed by the settings <code class="highlighter-rouge">execution.checkpointing.max-concurrent-checkpoints</code> and <code class="highlighter-rouge">execution.checkpointing.min-pause</code></td>
<td>Gets the interval in which checkpoints are periodically scheduled.<br /><br />This setting defines the base interval. Checkpoint triggering may be delayed by the settings <code class="highlighter-rouge">execution.checkpointing.max-concurrent-checkpoints</code>, <code class="highlighter-rouge">execution.checkpointing.min-pause</code> and <code class="highlighter-rouge">execution.checkpointing.interval-during-backlog</code></td>
</tr>
<tr>
<td><h5>execution.checkpointing.interval-during-backlog</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>If it is not null and any source reports isProcessingBacklog=true, it is the interval in which checkpoints are periodically scheduled.<br /><br />Checkpoint triggering may be delayed by the settings <code class="highlighter-rouge">execution.checkpointing.max-concurrent-checkpoints</code> and <code class="highlighter-rouge">execution.checkpointing.min-pause</code>.<br /><br />Note: if it is not null, the value must either be 0, which means the checkpoint is disabled during backlog, or be larger than or equal to execution.checkpointing.interval.</td>
</tr>
<tr>
<td><h5>execution.checkpointing.max-concurrent-checkpoints</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ public Object getPreviousEnumerator() {
"Failed to create enumerator for sourceIndex=" + currentSourceIndex, e);
}
LOG.info("Starting enumerator for sourceIndex={}", currentSourceIndex);
context.setIsProcessingBacklog(currentSourceIndex < sources.size() - 1);
currentEnumerator.start();
}

Expand Down Expand Up @@ -422,6 +423,11 @@ public <T> void callAsync(
public void runInCoordinatorThread(Runnable runnable) {
realContext.runInCoordinatorThread(runnable);
}

@Override
public void setIsProcessingBacklog(boolean isProcessingBacklog) {
realContext.setIsProcessingBacklog(isProcessingBacklog);
}
}

private static void checkAndSignalNoMoreSplits(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.connector.source;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;

import java.util.Map;
Expand Down Expand Up @@ -181,4 +182,18 @@ <T> void callAsync(
* @param runnable a runnable to execute
*/
void runInCoordinatorThread(Runnable runnable);

/**
* Reports to JM whether this source is currently processing backlog.
*
* <p>When source is processing backlog, it means the records being emitted by this source is
* already stale and there is no processing latency requirement for these records. This allows
* downstream operators to optimize throughput instead of reducing latency for intermediate
* results.
*
* <p>If no API has been explicitly invoked to specify the backlog status of a source, the
* source is considered to have isProcessingBacklog=false by default.
*/
@PublicEvolving
void setIsProcessingBacklog(boolean isProcessingBacklog);
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ public void runInCoordinatorThread(Runnable runnable) {
mainExecutor.execute(runnable);
}

@Override
public void setIsProcessingBacklog(boolean isProcessingBacklog) {}

public void close() throws Exception {
stoppedAcceptAsyncCalls.set(true);
workerExecutor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ public void triggerNonPeriodicScheduledTasks() {
execService.triggerNonPeriodicScheduledTasks();
}

public void triggerNonPeriodicScheduledTasks(Class<?> taskClazz) {
execService.triggerNonPeriodicScheduledTasks(taskClazz);
}

public void triggerPeriodicScheduledTasks() {
execService.triggerPeriodicScheduledTasks();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,18 @@ public class CheckpointCoordinator {
private final CheckpointIDCounter checkpointIdCounter;

/**
* The base checkpoint interval. Actual trigger time may be affected by the max concurrent
* checkpoints and minimum-pause values
* The checkpoint interval when there is no source reporting isProcessingBacklog=true. Actual
* trigger time may be affected by the max concurrent checkpoints, minimum-pause values and
* checkpoint interval during backlog.
*/
private final long baseInterval;

/**
* The checkpoint interval when any source reports isProcessingBacklog=true. Actual trigger time
* may be affected by the max concurrent checkpoints and minimum-pause values.
*/
private final long baseIntervalDuringBacklog;

/** The max time (in ms) that a checkpoint may take. */
private final long checkpointTimeout;

Expand All @@ -180,8 +187,25 @@ public class CheckpointCoordinator {
/** Actor that receives status updates from the execution graph this coordinator works for. */
private JobStatusListener jobStatusListener;

/**
* The current periodic trigger. Used to deduplicate concurrently scheduled checkpoints if any.
*/
@GuardedBy("lock")
private ScheduledTrigger currentPeriodicTrigger;

/** A handle to the current periodic trigger, to cancel it when necessary. */
private ScheduledFuture<?> currentPeriodicTrigger;
@GuardedBy("lock")
private ScheduledFuture<?> currentPeriodicTriggerFuture;

/**
* The timestamp (via {@link Clock#relativeTimeMillis()}) when the next checkpoint will be
* triggered.
*
* <p>If it's value is {@link Long#MAX_VALUE}, it means there is not a next checkpoint
* scheduled.
*/
@GuardedBy("lock")
private long nextCheckpointTriggeringRelativeTime;

/**
* The timestamp (via {@link Clock#relativeTimeMillis()}) when the last checkpoint completed.
Expand Down Expand Up @@ -222,6 +246,10 @@ public class CheckpointCoordinator {

private final CheckpointPlanCalculator checkpointPlanCalculator;

/** IDs of the source operators that are currently processing backlog. */
@GuardedBy("lock")
private final Set<OperatorID> backlogOperators = new HashSet<>();

private boolean baseLocationsForCheckpointInitialized = false;

private boolean forceFullSnapshot;
Expand Down Expand Up @@ -298,6 +326,8 @@ public CheckpointCoordinator(

this.job = checkNotNull(job);
this.baseInterval = baseInterval;
this.baseIntervalDuringBacklog = chkConfig.getCheckpointIntervalDuringBacklog();
this.nextCheckpointTriggeringRelativeTime = Long.MAX_VALUE;
this.checkpointTimeout = chkConfig.getCheckpointTimeout();
this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
this.coordinatorsToCheckpoint =
Expand Down Expand Up @@ -426,6 +456,40 @@ public boolean isShutdown() {
return shutdown;
}

/**
* Reports whether a source operator is currently processing backlog.
*
* <p>If any source operator is processing backlog, the checkpoint interval would be decided by
* {@code execution.checkpointing.interval-during-backlog} instead of {@code
* execution.checkpointing.interval}.
*
* <p>If a source has not invoked this method, the source is considered to have
* isProcessingBacklog=false. If a source operator has invoked this method multiple times, the
* last reported value is used.
*
* @param operatorID the operator ID of the source operator.
* @param isProcessingBacklog whether the source operator is processing backlog.
*/
public void setIsProcessingBacklog(OperatorID operatorID, boolean isProcessingBacklog) {
synchronized (lock) {
if (isProcessingBacklog) {
backlogOperators.add(operatorID);
} else {
backlogOperators.remove(operatorID);
}

long currentCheckpointInterval = getCurrentCheckpointInterval();
if (currentCheckpointInterval
!= CheckpointCoordinatorConfiguration.DISABLED_CHECKPOINT_INTERVAL) {
long currentRelativeTime = clock.relativeTimeMillis();
if (currentRelativeTime + currentCheckpointInterval
< nextCheckpointTriggeringRelativeTime) {
rescheduleTrigger(currentRelativeTime, currentCheckpointInterval);
}
}
}
}

// --------------------------------------------------------------------------------------------
// Triggering Checkpoints and Savepoints
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1915,6 +1979,14 @@ public CompletedCheckpointStore getCheckpointStore() {
return completedCheckpointStore;
}

/**
* Gets the checkpoint interval. Its value might vary depending on whether there is processing
* backlog.
*/
private long getCurrentCheckpointInterval() {
return backlogOperators.isEmpty() ? baseInterval : baseIntervalDuringBacklog;
}

public long getCheckpointTimeout() {
return checkpointTimeout;
}
Expand Down Expand Up @@ -1943,7 +2015,7 @@ boolean isCurrentPeriodicTriggerAvailable() {
* @return <code>true</code> if periodic checkpoints have been configured.
*/
public boolean isPeriodicCheckpointingConfigured() {
return baseInterval != Long.MAX_VALUE;
return baseInterval != CheckpointCoordinatorConfiguration.DISABLED_CHECKPOINT_INTERVAL;
}

// --------------------------------------------------------------------------------------------
Expand All @@ -1963,7 +2035,7 @@ public void startCheckpointScheduler() {
stopCheckpointScheduler();

periodicScheduling = true;
currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
scheduleTriggerWithDelay(clock.relativeTimeMillis(), getRandomInitDelay());
}
}

Expand Down Expand Up @@ -2011,25 +2083,29 @@ private void abortPendingCheckpoints(
}
}

private void rescheduleTrigger(long tillNextMillis) {
private void rescheduleTrigger(long currentTimeMillis, long tillNextMillis) {
cancelPeriodicTrigger();
currentPeriodicTrigger = scheduleTriggerWithDelay(tillNextMillis);
scheduleTriggerWithDelay(currentTimeMillis, tillNextMillis);
}

private void cancelPeriodicTrigger() {
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel(false);
nextCheckpointTriggeringRelativeTime = Long.MAX_VALUE;
currentPeriodicTriggerFuture.cancel(false);
currentPeriodicTrigger = null;
currentPeriodicTriggerFuture = null;
}
}

private long getRandomInitDelay() {
return ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpoints, baseInterval + 1L);
}

private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
return timer.scheduleAtFixedRate(
new ScheduledTrigger(), initDelay, baseInterval, TimeUnit.MILLISECONDS);
private void scheduleTriggerWithDelay(long currentRelativeTime, long initDelay) {
nextCheckpointTriggeringRelativeTime = currentRelativeTime + initDelay;
currentPeriodicTrigger = new ScheduledTrigger();
currentPeriodicTriggerFuture =
timer.schedule(currentPeriodicTrigger, initDelay, TimeUnit.MILLISECONDS);
}

private void restoreStateToCoordinators(
Expand Down Expand Up @@ -2075,10 +2151,36 @@ public void reportStats(long id, ExecutionAttemptID attemptId, CheckpointMetrics

// ------------------------------------------------------------------------

private final class ScheduledTrigger implements Runnable {
final class ScheduledTrigger implements Runnable {

@Override
public void run() {
synchronized (lock) {
if (currentPeriodicTrigger != this) {
// Another periodic trigger has been scheduled but this one
// has not been force cancelled yet.
return;
}

long checkpointInterval = getCurrentCheckpointInterval();
if (checkpointInterval
!= CheckpointCoordinatorConfiguration.DISABLED_CHECKPOINT_INTERVAL) {
nextCheckpointTriggeringRelativeTime += checkpointInterval;
currentPeriodicTriggerFuture =
timer.schedule(
this,
Math.max(
0,
nextCheckpointTriggeringRelativeTime
- clock.relativeTimeMillis()),
TimeUnit.MILLISECONDS);
} else {
nextCheckpointTriggeringRelativeTime = Long.MAX_VALUE;
currentPeriodicTrigger = null;
currentPeriodicTriggerFuture = null;
}
}

try {
triggerCheckpoint(checkpointProperties, null, true);
} catch (Exception e) {
Expand Down Expand Up @@ -2190,7 +2292,7 @@ private void abortPendingAndQueuedCheckpoints(CheckpointException exception) {
* The canceller of checkpoint. The checkpoint might be cancelled if it doesn't finish in a
* configured period.
*/
private class CheckpointCanceller implements Runnable {
class CheckpointCanceller implements Runnable {

private final PendingCheckpoint pendingCheckpoint;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.IntSupplier;
import java.util.function.LongConsumer;

import static java.lang.System.currentTimeMillis;
import static java.lang.System.identityHashCode;
Expand All @@ -58,7 +58,7 @@ class CheckpointRequestDecider {
private static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;

private final int maxConcurrentCheckpointAttempts;
private final LongConsumer rescheduleTrigger;
private final BiConsumer<Long, Long> rescheduleTrigger;
private final Clock clock;
private final long minPauseBetweenCheckpoints;
private final IntSupplier pendingCheckpointsSizeSupplier;
Expand All @@ -69,7 +69,7 @@ class CheckpointRequestDecider {

CheckpointRequestDecider(
int maxConcurrentCheckpointAttempts,
LongConsumer rescheduleTrigger,
BiConsumer<Long, Long> rescheduleTrigger,
Clock clock,
long minPauseBetweenCheckpoints,
IntSupplier pendingCheckpointsSizeSupplier,
Expand All @@ -86,7 +86,7 @@ class CheckpointRequestDecider {

CheckpointRequestDecider(
int maxConcurrentCheckpointAttempts,
LongConsumer rescheduleTrigger,
BiConsumer<Long, Long> rescheduleTrigger,
Clock clock,
long minPauseBetweenCheckpoints,
IntSupplier pendingCheckpointsSizeSupplier,
Expand Down Expand Up @@ -165,26 +165,22 @@ private Optional<CheckpointTriggerRequest> chooseRequestToExecute(

CheckpointTriggerRequest first = queuedRequests.first();
if (!first.isForce() && first.isPeriodic) {
long nextTriggerDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
long currentRelativeTime = clock.relativeTimeMillis();
long nextTriggerDelayMillis =
lastCompletionMs - currentRelativeTime + minPauseBetweenCheckpoints;
if (nextTriggerDelayMillis > 0) {
queuedRequests
.pollFirst()
.completeExceptionally(
new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
rescheduleTrigger.accept(nextTriggerDelayMillis);
rescheduleTrigger.accept(currentRelativeTime, nextTriggerDelayMillis);
return Optional.empty();
}
}

return Optional.of(queuedRequests.pollFirst());
}

private long nextTriggerDelayMillis(long lastCheckpointCompletionRelativeTime) {
return lastCheckpointCompletionRelativeTime
- clock.relativeTimeMillis()
+ minPauseBetweenCheckpoints;
}

@VisibleForTesting
@Deprecated
PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() {
Expand Down
Loading

0 comments on commit 615e824

Please sign in to comment.