Skip to content

Commit

Permalink
Inline TaskScheduler in ScaledWriterScheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
wenleix committed Sep 17, 2018
1 parent 15acda5 commit 1321d68
Showing 1 changed file with 3 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@
public class ScaledWriterScheduler
implements StageScheduler
{
private interface TaskScheduler
{
RemoteTask scheduleTask(Node node, int partition, OptionalInt totalPartitions);
}

private final TaskScheduler taskScheduler;
private final SqlStageExecution stage;
private final Supplier<Collection<TaskStatus>> sourceTasksProvider;
private final Supplier<Collection<TaskStatus>> writerTasksProvider;
private final NodeSelector nodeSelector;
Expand All @@ -62,8 +57,7 @@ public ScaledWriterScheduler(
ScheduledExecutorService executor,
DataSize writerMinSize)
{
requireNonNull(stage, "stage is null");
this.taskScheduler = stage::scheduleTask;
this.stage = requireNonNull(stage, "stage is null");
this.sourceTasksProvider = requireNonNull(sourceTasksProvider, "sourceTasksProvider is null");
this.writerTasksProvider = requireNonNull(writerTasksProvider, "writerTasksProvider is null");
this.nodeSelector = requireNonNull(nodeSelector, "nodeSelector is null");
Expand Down Expand Up @@ -125,7 +119,7 @@ private List<RemoteTask> scheduleTasks(int count)

ImmutableList.Builder<RemoteTask> tasks = ImmutableList.builder();
for (Node node : nodes) {
tasks.add(taskScheduler.scheduleTask(node, scheduledNodes.size(), OptionalInt.empty()));
tasks.add(stage.scheduleTask(node, scheduledNodes.size(), OptionalInt.empty()));
scheduledNodes.add(node);
}

Expand Down

0 comments on commit 1321d68

Please sign in to comment.