Skip to content

Commit

Permalink
[FLINK-28392][runtime] DefaultExecutionDeployer avoid retrieving executions from ExecutionGraph#currentExecutions
Browse files Browse the repository at this point in the history

This closes apache#20178.
  • Loading branch information
zhuzhurk committed Jul 6, 2022
1 parent 2bd8c20 commit e5c4e3f
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -65,8 +64,6 @@ public class DefaultExecutionDeployer implements ExecutionDeployer {

private final Time partitionRegistrationTimeout;

private final Function<ExecutionAttemptID, Execution> executionRetriever;

private final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc;

private final ComponentMainThreadExecutor mainThreadExecutor;
Expand All @@ -77,7 +74,6 @@ private DefaultExecutionDeployer(
final ExecutionOperations executionOperations,
final ExecutionVertexVersioner executionVertexVersioner,
final Time partitionRegistrationTimeout,
final Function<ExecutionAttemptID, Execution> executionRetriever,
final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc,
final ComponentMainThreadExecutor mainThreadExecutor) {

Expand All @@ -86,7 +82,6 @@ private DefaultExecutionDeployer(
this.executionOperations = checkNotNull(executionOperations);
this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
this.partitionRegistrationTimeout = checkNotNull(partitionRegistrationTimeout);
this.executionRetriever = checkNotNull(executionRetriever);
this.allocationReservationFunc = checkNotNull(allocationReservationFunc);
this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
}
Expand All @@ -103,7 +98,8 @@ public void allocateSlotsAndDeploy(
allocateSlotsFor(executionsToDeploy);

final List<ExecutionDeploymentHandle> deploymentHandles =
createDeploymentHandles(requiredVersionByVertex, executionSlotAssignments);
createDeploymentHandles(
executionsToDeploy, requiredVersionByVertex, executionSlotAssignments);

waitForAllSlotsAndDeploy(deploymentHandles);
}
Expand Down Expand Up @@ -132,22 +128,25 @@ private List<ExecutionSlotAssignment> allocateSlotsFor(
}

private List<ExecutionDeploymentHandle> createDeploymentHandles(
final List<Execution> executionsToDeploy,
final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex,
final List<ExecutionSlotAssignment> executionSlotAssignments) {

return executionSlotAssignments.stream()
.map(
executionSlotAssignment -> {
final Execution execution =
getExecutionOrThrow(
executionSlotAssignment.getExecutionAttemptId());
final ExecutionVertexID executionVertexId =
execution.getVertex().getID();
return new ExecutionDeploymentHandle(
executionSlotAssignment,
requiredVersionByVertex.get(executionVertexId));
})
.collect(Collectors.toList());
final List<ExecutionDeploymentHandle> deploymentHandles =
new ArrayList<>(executionsToDeploy.size());
for (int i = 0; i < executionsToDeploy.size(); i++) {
final Execution execution = executionsToDeploy.get(i);
final ExecutionSlotAssignment assignment = executionSlotAssignments.get(i);
checkState(execution.getAttemptId().equals(assignment.getExecutionAttemptId()));

final ExecutionVertexID executionVertexId = execution.getVertex().getID();
final ExecutionDeploymentHandle deploymentHandle =
new ExecutionDeploymentHandle(
execution, assignment, requiredVersionByVertex.get(executionVertexId));
deploymentHandles.add(deploymentHandle);
}

return deploymentHandles;
}

private void waitForAllSlotsAndDeploy(final List<ExecutionDeploymentHandle> deploymentHandles) {
Expand All @@ -169,8 +168,7 @@ private CompletableFuture<Void> assignAllResourcesAndRegisterProducedPartitions(
(ignore, throwable) -> {
if (throwable != null) {
handleTaskDeploymentFailure(
deploymentHandle.getExecutionAttemptId(),
throwable);
deploymentHandle.getExecution(), throwable);
}
return null;
});
Expand Down Expand Up @@ -208,11 +206,9 @@ private BiFunction<LogicalSlot, Throwable, LogicalSlot> assignResource(
return (logicalSlot, throwable) -> {
final ExecutionVertexVersion requiredVertexVersion =
deploymentHandle.getRequiredVertexVersion();
final Optional<Execution> optionalExecution =
getExecution(deploymentHandle.getExecutionAttemptId());
final Execution execution = deploymentHandle.getExecution();

if (!optionalExecution.isPresent()
|| optionalExecution.get().getState() != ExecutionState.SCHEDULED
if (execution.getState() != ExecutionState.SCHEDULED
|| executionVertexVersioner.isModified(requiredVertexVersion)) {
if (throwable == null) {
log.debug(
Expand All @@ -231,7 +227,6 @@ private BiFunction<LogicalSlot, Throwable, LogicalSlot> assignResource(
throw new CompletionException(maybeWrapWithNoResourceAvailableException(throwable));
}

final Execution execution = optionalExecution.get();
if (!execution.tryAssignResource(logicalSlot)) {
throw new IllegalStateException(
"Could not assign resource "
Expand Down Expand Up @@ -277,8 +272,7 @@ private Function<LogicalSlot, CompletableFuture<Void>> registerProducedPartition
// a null logicalSlot means the slot assignment is skipped, in which case
// the produced partition registration process can be skipped as well
if (logicalSlot != null) {
final Execution execution =
getExecutionOrThrow(deploymentHandle.getExecutionAttemptId());
final Execution execution = deploymentHandle.getExecution();
final CompletableFuture<Void> partitionRegistrationFuture =
execution.registerProducedPartitions(logicalSlot.getTaskManagerLocation());

Expand All @@ -299,11 +293,9 @@ private BiFunction<Object, Throwable, Void> deployOrHandleError(
return (ignored, throwable) -> {
final ExecutionVertexVersion requiredVertexVersion =
deploymentHandle.getRequiredVertexVersion();
final Optional<Execution> optionalExecution =
getExecution(deploymentHandle.getExecutionAttemptId());
final Execution execution = deploymentHandle.getExecution();

if (!optionalExecution.isPresent()
|| optionalExecution.get().getState() != ExecutionState.SCHEDULED
if (execution.getState() != ExecutionState.SCHEDULED
|| executionVertexVersioner.isModified(requiredVertexVersion)) {
if (throwable == null) {
log.debug(
Expand All @@ -314,11 +306,10 @@ private BiFunction<Object, Throwable, Void> deployOrHandleError(
return null;
}

final Execution execution = optionalExecution.get();
if (throwable == null) {
deployTaskSafe(execution);
} else {
handleTaskDeploymentFailure(execution.getAttemptId(), throwable);
handleTaskDeploymentFailure(execution, throwable);
}
return null;
};
Expand All @@ -328,40 +319,37 @@ private void deployTaskSafe(final Execution execution) {
try {
executionOperations.deploy(execution);
} catch (Throwable e) {
handleTaskDeploymentFailure(execution.getAttemptId(), e);
handleTaskDeploymentFailure(execution, e);
}
}

private void handleTaskDeploymentFailure(
final ExecutionAttemptID executionAttemptId, final Throwable error) {

final Execution execution = getExecutionOrThrow(executionAttemptId);
private void handleTaskDeploymentFailure(final Execution execution, final Throwable error) {
executionOperations.markFailed(execution, error);
}

private Execution getExecutionOrThrow(ExecutionAttemptID executionAttemptId) {
return getExecution(executionAttemptId).get();
}

private Optional<Execution> getExecution(ExecutionAttemptID executionAttemptId) {
return Optional.ofNullable(executionRetriever.apply(executionAttemptId));
}

private static class ExecutionDeploymentHandle {

private final Execution execution;

private final ExecutionSlotAssignment executionSlotAssignment;

private final ExecutionVertexVersion requiredVertexVersion;

ExecutionDeploymentHandle(
ExecutionSlotAssignment executionSlotAssignment,
final Execution execution,
final ExecutionSlotAssignment executionSlotAssignment,
final ExecutionVertexVersion requiredVertexVersion) {
this.execution = checkNotNull(execution);
this.executionSlotAssignment = checkNotNull(executionSlotAssignment);
this.requiredVertexVersion = checkNotNull(requiredVertexVersion);
}

Execution getExecution() {
return execution;
}

ExecutionAttemptID getExecutionAttemptId() {
return executionSlotAssignment.getExecutionAttemptId();
return execution.getAttemptId();
}

CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
Expand All @@ -383,7 +371,6 @@ public DefaultExecutionDeployer createInstance(
ExecutionOperations executionOperations,
ExecutionVertexVersioner executionVertexVersioner,
Time partitionRegistrationTimeout,
Function<ExecutionAttemptID, Execution> executionRetriever,
BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc,
ComponentMainThreadExecutor mainThreadExecutor) {
return new DefaultExecutionDeployer(
Expand All @@ -392,7 +379,6 @@ public DefaultExecutionDeployer createInstance(
executionOperations,
executionVertexVersioner,
partitionRegistrationTimeout,
executionRetriever,
allocationReservationFunc,
mainThreadExecutor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ protected DefaultScheduler(
executionOperations,
executionVertexVersioner,
rpcTimeout,
id -> getExecutionGraph().getRegisteredExecutions().get(id),
this::startReserveAllocation,
mainThreadExecutor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

import org.slf4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** This deployer is responsible for deploying executions. */
interface ExecutionDeployer {
Expand Down Expand Up @@ -63,7 +61,6 @@ interface Factory {
* @param executionVertexVersioner the versioner which records the versions of execution
* vertices.
* @param partitionRegistrationTimeout timeout of partition registration
* @param executionRetriever retriever to get executions
* @param allocationReservationFunc function to reserve allocations for local recovery
* @param mainThreadExecutor the main thread executor
* @return an instantiated {@link ExecutionDeployer}
Expand All @@ -74,7 +71,6 @@ ExecutionDeployer createInstance(
final ExecutionOperations executionOperations,
final ExecutionVertexVersioner executionVertexVersioner,
final Time partitionRegistrationTimeout,
final Function<ExecutionAttemptID, Execution> executionRetriever,
final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc,
final ComponentMainThreadExecutor mainThreadExecutor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void tearDown() {
void testDeployTasks() throws Exception {
final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer();

deployTasks(executionDeployer, executionGraph);

Expand All @@ -120,7 +120,7 @@ void testDeployTasks() throws Exception {
void testDeployTasksOnlyIfAllSlotRequestsAreFulfilled() throws Exception {
final JobGraph jobGraph = singleJobVertexJobGraph(4);
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer();

testExecutionSlotAllocator.disableAutoCompletePendingRequests();

Expand All @@ -143,7 +143,7 @@ void testDeploymentFailures() throws Exception {
testExecutionOperations.enableFailDeploy();

final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer();
deployTasks(executionDeployer, executionGraph);

assertThat(testExecutionOperations.getFailedExecutions())
Expand All @@ -157,7 +157,7 @@ void testSlotAllocationTimeout() throws Exception {
testExecutionSlotAllocator.disableAutoCompletePendingRequests();

final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer();
deployTasks(executionDeployer, executionGraph);

assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(2);
Expand All @@ -175,7 +175,7 @@ void testSkipDeploymentIfVertexVersionOutdated() throws Exception {
testExecutionSlotAllocator.disableAutoCompletePendingRequests();

final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer();
deployTasks(executionDeployer, executionGraph);

final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
Expand All @@ -193,7 +193,7 @@ void testReleaseSlotIfVertexVersionOutdated() throws Exception {
testExecutionSlotAllocator.disableAutoCompletePendingRequests();

final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer();
deployTasks(executionDeployer, executionGraph);

final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
Expand All @@ -208,7 +208,7 @@ void testReleaseSlotIfVertexVersionOutdated() throws Exception {
void testDeployOnlyIfVertexIsCreated() throws Exception {
final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer();

// deploy once to transition the tasks out from CREATED state
deployTasks(executionDeployer, executionGraph);
Expand All @@ -233,7 +233,7 @@ void testDeploymentWaitForProducedPartitionRegistration() throws Exception {

final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer();

deployTasks(executionDeployer, executionGraph);

Expand All @@ -251,7 +251,7 @@ void testFailedProducedPartitionRegistration() throws Exception {

final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer();

deployTasks(executionDeployer, executionGraph);

Expand All @@ -267,7 +267,7 @@ void testDirectExceptionOnProducedPartitionRegistration() throws Exception {

final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer();

deployTasks(executionDeployer, executionGraph);

Expand All @@ -289,7 +289,7 @@ void testProducedPartitionRegistrationTimeout() throws Exception {

final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
final ExecutionDeployer executionDeployer = createExecutionDeployer();

deployTasks(executionDeployer, executionGraph);

Expand Down Expand Up @@ -339,15 +339,14 @@ private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exce
return executionGraph;
}

private ExecutionDeployer createExecutionDeployer(ExecutionGraph executionGraph) {
private ExecutionDeployer createExecutionDeployer() {
return new DefaultExecutionDeployer.Factory()
.createInstance(
LoggerFactory.getLogger(DefaultExecutionDeployer.class),
testExecutionSlotAllocator,
testExecutionOperations,
executionVertexVersioner,
partitionRegistrationTimeout,
id -> executionGraph.getRegisteredExecutions().get(id),
(ignored1, ignored2) -> {},
mainThreadExecutor);
}
Expand Down

0 comments on commit e5c4e3f

Please sign in to comment.