Skip to content

Commit

Permalink
[FLINK-22276][runtime] Fixes the concurrency issue
Browse files Browse the repository at this point in the history
This commit fixes an issue where multiple failures can occur close to each other.
In that case, the DefaultScheduler's restart logic competes for each of these
failures. If multiple failures refer to the same Execution, it might be that the
restart due to one failure handling cleans up the failure already. This leads to
an IllegalArgumentException when archiving the next failure refering to the same
Execution. The issue was that the code relied on ExecutionVertices instead of
Executions.

The new implementation relies on the Executions that were present when the
failure was handled. Therefore, FailureHandlingResultSnapshot is introduced. It
extracts the Execution information from the ExecutionGraph.

Additionally, instead of accessing on ExecutionVertex.getTaskNameWithSubtaskIndex()
to collect the task name, the new implementation relies on
Execution.getVertexWithAttempt(). This enables us to solely rely on the Execution
without an extra dependency on the ExecutionVertex.

The new implementation also removes the add method from RootExceptionHistoryEntry.
This makes the instantiation cleaner. ExceptionHistoryEntryExtractor was replaced
by the factory methods RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot and
RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot as part of this effort.

This closes apache#15640.
  • Loading branch information
XComp authored and tillrohrmann committed Apr 19, 2021
1 parent 3cf5f5e commit 317687b
Show file tree
Hide file tree
Showing 12 changed files with 773 additions and 315 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,20 @@ public class ErrorInfo implements Serializable {
*/
public static ErrorInfo createErrorInfoWithNullableCause(
@Nullable Throwable exception, long timestamp) {
return new ErrorInfo(
exception != null
? exception
: new FlinkException(
"Unknown cause for Execution failure (this might be caused by FLINK-21376)."),
timestamp);
return new ErrorInfo(handleMissingThrowable(exception), timestamp);
}

/**
* Utility method to cover FLINK-21376.
*
* @param throwable The actual exception.
* @return a {@link FlinkException} if no exception was passed.
*/
public static Throwable handleMissingThrowable(@Nullable Throwable throwable) {
return throwable != null
? throwable
: new FlinkException(
"Unknown cause for Execution failure (this might be caused by FLINK-21376).");
}

public ErrorInfo(@Nonnull Throwable exception, long timestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
Expand Down Expand Up @@ -251,12 +252,17 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe

final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);

final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
FailureHandlingResultSnapshot.create(
failureHandlingResult,
id -> this.getExecutionVertex(id).getCurrentExecutionAttempt());
delayExecutor.schedule(
() ->
FutureUtils.assertNoException(
cancelFuture.thenRunAsync(
() -> {
archiveFromFailureHandlingResult(failureHandlingResult);
archiveFromFailureHandlingResult(
failureHandlingResultSnapshot);
restartTasks(executionVertexVersions, globalRecovery);
},
getMainThreadExecutor())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
Expand All @@ -75,7 +74,7 @@
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryExtractor;
import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
Expand Down Expand Up @@ -148,7 +147,6 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling

private final ComponentMainThreadExecutor mainThreadExecutor;

private final ExceptionHistoryEntryExtractor exceptionHistoryEntryExtractor;
private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory;

private final ExecutionGraphFactory executionGraphFactory;
Expand Down Expand Up @@ -215,7 +213,6 @@ public SchedulerBase(
new DefaultOperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure);
operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor);

this.exceptionHistoryEntryExtractor = new ExceptionHistoryEntryExtractor();
this.exceptionHistory =
new BoundedFIFOQueue<>(
jobMasterConfiguration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
Expand Down Expand Up @@ -629,38 +626,42 @@ public CompletableFuture<JobStatus> getJobTerminationFuture() {
return executionGraph.getTerminationFuture();
}

protected final void archiveGlobalFailure(@Nullable Throwable failure) {
archiveGlobalFailure(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED));
protected final void archiveGlobalFailure(Throwable failure) {
archiveGlobalFailure(
failure,
executionGraph.getStatusTimestamp(JobStatus.FAILED),
StreamSupport.stream(executionGraph.getAllExecutionVertices().spliterator(), false)
.map(ExecutionVertex::getCurrentExecutionAttempt)
.collect(Collectors.toSet()));
}

private void archiveGlobalFailure(@Nullable Throwable failure, long timestamp) {
private void archiveGlobalFailure(
Throwable failure, long timestamp, Iterable<Execution> executions) {
exceptionHistory.add(
exceptionHistoryEntryExtractor.extractGlobalFailure(
executionGraph.getAllExecutionVertices(), failure, timestamp));
RootExceptionHistoryEntry.fromGlobalFailure(failure, timestamp, executions));
log.debug("Archive global failure.", failure);
}

protected final void archiveFromFailureHandlingResult(
FailureHandlingResult failureHandlingResult) {
if (failureHandlingResult.getExecutionVertexIdOfFailedTask().isPresent()) {
final ExecutionVertexID executionVertexId =
failureHandlingResult.getExecutionVertexIdOfFailedTask().get();
FailureHandlingResultSnapshot failureHandlingResult) {
if (failureHandlingResult.getRootCauseExecution().isPresent()) {
final Execution rootCauseExecution =
failureHandlingResult.getRootCauseExecution().get();

final RootExceptionHistoryEntry rootEntry =
exceptionHistoryEntryExtractor.extractLocalFailure(
executionGraph.getAllVertices(),
executionVertexId,
failureHandlingResult.getVerticesToRestart().stream()
.filter(v -> !executionVertexId.equals(v))
.collect(Collectors.toSet()));
RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot(
failureHandlingResult);
exceptionHistory.add(rootEntry);

log.debug(
"Archive local failure causing attempt {} to fail: {}",
executionVertexId,
rootCauseExecution.getAttemptId(),
rootEntry.getExceptionAsString());
} else {
archiveGlobalFailure(
failureHandlingResult.getError(), failureHandlingResult.getTimestamp());
failureHandlingResult.getRootCause(),
failureHandlingResult.getTimestamp(),
failureHandlingResult.getConcurrentlyFailedExecution());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

Expand All @@ -39,7 +42,40 @@ public class ExceptionHistoryEntry extends ErrorInfo {
@Nullable private final String failingTaskName;
@Nullable private final ArchivedTaskManagerLocation taskManagerLocation;

ExceptionHistoryEntry(
/**
* Creates an {@code ExceptionHistoryEntry} based on the provided {@code Execution}.
*
* @param failedExecution the failed {@code Execution}.
* @param taskName the name of the task.
* @return The {@code ExceptionHistoryEntry}.
* @throws IllegalArgumentException if the passed {@code Execution} does not provide a {@link
* Execution#getFailureInfo() failureInfo}.
*/
public static ExceptionHistoryEntry create(AccessExecution failedExecution, String taskName) {
Preconditions.checkArgument(
failedExecution.getFailureInfo().isPresent(),
"The selected Execution " + failedExecution.getAttemptId() + " didn't fail.");

final ErrorInfo failure = failedExecution.getFailureInfo().get();
return new ExceptionHistoryEntry(
failure.getException(),
failure.getTimestamp(),
taskName,
failedExecution.getAssignedResourceLocation());
}

/**
* Instantiates a {@code ExceptionHistoryEntry}.
*
* @param cause The reason for the failure.
* @param timestamp The time the failure was caught.
* @param failingTaskName The name of the task that failed.
* @param taskManagerLocation The host the task was running on.
* @throws NullPointerException if {@code cause} is {@code null}.
* @throws IllegalArgumentException if the passed {@code timestamp} is not bigger than {@code
* 0}.
*/
protected ExceptionHistoryEntry(
Throwable cause,
long timestamp,
@Nullable String failingTaskName,
Expand Down

This file was deleted.

Loading

0 comments on commit 317687b

Please sign in to comment.