Skip to content

Commit

Permalink
[FLINK-33565][Scheduler] ConcurrentExceptions works with exception me…
Browse files Browse the repository at this point in the history
…rging
  • Loading branch information
1996fanrui committed Jan 23, 2024
1 parent bb437bd commit 9f20bc4
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,11 @@ private FailureHandlingResult handleFailure(
new JobException("The failure is not recoverable", cause),
timestamp,
failureLabels,
globalFailure);
globalFailure,
true);
}

restartBackoffTimeStrategy.notifyFailure(cause);
boolean isNewAttempt = restartBackoffTimeStrategy.notifyFailure(cause);
if (restartBackoffTimeStrategy.canRestart()) {
numberOfRestarts++;

Expand All @@ -171,15 +172,17 @@ private FailureHandlingResult handleFailure(
failureLabels,
verticesToRestart,
restartBackoffTimeStrategy.getBackoffTime(),
globalFailure);
globalFailure,
isNewAttempt);
} else {
return FailureHandlingResult.unrecoverable(
failedExecution,
new JobException(
"Recovery is suppressed by " + restartBackoffTimeStrategy, cause),
timestamp,
failureLabels,
globalFailure);
globalFailure,
isNewAttempt);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public class FailureHandlingResult {
/** True if the original failure was a global failure. */
private final boolean globalFailure;

/** True if the current failure is the root cause rather than a concurrent exception. */
private final boolean isRootCause;

/**
* Creates a result of a set of tasks to restart to recover from the failure.
*
Expand All @@ -77,6 +80,7 @@ public class FailureHandlingResult {
* @param verticesToRestart containing task vertices to restart to recover from the failure.
* {@code null} indicates that the failure is not restartable.
* @param restartDelayMS indicates a delay before conducting the restart
* @param isRootCause indicates whether the current failure is the root cause of a new attempt.
*/
private FailureHandlingResult(
@Nullable Execution failedExecution,
Expand All @@ -85,7 +89,8 @@ private FailureHandlingResult(
CompletableFuture<Map<String, String>> failureLabels,
@Nullable Set<ExecutionVertexID> verticesToRestart,
long restartDelayMS,
boolean globalFailure) {
boolean globalFailure,
boolean isRootCause) {
checkState(restartDelayMS >= 0);

this.verticesToRestart = Collections.unmodifiableSet(checkNotNull(verticesToRestart));
Expand All @@ -95,6 +100,7 @@ private FailureHandlingResult(
this.failureLabels = failureLabels;
this.timestamp = timestamp;
this.globalFailure = globalFailure;
this.isRootCause = isRootCause;
}

/**
Expand All @@ -106,13 +112,16 @@ private FailureHandlingResult(
* @param timestamp the time the failure was handled.
* @param failureLabels collection of tags characterizing the failure as produced by the
* FailureEnrichers
* @param isRootCause indicates whether the current failure is the root cause of a new attempt.
*/
private FailureHandlingResult(
@Nullable Execution failedExecution,
@Nonnull Throwable error,
long timestamp,
CompletableFuture<Map<String, String>> failureLabels,
boolean globalFailure) {
boolean globalFailure,
boolean isRootCause) {
this.isRootCause = isRootCause;
this.verticesToRestart = null;
this.restartDelayMS = -1;
this.failedExecution = failedExecution;
Expand Down Expand Up @@ -206,6 +215,16 @@ public boolean isGlobalFailure() {
return globalFailure;
}

/**
 * Tells whether this failure is the root cause of a new attempt.
 *
 * @return {@code true} if the current failure starts a new attempt; {@code false} if an
 *     earlier failure has not been retried yet, in which case the current failure is merged
 *     into that previous attempt and the merged exceptions are treated as concurrent
 *     exceptions.
 */
public boolean isRootCause() {
    return this.isRootCause;
}

/**
* Creates a result of a set of tasks to restart to recover from the failure.
*
Expand All @@ -230,15 +249,17 @@ public static FailureHandlingResult restartable(
CompletableFuture<Map<String, String>> failureLabels,
@Nullable Set<ExecutionVertexID> verticesToRestart,
long restartDelayMS,
boolean globalFailure) {
boolean globalFailure,
boolean isRootCause) {
return new FailureHandlingResult(
failedExecution,
cause,
timestamp,
failureLabels,
verticesToRestart,
restartDelayMS,
globalFailure);
globalFailure,
isRootCause);
}

/**
Expand All @@ -260,8 +281,9 @@ public static FailureHandlingResult unrecoverable(
@Nonnull Throwable error,
long timestamp,
CompletableFuture<Map<String, String>> failureLabels,
boolean globalFailure) {
boolean globalFailure,
boolean isRootCause) {
return new FailureHandlingResult(
failedExecution, error, timestamp, failureLabels, globalFailure);
failedExecution, error, timestamp, failureLabels, globalFailure, isRootCause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -128,6 +129,7 @@
import java.util.stream.StreamSupport;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/** Base class which can be used to implement {@link SchedulerNG}. */
public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling {
Expand Down Expand Up @@ -166,6 +168,8 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling

private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory;

private RootExceptionHistoryEntry latestRootExceptionEntry;

private final ExecutionGraphFactory executionGraphFactory;

private final MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings;
Expand Down Expand Up @@ -707,27 +711,40 @@ private void archiveGlobalFailure(
long timestamp,
CompletableFuture<Map<String, String>> failureLabels,
Iterable<Execution> executions) {
exceptionHistory.add(
latestRootExceptionEntry =
RootExceptionHistoryEntry.fromGlobalFailure(
failure, timestamp, failureLabels, executions));
failure, timestamp, failureLabels, executions);
exceptionHistory.add(latestRootExceptionEntry);
log.debug("Archive global failure.", failure);
}

protected final void archiveFromFailureHandlingResult(
FailureHandlingResultSnapshot failureHandlingResult) {
if (failureHandlingResult.getRootCauseExecution().isPresent()) {
if (!failureHandlingResult.isRootCause()) {
// Handle all subsequent exceptions as the concurrent exceptions when it's not a new
// attempt.
checkState(
latestRootExceptionEntry != null,
"A root exception entry should exist if failureHandlingResult wasn't "
+ "generated as part of a new error handling cycle.");
List<Execution> concurrentlyExecutions = new ArrayList<>();
failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());

latestRootExceptionEntry.addConcurrentExceptions(concurrentlyExecutions);
} else if (failureHandlingResult.getRootCauseExecution().isPresent()) {
final Execution rootCauseExecution =
failureHandlingResult.getRootCauseExecution().get();

final RootExceptionHistoryEntry rootEntry =
latestRootExceptionEntry =
RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot(
failureHandlingResult);
exceptionHistory.add(rootEntry);
exceptionHistory.add(latestRootExceptionEntry);

log.debug(
"Archive local failure causing attempt {} to fail: {}",
rootCauseExecution.getAttemptId(),
rootEntry.getExceptionAsString());
latestRootExceptionEntry.getExceptionAsString());
} else {
archiveGlobalFailure(
failureHandlingResult.getRootCause(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class FailureHandlingResultSnapshot {
private final CompletableFuture<Map<String, String>> failureLabels;
private final long timestamp;
private final Set<Execution> concurrentlyFailedExecutions;
private final boolean isRootCause;

/**
* Creates a {@code FailureHandlingResultSnapshot} based on the passed {@link
Expand Down Expand Up @@ -84,7 +85,8 @@ public static FailureHandlingResultSnapshot create(
ErrorInfo.handleMissingThrowable(failureHandlingResult.getError()),
failureHandlingResult.getTimestamp(),
failureHandlingResult.getFailureLabels(),
concurrentlyFailedExecutions);
concurrentlyFailedExecutions,
failureHandlingResult.isRootCause());
}

@VisibleForTesting
Expand All @@ -93,7 +95,8 @@ public static FailureHandlingResultSnapshot create(
Throwable rootCause,
long timestamp,
CompletableFuture<Map<String, String>> failureLabels,
Set<Execution> concurrentlyFailedExecutions) {
Set<Execution> concurrentlyFailedExecutions,
boolean isRootCause) {
Preconditions.checkArgument(
rootCauseExecution == null
|| !concurrentlyFailedExecutions.contains(rootCauseExecution),
Expand All @@ -105,6 +108,7 @@ public static FailureHandlingResultSnapshot create(
this.timestamp = timestamp;
this.concurrentlyFailedExecutions =
Preconditions.checkNotNull(concurrentlyFailedExecutions);
this.isRootCause = isRootCause;
}

/**
Expand Down Expand Up @@ -150,7 +154,17 @@ public long getTimestamp() {
*
* @return The concurrently failed {@code Executions}.
*/
public Iterable<Execution> getConcurrentlyFailedExecution() {
public Set<Execution> getConcurrentlyFailedExecution() {
return Collections.unmodifiableSet(concurrentlyFailedExecutions);
}

/**
 * Tells whether the captured failure is the root cause of a new attempt.
 *
 * @return {@code true} if the current failure starts a new attempt; {@code false} if an
 *     earlier failure has not been retried yet, in which case the current failure is merged
 *     into that previous attempt and the merged exceptions are treated as concurrent
 *     exceptions.
 */
public boolean isRootCause() {
    return this.isRootCause;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -37,11 +39,12 @@
* {@code RootExceptionHistoryEntry} extending {@link ExceptionHistoryEntry} by providing a list of
* {@code ExceptionHistoryEntry} instances to store concurrently caught failures.
*/
@NotThreadSafe
public class RootExceptionHistoryEntry extends ExceptionHistoryEntry {

private static final long serialVersionUID = -7647332765867297434L;

private final Iterable<ExceptionHistoryEntry> concurrentExceptions;
private final Collection<ExceptionHistoryEntry> concurrentExceptions;

/**
* Creates a {@code RootExceptionHistoryEntry} based on the passed {@link
Expand Down Expand Up @@ -96,7 +99,7 @@ public static RootExceptionHistoryEntry fromGlobalFailure(
}

public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
ExceptionHistoryEntry entry, Iterable<ExceptionHistoryEntry> entries) {
ExceptionHistoryEntry entry, Collection<ExceptionHistoryEntry> entries) {
return new RootExceptionHistoryEntry(
entry.getException(),
entry.getTimestamp(),
Expand Down Expand Up @@ -140,15 +143,20 @@ private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
failureLabels,
failingTaskName,
taskManagerLocation,
StreamSupport.stream(executions.spliterator(), false)
.filter(execution -> execution.getFailureInfo().isPresent())
.map(
execution ->
ExceptionHistoryEntry.create(
execution,
execution.getVertexWithAttempt(),
FailureEnricherUtils.EMPTY_FAILURE_LABELS))
.collect(Collectors.toList()));
createExceptionHistoryEntries(executions));
}

/**
 * Converts the passed executions into {@code ExceptionHistoryEntry} instances, skipping every
 * execution that carries no failure information.
 *
 * <p>The entries are collected into an explicit {@link ArrayList}: the returned collection is
 * stored as the concurrent exceptions of a root entry and appended to later by {@link
 * #addConcurrentExceptions(Iterable)}, while {@code Collectors.toList()} gives no guarantee
 * about the mutability or serializability of the list it returns.
 *
 * @param executions the executions to convert.
 * @return a mutable collection with one entry per execution that has failure information,
 *     each created with empty failure labels.
 */
private static Collection<ExceptionHistoryEntry> createExceptionHistoryEntries(
        Iterable<Execution> executions) {
    return StreamSupport.stream(executions.spliterator(), false)
            .filter(execution -> execution.getFailureInfo().isPresent())
            .map(
                    execution ->
                            ExceptionHistoryEntry.create(
                                    execution,
                                    execution.getVertexWithAttempt(),
                                    FailureEnricherUtils.EMPTY_FAILURE_LABELS))
            .collect(Collectors.toCollection(ArrayList::new));
}

/**
Expand All @@ -170,11 +178,15 @@ public RootExceptionHistoryEntry(
CompletableFuture<Map<String, String>> failureLabels,
@Nullable String failingTaskName,
@Nullable TaskManagerLocation taskManagerLocation,
Iterable<ExceptionHistoryEntry> concurrentExceptions) {
Collection<ExceptionHistoryEntry> concurrentExceptions) {
super(cause, timestamp, failureLabels, failingTaskName, taskManagerLocation);
this.concurrentExceptions = concurrentExceptions;
}

/**
 * Converts the passed executions into exception history entries and appends them to the
 * concurrent exceptions of this root entry.
 *
 * @param concurrentlyExecutions the executions whose failures should be recorded as concurrent
 *     exceptions.
 */
public void addConcurrentExceptions(Iterable<Execution> concurrentlyExecutions) {
    final Collection<ExceptionHistoryEntry> additionalEntries =
            createExceptionHistoryEntries(concurrentlyExecutions);
    concurrentExceptions.addAll(additionalEntries);
}

/**
 * Returns the concurrent exceptions recorded for this root entry.
 *
 * <p>The backing collection is handed out directly; no copy is made.
 *
 * @return the concurrent exceptions of this entry.
 */
public Iterable<ExceptionHistoryEntry> getConcurrentExceptions() {
    return this.concurrentExceptions;
}
Expand Down
Loading

0 comments on commit 9f20bc4

Please sign in to comment.