Skip to content

Commit

Permalink
[FLINK-17016][runtime] Enable to use pipelined region scheduling stra…
Browse files Browse the repository at this point in the history
…tegy

It can be enabled via config option "jobmanager.scheduler.scheduling-strategy=region"
  • Loading branch information
zhuzhurk committed Sep 11, 2020
1 parent d918cc3 commit 4df2295
Show file tree
Hide file tree
Showing 8 changed files with 561 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ public class JobManagerOptions {
// default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
.defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
.withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");

/**
* Config parameter determining the scheduler implementation.
*/
Expand All @@ -339,6 +340,23 @@ public class JobManagerOptions {
.list(
text("'ng': new generation scheduler"))
.build());

/**
 * Config parameter determining the scheduling strategy.
 *
 * <p>The option is keyed by {@code jobmanager.scheduler.scheduling-strategy} and defaults to
 * {@code "legacy"}. As listed in its description, the accepted values are:
 * <ul>
 *   <li>{@code "region"}: pipelined region scheduling</li>
 *   <li>{@code "legacy"}: eager scheduling for streaming jobs and lazy from sources
 *       scheduling for batch jobs</li>
 * </ul>
 *
 * <p>The value is read by {@code DefaultSchedulerComponents#createSchedulerComponents}, which
 * throws an {@link IllegalStateException} for any other value.
 *
 * <p>The option is annotated to be excluded from the generated documentation because users are
 * normally not expected to change it.
 */
@Documentation.ExcludeFromDocumentation("User normally should not be expected to change this config.")
public static final ConfigOption<String> SCHEDULING_STRATEGY =
key("jobmanager.scheduler.scheduling-strategy")
.stringType()
.defaultValue("legacy")
.withDescription(Description.builder()
.text("Determines which scheduling strategy is used to schedule tasks. Accepted values are:")
.list(
text("'region': pipelined region scheduling"),
text("'legacy': legacy scheduling strategy, which is eager scheduling for streaming jobs " +
"and lazy from sources scheduling for batch jobs"))
.build());

/**
* Config parameter controlling whether partitions should already be released during the job execution.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
Expand All @@ -38,7 +40,9 @@
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.ThrowingSlotProvider;
Expand All @@ -48,8 +52,10 @@
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -161,14 +167,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
restartBackoffTimeStrategy);
this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());

final ExecutionSlotAllocationContext slotAllocationContext = new ExecutionSlotAllocationContext(
getPreferredLocationsRetriever(),
executionVertexID -> getExecutionVertex(executionVertexID).getResourceProfile(),
executionVertexID -> getExecutionVertex(executionVertexID).getLatestPriorAllocation(),
getSchedulingTopology(),
() -> getJobGraph().getSlotSharingGroups(),
() -> getJobGraph().getCoLocationGroupDescriptors());
this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(slotAllocationContext);
this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory)
.createInstance(new DefaultExecutionSlotAllocationContext());

this.verticesWaitingForRestart = new HashSet<>();
this.startUpAction = startUpAction;
Expand Down Expand Up @@ -513,4 +513,39 @@ private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) {
coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null);
}
}

/**
 * {@link ExecutionSlotAllocationContext} view of the enclosing {@link DefaultScheduler}.
 *
 * <p>The class keeps no state of its own; every query is forwarded to the scheduler instance
 * that created it.
 */
private class DefaultExecutionSlotAllocationContext implements ExecutionSlotAllocationContext {

    /**
     * Forwards to the preferred locations retriever of the enclosing scheduler.
     */
    @Override
    public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations(
            final ExecutionVertexID executionVertexId,
            final Set<ExecutionVertexID> producersToIgnore) {
        return DefaultScheduler.this
            .getPreferredLocationsRetriever()
            .getPreferredLocations(executionVertexId, producersToIgnore);
    }

    /**
     * Looks up the execution vertex in the enclosing scheduler and returns its resource profile.
     */
    @Override
    public ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) {
        final ExecutionVertex vertex = DefaultScheduler.this.getExecutionVertex(executionVertexId);
        return vertex.getResourceProfile();
    }

    /**
     * Looks up the execution vertex in the enclosing scheduler and returns its latest prior
     * allocation.
     */
    @Override
    public AllocationID getPriorAllocationId(final ExecutionVertexID executionVertexId) {
        final ExecutionVertex vertex = DefaultScheduler.this.getExecutionVertex(executionVertexId);
        return vertex.getLatestPriorAllocation();
    }

    /**
     * Returns the scheduling topology of the enclosing scheduler.
     */
    @Override
    public SchedulingTopology getSchedulingTopology() {
        // Must be qualified: an unqualified call would resolve to this very method.
        return DefaultScheduler.this.getSchedulingTopology();
    }

    /**
     * Returns the slot sharing groups of the enclosing scheduler's job graph.
     */
    @Override
    public Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
        final JobGraph jobGraph = DefaultScheduler.this.getJobGraph();
        return jobGraph.getSlotSharingGroups();
    }

    /**
     * Returns the co-location group descriptors of the enclosing scheduler's job graph.
     */
    @Override
    public Set<CoLocationGroupDesc> getCoLocationGroups() {
        final JobGraph jobGraph = DefaultScheduler.this.getJobGraph();
        return jobGraph.getCoLocationGroupDescriptors();
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.runtime.scheduler;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.util.clock.SystemClock;

import java.util.function.Consumer;

/**
 * Bundle of the pieces needed to build a {@link DefaultScheduler}: the scheduling strategy
 * factory, the start-up action and the execution slot allocator factory.
 *
 * <p>Which concrete pieces are assembled depends on the configured
 * {@link JobManagerOptions#SCHEDULING_STRATEGY}.
 */
public class DefaultSchedulerComponents {

    /** Config value selecting pipelined region scheduling. */
    private static final String PIPELINED_REGION_SCHEDULING = "region";

    /** Config value selecting the legacy scheduling strategies. */
    private static final String LEGACY_SCHEDULING = "legacy";

    private final SchedulingStrategyFactory schedulingStrategyFactory;
    private final Consumer<ComponentMainThreadExecutor> startUpAction;
    private final ExecutionSlotAllocatorFactory allocatorFactory;

    /**
     * Instances are only created through {@link #createSchedulerComponents}.
     */
    private DefaultSchedulerComponents(
            final SchedulingStrategyFactory schedulingStrategyFactory,
            final Consumer<ComponentMainThreadExecutor> startUpAction,
            final ExecutionSlotAllocatorFactory allocatorFactory) {

        this.schedulingStrategyFactory = schedulingStrategyFactory;
        this.startUpAction = startUpAction;
        this.allocatorFactory = allocatorFactory;
    }

    /** Returns the factory of the selected scheduling strategy. */
    SchedulingStrategyFactory getSchedulingStrategyFactory() {
        return schedulingStrategyFactory;
    }

    /** Returns the action that accepts the main thread executor at start-up. */
    Consumer<ComponentMainThreadExecutor> getStartUpAction() {
        return startUpAction;
    }

    /** Returns the factory of the execution slot allocator. */
    ExecutionSlotAllocatorFactory getAllocatorFactory() {
        return allocatorFactory;
    }

    /**
     * Assembles the scheduler components for the strategy configured under
     * {@link JobManagerOptions#SCHEDULING_STRATEGY}.
     *
     * @param scheduleMode schedule mode of the job
     * @param jobMasterConfiguration configuration the strategy and slot selection settings are read from
     * @param slotPool slot pool the created components operate on
     * @param slotRequestTimeout timeout handed to the slot provider / slot allocator
     * @return the components matching the configured strategy
     * @throws IllegalStateException if the configured strategy is neither "region" nor "legacy"
     */
    static DefaultSchedulerComponents createSchedulerComponents(
            final ScheduleMode scheduleMode,
            final Configuration jobMasterConfiguration,
            final SlotPool slotPool,
            final Time slotRequestTimeout) {

        final String configuredStrategy =
            jobMasterConfiguration.getString(JobManagerOptions.SCHEDULING_STRATEGY);

        if (configuredStrategy.equals(PIPELINED_REGION_SCHEDULING)) {
            return createPipelinedRegionSchedulerComponents(
                scheduleMode,
                jobMasterConfiguration,
                slotPool,
                slotRequestTimeout);
        }

        if (configuredStrategy.equals(LEGACY_SCHEDULING)) {
            return createLegacySchedulerComponents(
                scheduleMode,
                jobMasterConfiguration,
                slotPool,
                slotRequestTimeout);
        }

        throw new IllegalStateException("Unsupported scheduling strategy " + configuredStrategy);
    }

    /**
     * Assembles the components of the legacy strategy: a {@link SchedulerImpl} on top of the slot
     * pool, whose {@code start} method serves as the start-up action.
     */
    private static DefaultSchedulerComponents createLegacySchedulerComponents(
            final ScheduleMode scheduleMode,
            final Configuration jobMasterConfiguration,
            final SlotPool slotPool,
            final Time slotRequestTimeout) {

        final Scheduler scheduler = new SchedulerImpl(
            selectSlotSelectionStrategy(jobMasterConfiguration),
            slotPool);
        final SlotProviderStrategy slotProviderStrategy =
            SlotProviderStrategy.from(scheduleMode, scheduler, slotRequestTimeout);

        final SchedulingStrategyFactory strategyFactory = createLegacySchedulingStrategyFactory(scheduleMode);
        final Consumer<ComponentMainThreadExecutor> startUp = scheduler::start;
        final ExecutionSlotAllocatorFactory slotAllocatorFactory =
            new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy);

        return new DefaultSchedulerComponents(strategyFactory, startUp, slotAllocatorFactory);
    }

    /**
     * Maps the schedule mode to a legacy strategy factory: eager for {@code EAGER}, lazy from
     * sources for both {@code LAZY_FROM_SOURCES} variants.
     *
     * @throws IllegalStateException for any other schedule mode
     */
    private static SchedulingStrategyFactory createLegacySchedulingStrategyFactory(final ScheduleMode scheduleMode) {
        switch (scheduleMode) {
            case LAZY_FROM_SOURCES:
            case LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST:
                return new LazyFromSourcesSchedulingStrategy.Factory();
            case EAGER:
                return new EagerSchedulingStrategy.Factory();
            default:
                throw new IllegalStateException("Unsupported schedule mode " + scheduleMode);
        }
    }

    /**
     * Assembles the components of pipelined region scheduling: a physical slot provider and a
     * bulk checker on top of the slot pool, whose {@code start} method serves as the start-up
     * action.
     */
    private static DefaultSchedulerComponents createPipelinedRegionSchedulerComponents(
            final ScheduleMode scheduleMode,
            final Configuration jobMasterConfiguration,
            final SlotPool slotPool,
            final Time slotRequestTimeout) {

        final SlotSelectionStrategy selectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
        final PhysicalSlotRequestBulkChecker bulkChecker =
            PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool(slotPool, SystemClock.getInstance());
        final PhysicalSlotProvider slotProvider = new PhysicalSlotProviderImpl(selectionStrategy, slotPool);

        // The flag is true for every schedule mode except LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST.
        final boolean notBatchSlotRequest =
            scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST;
        final ExecutionSlotAllocatorFactory slotAllocatorFactory = new SlotSharingExecutionSlotAllocatorFactory(
            slotProvider,
            notBatchSlotRequest,
            bulkChecker,
            slotRequestTimeout);

        return new DefaultSchedulerComponents(
            new PipelinedRegionSchedulingStrategy.Factory(),
            bulkChecker::start,
            slotAllocatorFactory);
    }

    /**
     * Chooses the slot selection strategy from the configuration. The location preference
     * strategy is evenly spread out or default, depending on
     * {@link ClusterOptions#EVENLY_SPREAD_OUT_SLOTS_STRATEGY}; if
     * {@link CheckpointingOptions#LOCAL_RECOVERY} is enabled it is wrapped in a
     * {@link PreviousAllocationSlotSelectionStrategy}.
     */
    private static SlotSelectionStrategy selectSlotSelectionStrategy(final Configuration configuration) {
        final SlotSelectionStrategy locationBased;
        if (configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY)) {
            locationBased = LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut();
        } else {
            locationBased = LocationPreferenceSlotSelectionStrategy.createDefault();
        }

        if (configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY)) {
            return PreviousAllocationSlotSelectionStrategy.create(locationBased);
        }
        return locationBased;
    }
}
Loading

0 comments on commit 4df2295

Please sign in to comment.