diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java index 6da8f4fabca7e..21f5ef6f0696d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java @@ -253,10 +253,10 @@ private MaybeOffloaded serializeAndTryOffloadShuffleDescrip } } - public static TaskDeploymentDescriptorFactory fromExecutionVertex( - ExecutionVertex executionVertex) + public static TaskDeploymentDescriptorFactory fromExecution(Execution execution) throws IOException, CachedIntermediateDataSetCorruptedException { - InternalExecutionGraphAccessor internalExecutionGraphAccessor = + final ExecutionVertex executionVertex = execution.getVertex(); + final InternalExecutionGraphAccessor internalExecutionGraphAccessor = executionVertex.getExecutionGraphAccessor(); Map clusterPartitionShuffleDescriptors; try { @@ -272,7 +272,7 @@ public static TaskDeploymentDescriptorFactory fromExecutionVertex( } return new TaskDeploymentDescriptorFactory( - executionVertex.getCurrentExecutionAttempt().getAttemptId(), + execution.getAttemptId(), getSerializedJobInformation(internalExecutionGraphAccessor), getSerializedTaskInformation( executionVertex.getJobVertex().getTaskInformationOrBlobKey()), @@ -338,7 +338,7 @@ private static MaybeOffloaded getSerializedTaskInformation( public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor( IntermediateResultPartition consumedPartition, PartitionLocationConstraint partitionDeploymentConstraint) { - Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt(); + Execution producer = consumedPartition.getProducer().getPartitionProducer(); ExecutionState producerState = producer.getState(); Optional consumedPartitionDescriptor = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 459d9861980a1..96c0d3c119ae7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -564,13 +564,13 @@ public void deploy() throws JobException { "Deploying {} (attempt #{}) with attempt id {} and vertex id {} to {} with allocation id {}", vertex.getTaskNameWithSubtaskIndex(), getAttemptNumber(), - vertex.getCurrentExecutionAttempt().getAttemptId(), + attemptId, vertex.getID(), getAssignedResourceLocation(), slot.getAllocationId()); final TaskDeploymentDescriptor deployment = - TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex) + TaskDeploymentDescriptorFactory.fromExecution(this) .createDeploymentDescriptor( slot.getAllocationId(), taskRestore, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java index 11a45a6d7a81a..302980ab21604 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java @@ -68,8 +68,7 @@ public static BlockingResultInfo createFromIntermediateResult( for (IntermediateResultPartition partition : intermediateResult.getPartitions()) { checkState(partition.isConsumable()); - IOMetrics ioMetrics = - partition.getProducer().getCurrentExecutionAttempt().getIOMetrics(); + IOMetrics ioMetrics = partition.getProducer().getPartitionProducer().getIOMetrics(); checkNotNull(ioMetrics, "IOMetrics should not be null."); blockingPartitionSizes.add( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java index 7dba9971c567f..1074da2d340b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java @@ -164,7 +164,7 @@ private static ExecutionGraph createExecutionGraph( private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(ExecutionVertex ev) throws IOException, CachedIntermediateDataSetCorruptedException { - return TaskDeploymentDescriptorFactory.fromExecutionVertex(ev) + return TaskDeploymentDescriptorFactory.fromExecution(ev.getCurrentExecutionAttempt()) .createDeploymentDescriptor(new AllocationID(), null, Collections.emptyList()); }