Skip to content

Commit

Permalink
[FLINK-35465][runtime] Introduce BatchJobRecoveryHandler for recovery…
Browse files Browse the repository at this point in the history
… of batch jobs from JobMaster failures.
  • Loading branch information
JunRuiLee authored and zhuzhurk committed May 28, 2024
1 parent 3206659 commit e964af3
Show file tree
Hide file tree
Showing 19 changed files with 3,429 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.SupportsBatchSnapshot;

import javax.annotation.Nullable;

Expand All @@ -38,7 +39,8 @@
import java.util.TreeSet;

/** A mock {@link SplitEnumerator} for unit tests. */
public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> {
public class MockSplitEnumerator
implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>, SupportsBatchSnapshot {
private final SortedSet<MockSourceSplit> unassignedSplits;
private final SplitEnumeratorContext<MockSourceSplit> enumContext;
private final List<SourceEvent> handledSourceEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,8 @@ private void releasePartitionGroups(
}
}

ResultPartitionID createResultPartitionId(
@VisibleForTesting
public ResultPartitionID createResultPartitionId(
final IntermediateResultPartitionID resultPartitionId) {
final SchedulingResultPartition schedulingResultPartition =
getSchedulingTopology().getResultPartition(resultPartitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public class Execution
private final ExecutionVertex vertex;

/** The unique ID marking the specific execution instant of the task. */
private final ExecutionAttemptID attemptId;
private ExecutionAttemptID attemptId;

/**
* The timestamps when state transitions occurred, indexed by {@link ExecutionState#ordinal()}.
Expand Down Expand Up @@ -453,6 +453,40 @@ public CompletableFuture<Void> registerProducedPartitions(TaskManagerLocation lo
});
}

private void recoverAttempt(ExecutionAttemptID newId) {
if (!this.attemptId.equals(newId)) {
getVertex().getExecutionGraphAccessor().deregisterExecution(this);
this.attemptId = newId;
getVertex().getExecutionGraphAccessor().registerExecution(this);
}
}

/** Recover the execution attempt status after JM failover. */
public void recoverExecution(
ExecutionAttemptID attemptId,
TaskManagerLocation location,
Map<String, Accumulator<?, ?>> userAccumulators,
IOMetrics metrics) {
recoverAttempt(attemptId);
taskManagerLocationFuture.complete(location);

try {
transitionState(this.state, FINISHED);
finishPartitionsAndUpdateConsumers();
updateAccumulatorsAndMetrics(userAccumulators, metrics);
releaseAssignedResource(null);
vertex.getExecutionGraphAccessor().deregisterExecution(this);
} finally {
vertex.executionFinished(this);
}
}

public void recoverProducedPartitions(
Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>
producedPartitions) {
this.producedPartitions = checkNotNull(producedPartitions);
}

private static CompletableFuture<
Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>>
registerProducedPartitions(
Expand All @@ -469,7 +503,6 @@ public CompletableFuture<Void> registerProducedPartitions(TaskManagerLocation lo

for (IntermediateResultPartition partition : partitions) {
PartitionDescriptor partitionDescriptor = PartitionDescriptor.from(partition);
int maxParallelism = getPartitionMaxParallelism(partition);
CompletableFuture<? extends ShuffleDescriptor> shuffleDescriptorFuture =
vertex.getExecutionGraphAccessor()
.getShuffleMaster()
Expand All @@ -479,10 +512,8 @@ public CompletableFuture<Void> registerProducedPartitions(TaskManagerLocation lo
CompletableFuture<ResultPartitionDeploymentDescriptor> partitionRegistration =
shuffleDescriptorFuture.thenApply(
shuffleDescriptor ->
new ResultPartitionDeploymentDescriptor(
partitionDescriptor,
shuffleDescriptor,
maxParallelism));
createResultPartitionDeploymentDescriptor(
partitionDescriptor, partition, shuffleDescriptor));
partitionRegistrations.add(partitionRegistration);
}

Expand All @@ -503,6 +534,21 @@ private static int getPartitionMaxParallelism(IntermediateResultPartition partit
return partition.getIntermediateResult().getConsumersMaxParallelism();
}

public static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(
IntermediateResultPartition partition, ShuffleDescriptor shuffleDescriptor) {
PartitionDescriptor partitionDescriptor = PartitionDescriptor.from(partition);
return createResultPartitionDeploymentDescriptor(
partitionDescriptor, partition, shuffleDescriptor);
}

private static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(
PartitionDescriptor partitionDescriptor,
IntermediateResultPartition partition,
ShuffleDescriptor shuffleDescriptor) {
return new ResultPartitionDeploymentDescriptor(
partitionDescriptor, shuffleDescriptor, getPartitionMaxParallelism(partition));
}

/**
* Deploys the execution to the previously assigned resource.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio

private final ScheduledExecutor delayExecutor;

private final SchedulingStrategy schedulingStrategy;
protected final SchedulingStrategy schedulingStrategy;

private final ExecutionOperations executionOperations;

private final Set<ExecutionVertexID> verticesWaitingForRestart;

private final ShuffleMaster<?> shuffleMaster;
protected final ShuffleMaster<?> shuffleMaster;

private final Map<AllocationID, Long> reservedAllocationRefCounters;

Expand All @@ -109,6 +109,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio

protected final ExecutionDeployer executionDeployer;

protected final FailoverStrategy failoverStrategy;

protected DefaultScheduler(
final Logger log,
final JobGraph jobGraph,
Expand Down Expand Up @@ -162,7 +164,7 @@ protected DefaultScheduler(
this.reservedAllocationRefCounters = new HashMap<>();
this.reservedAllocationByExecutionVertex = new HashMap<>();

final FailoverStrategy failoverStrategy =
this.failoverStrategy =
failoverStrategyFactory.create(
getSchedulingTopology(), getResultPartitionAvailabilityChecker());
log.info(
Expand Down Expand Up @@ -301,7 +303,7 @@ private Throwable maybeTranslateToClusterDatasetException(
cause, Collections.singletonList(failedPartitionId.getIntermediateDataSetID()));
}

private void notifyCoordinatorsAboutTaskFailure(
protected void notifyCoordinatorsAboutTaskFailure(
final Execution execution, @Nullable final Throwable error) {
final ExecutionJobVertex jobVertex = execution.getVertex().getJobVertex();
final int subtaskIndex = execution.getParallelSubtaskIndex();
Expand All @@ -323,7 +325,7 @@ public void handleGlobalFailure(final Throwable error) {
maybeRestartTasks(failureHandlingResult);
}

private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
protected void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
if (failureHandlingResult.canRestart()) {
restartTasksWithDelay(failureHandlingResult);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -756,8 +756,7 @@ protected final void archiveFromFailureHandlingResult(
}

@Override
public final boolean updateTaskExecutionState(
final TaskExecutionStateTransition taskExecutionState) {
public boolean updateTaskExecutionState(final TaskExecutionStateTransition taskExecutionState) {

final ExecutionAttemptID attemptId = taskExecutionState.getID();
final Execution execution = executionGraph.getRegisteredExecutions().get(attemptId);
Expand Down Expand Up @@ -1142,7 +1141,7 @@ public void notifyEndOfData(ExecutionAttemptID executionAttemptID) {
// ------------------------------------------------------------------------

@VisibleForTesting
JobID getJobId() {
protected JobID getJobId() {
return jobGraph.getJobID();
}

Expand Down
Loading

0 comments on commit e964af3

Please sign in to comment.