Skip to content

Commit

Permalink
[FLINK-20593] Remove EagerSchedulingStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
rmetzger committed Dec 15, 2020
1 parent 9ba5ab4 commit 774e6a3
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 246 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,15 @@
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.util.clock.SystemClock;
Expand Down Expand Up @@ -93,33 +89,6 @@ static DefaultSchedulerComponents createSchedulerComponents(
slotRequestTimeout);
}

private static DefaultSchedulerComponents createLegacySchedulerComponents(
final ScheduleMode scheduleMode,
final Configuration jobMasterConfiguration,
final SlotPool slotPool,
final Time slotRequestTimeout) {

final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
final Scheduler scheduler = new SchedulerImpl(slotSelectionStrategy, slotPool);
final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
scheduleMode,
scheduler,
slotRequestTimeout);
return new DefaultSchedulerComponents(
createLegacySchedulingStrategyFactory(scheduleMode),
scheduler::start,
new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
}

private static SchedulingStrategyFactory createLegacySchedulingStrategyFactory(final ScheduleMode scheduleMode) {
switch (scheduleMode) {
case EAGER:
return new EagerSchedulingStrategy.Factory();
default:
throw new IllegalStateException("Unsupported schedule mode " + scheduleMode);
}
}

private static DefaultSchedulerComponents createPipelinedRegionSchedulerComponents(
final ScheduleMode scheduleMode,
final Configuration jobMasterConfiguration,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
Expand Down Expand Up @@ -754,7 +754,7 @@ private static JobVertex getOnlyJobVertex(final JobGraph jobGraph) {

private DefaultScheduler createSchedulerAndStartScheduling(final JobGraph jobGraph) {
final SchedulingStrategyFactory schedulingStrategyFactory =
new EagerSchedulingStrategy.Factory();
new PipelinedRegionSchedulingStrategy.Factory();

try {
final DefaultScheduler scheduler = createScheduler(jobGraph, schedulingStrategyFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
Expand Down Expand Up @@ -172,7 +171,7 @@ public static DefaultSchedulerBuilder createScheduler(
return newSchedulerBuilder(jobGraph)
.setFutureExecutor(asyncExecutor)
.setDelayExecutor(asyncExecutor)
.setSchedulingStrategyFactory(new EagerSchedulingStrategy.Factory())
.setSchedulingStrategyFactory(new PipelinedRegionSchedulingStrategy.Factory())
.setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
.setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway));
}
Expand Down

This file was deleted.

0 comments on commit 774e6a3

Please sign in to comment.