Skip to content

Commit

Permalink
[FLINK-19286][runtime] Improve region vertex sorting performance
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuzhurk committed Sep 23, 2020
1 parent 6045da0 commit f89c137
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.util.IterableUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -53,6 +55,8 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {

private final Map<IntermediateResultPartitionID, Set<SchedulingPipelinedRegion>> partitionConsumerRegions = new HashMap<>();

private final Map<SchedulingPipelinedRegion, List<ExecutionVertexID>> regionVerticesSorted = new IdentityHashMap<>();

public PipelinedRegionSchedulingStrategy(
final SchedulerOperations schedulerOperations,
final SchedulingTopology schedulingTopology) {
Expand All @@ -72,6 +76,11 @@ private void init() {
correlatedResultPartitions.computeIfAbsent(partition.getResultId(), rid -> new HashSet<>()).add(partition);
}
}

for (SchedulingExecutionVertex vertex : schedulingTopology.getVertices()) {
final SchedulingPipelinedRegion region = schedulingTopology.getPipelinedRegionOfVertex(vertex.getId());
regionVerticesSorted.computeIfAbsent(region, r -> new ArrayList<>()).add(vertex.getId());
}
}

@Override
Expand Down Expand Up @@ -127,13 +136,9 @@ private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {

checkState(areRegionVerticesAllInCreatedState(region), "BUG: trying to schedule a region which is not in CREATED state");

final Set<ExecutionVertexID> verticesToSchedule = IterableUtils.toStream(region.getVertices())
.map(SchedulingExecutionVertex::getId)
.collect(Collectors.toSet());
final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
SchedulingStrategyUtils.createExecutionVertexDeploymentOptionsInTopologicalOrder(
schedulingTopology,
verticesToSchedule,
SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
regionVerticesSorted.get(region),
id -> deploymentOption);
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.util.IterableUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -61,6 +63,20 @@ static List<ExecutionVertexDeploymentOption> createExecutionVertexDeploymentOpti
.collect(Collectors.toList());
}

static List<ExecutionVertexDeploymentOption> createExecutionVertexDeploymentOptions(
final Collection<ExecutionVertexID> verticesToDeploy,
final Function<ExecutionVertexID, DeploymentOption> deploymentOptionRetriever) {

final List<ExecutionVertexDeploymentOption> deploymentOptions = new ArrayList<>(verticesToDeploy.size());
for (ExecutionVertexID executionVertexId : verticesToDeploy) {
final ExecutionVertexDeploymentOption deploymentOption = new ExecutionVertexDeploymentOption(
executionVertexId,
deploymentOptionRetriever.apply(executionVertexId));
deploymentOptions.add(deploymentOption);
}
return deploymentOptions;
}

static List<SchedulingPipelinedRegion> sortPipelinedRegionsInTopologicalOrder(
final SchedulingTopology topology,
final Set<SchedulingPipelinedRegion> regions) {
Expand Down

0 comments on commit f89c137

Please sign in to comment.