Skip to content

Commit

Permalink
[FLINK-28134][runtime] Rework TaskDeploymentDescriptorFactory to acce…
Browse files Browse the repository at this point in the history
…pt an execution to deploy

This helps to decouple the task deployment from ExecutionVertex#getCurrentExecutionAttempt().
  • Loading branch information
zhuzhurk committed Jul 1, 2022
1 parent b96d476 commit e73225e
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,10 @@ private MaybeOffloaded<ShuffleDescriptor[]> serializeAndTryOffloadShuffleDescrip
}
}

public static TaskDeploymentDescriptorFactory fromExecutionVertex(
ExecutionVertex executionVertex)
public static TaskDeploymentDescriptorFactory fromExecution(Execution execution)
throws IOException, CachedIntermediateDataSetCorruptedException {
InternalExecutionGraphAccessor internalExecutionGraphAccessor =
final ExecutionVertex executionVertex = execution.getVertex();
final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
executionVertex.getExecutionGraphAccessor();
Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors;
try {
Expand All @@ -272,7 +272,7 @@ public static TaskDeploymentDescriptorFactory fromExecutionVertex(
}

return new TaskDeploymentDescriptorFactory(
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
execution.getAttemptId(),
getSerializedJobInformation(internalExecutionGraphAccessor),
getSerializedTaskInformation(
executionVertex.getJobVertex().getTaskInformationOrBlobKey()),
Expand Down Expand Up @@ -338,7 +338,7 @@ private static MaybeOffloaded<TaskInformation> getSerializedTaskInformation(
public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(
IntermediateResultPartition consumedPartition,
PartitionLocationConstraint partitionDeploymentConstraint) {
Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt();
Execution producer = consumedPartition.getProducer().getPartitionProducer();

ExecutionState producerState = producer.getState();
Optional<ResultPartitionDeploymentDescriptor> consumedPartitionDescriptor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,13 +564,13 @@ public void deploy() throws JobException {
"Deploying {} (attempt #{}) with attempt id {} and vertex id {} to {} with allocation id {}",
vertex.getTaskNameWithSubtaskIndex(),
getAttemptNumber(),
vertex.getCurrentExecutionAttempt().getAttemptId(),
attemptId,
vertex.getID(),
getAssignedResourceLocation(),
slot.getAllocationId());

final TaskDeploymentDescriptor deployment =
TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex)
TaskDeploymentDescriptorFactory.fromExecution(this)
.createDeploymentDescriptor(
slot.getAllocationId(),
taskRestore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ public static BlockingResultInfo createFromIntermediateResult(
for (IntermediateResultPartition partition : intermediateResult.getPartitions()) {
checkState(partition.isConsumable());

IOMetrics ioMetrics =
partition.getProducer().getCurrentExecutionAttempt().getIOMetrics();
IOMetrics ioMetrics = partition.getProducer().getPartitionProducer().getIOMetrics();
checkNotNull(ioMetrics, "IOMetrics should not be null.");

blockingPartitionSizes.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private static ExecutionGraph createExecutionGraph(
private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(ExecutionVertex ev)
throws IOException, CachedIntermediateDataSetCorruptedException {

return TaskDeploymentDescriptorFactory.fromExecutionVertex(ev)
return TaskDeploymentDescriptorFactory.fromExecution(ev.getCurrentExecutionAttempt())
.createDeploymentDescriptor(new AllocationID(), null, Collections.emptyList());
}

Expand Down

0 comments on commit e73225e

Please sign in to comment.