Skip to content

Commit

Permalink
Eager speculative execution for final LIMIT stages
Browse files Browse the repository at this point in the history
It is quite common to use
  SELECT .... LIMIT N
queries for data exploration.
Currently, such queries, when run in FTE mode, require completion of all (or almost all)
tasks which read source data, even though most of the time final answer
could be obtained much sooner.

This commit enables the EAGER_SPECULATIVE task execution mode for stages
which have a FINAL LIMIT operator. This will allow for returning final
results to the user much faster (assuming the exchange plugin in use supports
concurrent read and write).
  • Loading branch information
losipiuk committed Sep 20, 2023
1 parent 4ff748d commit 67643a8
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public final class SystemSessionProperties
private static final String FAULT_TOLERANT_EXECUTION_SMALL_STAGE_ESTIMATION_THRESHOLD = "fault_tolerant_execution_small_stage_estimation_threshold";
private static final String FAULT_TOLERANT_EXECUTION_SMALL_STAGE_SOURCE_SIZE_MULTIPLIER = "fault_tolerant_execution_small_stage_source_size_multiplier";
private static final String FAULT_TOLERANT_EXECUTION_SMALL_STAGE_REQUIRE_NO_MORE_PARTITIONS = "fault_tolerant_execution_small_stage_require_no_more_partitions";
private static final String FAULT_TOLERANT_EXECUTION_STAGE_ESTIMATION_FOR_EAGER_PARENT_ENABLED = "fault_tolerant_execution_stage_estimation_for_eager_parent_enabled";
public static final String ADAPTIVE_PARTIAL_AGGREGATION_ENABLED = "adaptive_partial_aggregation_enabled";
public static final String ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD = "adaptive_partial_aggregation_unique_rows_ratio_threshold";
public static final String REMOTE_TASK_ADAPTIVE_UPDATE_REQUEST_SIZE_ENABLED = "remote_task_adaptive_update_request_size_enabled";
Expand Down Expand Up @@ -999,6 +1000,11 @@ public SystemSessionProperties(
"Is it required for all stage partitions (tasks) to be enumerated for stage to be used in heuristic to determine if parent stage is small",
queryManagerConfig.isFaultTolerantExecutionSmallStageRequireNoMorePartitions(),
true),
booleanProperty(
FAULT_TOLERANT_EXECUTION_STAGE_ESTIMATION_FOR_EAGER_PARENT_ENABLED,
"Enable aggressive stage output size estimation heuristic for children of stages to be executed eagerly",
queryManagerConfig.isFaultTolerantExecutionStageEstimationForEagerParentEnabled(),
true),
booleanProperty(
ADAPTIVE_PARTIAL_AGGREGATION_ENABLED,
"When enabled, partial aggregation might be adaptively turned off when it does not provide any performance gain",
Expand Down Expand Up @@ -1843,6 +1849,11 @@ public static boolean isFaultTolerantExecutionSmallStageRequireNoMorePartitions(
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_SMALL_STAGE_REQUIRE_NO_MORE_PARTITIONS, Boolean.class);
}

/**
 * Reads the {@code fault_tolerant_execution_stage_estimation_for_eager_parent_enabled}
 * system property from the given session.
 *
 * @param session session whose system properties are consulted
 * @return the boolean value of the property for this session
 */
public static boolean isFaultTolerantExecutionStageEstimationForEagerParentEnabled(Session session)
{
    Boolean estimationForEagerParentEnabled = session.getSystemProperty(
            FAULT_TOLERANT_EXECUTION_STAGE_ESTIMATION_FOR_EAGER_PARENT_ENABLED,
            Boolean.class);
    return estimationForEagerParentEnabled;
}

public static boolean isAdaptivePartialAggregationEnabled(Session session)
{
return session.getSystemProperty(ADAPTIVE_PARTIAL_AGGREGATION_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public class QueryManagerConfig
private DataSize faultTolerantExecutionSmallStageEstimationThreshold = DataSize.of(20, GIGABYTE);
private double faultTolerantExecutionSmallStageSourceSizeMultiplier = 1.2;
private boolean faultTolerantExecutionSmallStageRequireNoMorePartitions;
private boolean faultTolerantExecutionStageEstimationForEagerParentEnabled = true;

@Min(1)
public int getScheduleSplitBatchSize()
Expand Down Expand Up @@ -1077,6 +1078,19 @@ public QueryManagerConfig setFaultTolerantExecutionSmallStageRequireNoMorePartit
return this;
}

/**
 * Tells whether the aggressive stage output size estimation heuristic for children
 * of eagerly executed stages is enabled in this configuration.
 *
 * @return the currently configured value of the flag
 */
public boolean isFaultTolerantExecutionStageEstimationForEagerParentEnabled()
{
    return this.faultTolerantExecutionStageEstimationForEagerParentEnabled;
}

/**
 * Sets whether the aggressive stage output size estimation heuristic for children
 * of eagerly executed stages is enabled. Bound to the
 * {@code fault-tolerant-execution-stage-estimation-for-eager-parent-enabled} config property.
 *
 * @param faultTolerantExecutionStageEstimationForEagerParentEnabled new value of the flag
 * @return this config instance, so that setter calls can be chained
 */
@Config("fault-tolerant-execution-stage-estimation-for-eager-parent-enabled")
@ConfigDescription("Enable aggressive stage output size estimation heuristic for children of stages to be executed eagerly")
public QueryManagerConfig setFaultTolerantExecutionStageEstimationForEagerParentEnabled(boolean faultTolerantExecutionStageEstimationForEagerParentEnabled)
{
this.faultTolerantExecutionStageEstimationForEagerParentEnabled = faultTolerantExecutionStageEstimationForEagerParentEnabled;
return this;
}

public void applyFaultTolerantExecutionDefaults()
{
remoteTaskMaxErrorDuration = new Duration(1, MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import io.trino.sql.planner.SubPlan;
import io.trino.sql.planner.optimizations.PlanNodeSearcher;
import io.trino.sql.planner.plan.AggregationNode;
import io.trino.sql.planner.plan.LimitNode;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
Expand Down Expand Up @@ -152,6 +153,7 @@
import static io.trino.SystemSessionProperties.isFaultTolerantExecutionRuntimeAdaptivePartitioningEnabled;
import static io.trino.SystemSessionProperties.isFaultTolerantExecutionSmallStageEstimationEnabled;
import static io.trino.SystemSessionProperties.isFaultTolerantExecutionSmallStageRequireNoMorePartitions;
import static io.trino.SystemSessionProperties.isFaultTolerantExecutionStageEstimationForEagerParentEnabled;
import static io.trino.execution.BasicStageStats.aggregateBasicStageStats;
import static io.trino.execution.StageState.ABORTED;
import static io.trino.execution.StageState.PLANNED;
Expand Down Expand Up @@ -218,6 +220,7 @@ public class EventDrivenFaultTolerantQueryScheduler
private final double smallStageSourceSizeMultiplier;
private final DataSize smallSizePartitionSizeEstimate;
private final boolean smallStageRequireNoMorePartitions;
private final boolean stageEstimationForEagerParentEnabled;

private final StageRegistry stageRegistry;

Expand Down Expand Up @@ -274,6 +277,7 @@ public EventDrivenFaultTolerantQueryScheduler(
this.smallStageSourceSizeMultiplier = getFaultTolerantExecutionSmallStageSourceSizeMultiplier(queryStateMachine.getSession());
this.smallSizePartitionSizeEstimate = getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin(queryStateMachine.getSession());
this.smallStageRequireNoMorePartitions = isFaultTolerantExecutionSmallStageRequireNoMorePartitions(queryStateMachine.getSession());
this.stageEstimationForEagerParentEnabled = isFaultTolerantExecutionStageEstimationForEagerParentEnabled(queryStateMachine.getSession());

stageRegistry = new StageRegistry(queryStateMachine, originalPlan);
}
Expand Down Expand Up @@ -358,6 +362,7 @@ public synchronized void start()
smallStageEstimationThreshold,
smallStageSourceSizeMultiplier,
smallSizePartitionSizeEstimate,
stageEstimationForEagerParentEnabled,
smallStageRequireNoMorePartitions);
queryExecutor.submit(scheduler::run);
}
Expand Down Expand Up @@ -544,6 +549,7 @@ private static class Scheduler
private final double smallStageSourceSizeMultiplier;
private final DataSize smallSizePartitionSizeEstimate;
private final boolean smallStageRequireNoMorePartitions;
private final boolean stageEstimationForEagerParentEnabled;

private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
private final List<Event> eventBuffer = new ArrayList<>(EVENT_BUFFER_CAPACITY);
Expand Down Expand Up @@ -600,6 +606,7 @@ public Scheduler(
DataSize smallStageEstimationThreshold,
double smallStageSourceSizeMultiplier,
DataSize smallSizePartitionSizeEstimate,
boolean stageEstimationForEagerParentEnabled,
boolean smallStageRequireNoMorePartitions)
{
this.queryStateMachine = requireNonNull(queryStateMachine, "queryStateMachine is null");
Expand Down Expand Up @@ -636,6 +643,7 @@ public Scheduler(
this.smallStageEstimationThreshold = requireNonNull(smallStageEstimationThreshold, "smallStageEstimationThreshold is null");
this.smallStageSourceSizeMultiplier = smallStageSourceSizeMultiplier;
this.smallSizePartitionSizeEstimate = requireNonNull(smallSizePartitionSizeEstimate, "smallSizePartitionSizeEstimate is null");
this.stageEstimationForEagerParentEnabled = stageEstimationForEagerParentEnabled;
this.smallStageRequireNoMorePartitions = smallStageRequireNoMorePartitions;

planInTopologicalOrder = sortPlanInTopologicalOrder(plan);
Expand Down Expand Up @@ -971,16 +979,12 @@ private IsReadyForExecutionResult isReadyForExecution(SubPlan subPlan)
boolean standardTasksWaitingForNode = preSchedulingTaskContexts.values().stream()
.anyMatch(task -> task.getExecutionClass() == STANDARD && !task.getNodeLease().getNode().isDone());

// Do not start a speculative stage if there is non-speculative work still to be done.
// Do not start a speculative stage after partition count has been changed at runtime, as when we estimate
// by progress, repartition tasks will produce very uneven output for different output partitions, which
// will result in very bad task bin-packing results; also the fact that runtime adaptive partitioning
// happened already suggests that there is plenty work ahead.
boolean canScheduleSpeculative = !standardTasksInQueue && !standardTasksWaitingForNode && !runtimeAdaptivePartitioningApplied;
boolean eager = stageEstimationForEagerParentEnabled && shouldScheduleEagerly(subPlan);
boolean speculative = false;
int finishedSourcesCount = 0;
int estimatedByProgressSourcesCount = 0;
int estimatedBySmallInputSourcesCount = 0;
int estimatedForEagerParent = 0;

ImmutableMap.Builder<StageId, OutputDataSizeEstimate> sourceOutputSizeEstimates = ImmutableMap.builder();

Expand All @@ -998,14 +1002,24 @@ private IsReadyForExecutionResult isReadyForExecution(SubPlan subPlan)
// speculative execution not supported by Exchange implementation
return IsReadyForExecutionResult.notReady();
}
if (!canScheduleSpeculative) {
if (runtimeAdaptivePartitioningApplied) {
// Do not start a speculative stage after partition count has been changed at runtime, as when we estimate
// by progress, repartition tasks will produce very uneven output for different output partitions, which
// will result in very bad task bin-packing results; also the fact that runtime adaptive partitioning
// happened already suggests that there is plenty work ahead.
return IsReadyForExecutionResult.notReady();
}

if ((standardTasksInQueue || standardTasksWaitingForNode) && !eager) {
// Do not start a non-eager speculative stage if there is non-speculative work still to be done.
return IsReadyForExecutionResult.notReady();
}

speculative = true;
}
else {
// source stage finished; no more checks needed
OutputDataSizeEstimateResult result = sourceStageExecution.getOutputDataSize(stageExecutions::get).orElseThrow();
OutputDataSizeEstimateResult result = sourceStageExecution.getOutputDataSize(stageExecutions::get, eager).orElseThrow();
verify(result.getStatus() == OutputDataSizeEstimateStatus.FINISHED, "expected FINISHED status but got %s", result.getStatus());
finishedSourcesCount++;
sourceOutputSizeEstimates.put(sourceStageExecution.getStageId(), result.getOutputDataSizeEstimate());
Expand All @@ -1023,34 +1037,52 @@ private IsReadyForExecutionResult isReadyForExecution(SubPlan subPlan)
return IsReadyForExecutionResult.notReady();
}

Optional<OutputDataSizeEstimateResult> result = sourceStageExecution.getOutputDataSize(stageExecutions::get);
Optional<OutputDataSizeEstimateResult> result = sourceStageExecution.getOutputDataSize(stageExecutions::get, eager);
if (result.isEmpty()) {
return IsReadyForExecutionResult.notReady();
}

switch (result.orElseThrow().getStatus()) {
case ESTIMATED_BY_PROGRESS -> estimatedByProgressSourcesCount++;
case ESTIMATED_BY_SMALL_INPUT -> estimatedBySmallInputSourcesCount++;
case ESTIMATED_FOR_EAGER_PARENT -> estimatedForEagerParent++;
default -> throw new IllegalStateException(format("unexpected status %s", result.orElseThrow().getStatus())); // FINISHED handled above
}

sourceOutputSizeEstimates.put(sourceStageExecution.getStageId(), result.orElseThrow().getOutputDataSizeEstimate());
someSourcesMadeProgress = someSourcesMadeProgress || sourceStageExecution.isSomeProgressMade();
}

if (!subPlan.getChildren().isEmpty() && !someSourcesMadeProgress) {
if (!subPlan.getChildren().isEmpty() && !someSourcesMadeProgress && !eager) {
return IsReadyForExecutionResult.notReady();
}

if (speculative) {
log.debug("scheduling speculative %s/%s; sources: finished=%s; estimatedByProgress=%s; estimatedSmall=%s",
log.debug("scheduling speculative %s/%s; sources: finished=%s; estimatedByProgress=%s; estimatedSmall=%s; estimatedForEagerParent=%s",
queryStateMachine.getQueryId(),
subPlan.getFragment().getId(),
finishedSourcesCount,
estimatedByProgressSourcesCount,
estimatedBySmallInputSourcesCount);
estimatedBySmallInputSourcesCount,
estimatedForEagerParent);
}
return IsReadyForExecutionResult.ready(sourceOutputSizeEstimates.buildOrThrow(), eager);
}

/**
 * Decides whether the stage for the given sub-plan qualifies for eager scheduling.
 * At present the only criterion is the one implemented by {@code hasSmallFinalLimitNode}.
 *
 * @param subPlan sub-plan of the stage being considered
 * @return {@code true} when the sub-plan qualifies for eager scheduling
 */
private boolean shouldScheduleEagerly(SubPlan subPlan)
{
    boolean smallFinalLimitPresent = hasSmallFinalLimitNode(subPlan);
    return smallFinalLimitPresent;
}

/**
 * Checks whether the fragment of the given sub-plan uses single-node partitioning and
 * whether a search starting at the fragment root finds a non-partial {@link LimitNode}
 * whose count is below 1,000,000.
 *
 * @param subPlan sub-plan whose fragment is inspected
 * @return {@code false} if the fragment is not single-node partitioned; otherwise the result of the LIMIT node search
 */
private static boolean hasSmallFinalLimitNode(SubPlan subPlan)
{
if (!subPlan.getFragment().getPartitioning().isSingleNode()) {
// Final LIMIT should always have SINGLE distribution
return false;
}
// NOTE(review): the next line looks like the pre-change return statement of isReadyForExecution
// (it uses that method's sourceOutputSizeEstimates local) as rendered by the diff view; it does
// not appear to belong to this method -- confirm against the actual source file.
return IsReadyForExecutionResult.ready(sourceOutputSizeEstimates.buildOrThrow(), false);
// Match only final (non-partial) LIMIT nodes with a count under one million rows.
return PlanNodeSearcher.searchFrom(subPlan.getFragment().getRoot())
.where(node -> node instanceof LimitNode limitNode && !limitNode.isPartial() && limitNode.getCount() < 1_000_000)
.matches();
}

/**
Expand Down Expand Up @@ -2087,13 +2119,17 @@ public Optional<NodeRequirements> getNodeRequirements(int partitionId)
return getStagePartition(partitionId).getNodeRequirements();
}

/**
 * Returns the output data size of this stage together with a status telling how it was obtained.
 * <p>
 * If the stage state is FINISHED, a result with FINISHED status built from {@code outputDataSize} is returned.
 * Otherwise the result of {@code getEstimatedOutputDataSize()} is used, falling back to
 * {@code getEstimatedSmallStageOutputDataSize(stageExecutionLookup)}. When neither yields a value and
 * {@code parentEager} is set, {@code getEstimatedStageOutputSizeForEagerParent()} supplies the result.
 *
 * @param stageExecutionLookup lookup of stage executions by id, handed to the small-stage estimation
 * @param parentEager whether the eager-parent fallback estimate may be used when no other estimate is available
 * @return the size result, or empty when no estimate could be produced
 */
public Optional<OutputDataSizeEstimateResult> getOutputDataSize(Function<StageId, StageExecution> stageExecutionLookup)
public Optional<OutputDataSizeEstimateResult> getOutputDataSize(Function<StageId, StageExecution> stageExecutionLookup, boolean parentEager)
{
if (stage.getState() == StageState.FINISHED) {
return Optional.of(new OutputDataSizeEstimateResult(
new OutputDataSizeEstimate(ImmutableLongArray.copyOf(outputDataSize)), OutputDataSizeEstimateStatus.FINISHED));
}
// NOTE(review): the single-line return below and the first signature line above appear to be the
// pre-change versions shown by the diff view -- confirm against the actual source file.
return getEstimatedOutputDataSize().or(() -> getEstimatedSmallStageOutputDataSize(stageExecutionLookup));
Optional<OutputDataSizeEstimateResult> result = getEstimatedOutputDataSize().or(() -> getEstimatedSmallStageOutputDataSize(stageExecutionLookup));
// Only fall back to the eager-parent estimate when the regular estimators produced nothing.
if (result.isEmpty() && parentEager) {
result = getEstimatedStageOutputSizeForEagerParent();
}
return result;
}

public boolean isSomeProgressMade()
Expand Down Expand Up @@ -2169,7 +2205,7 @@ private Optional<OutputDataSizeEstimateResult> getEstimatedSmallStageOutputDataS

StageExecution sourceStage = stageExecutionLookup.apply(sourceStageId);
requireNonNull(sourceStage, "sourceStage is null");
Optional<OutputDataSizeEstimateResult> sourceStageOutputDataSize = sourceStage.getOutputDataSize(stageExecutionLookup);
Optional<OutputDataSizeEstimateResult> sourceStageOutputDataSize = sourceStage.getOutputDataSize(stageExecutionLookup, false);

if (sourceStageOutputDataSize.isEmpty()) {
// can't estimate size of one of the sources; should not happen in practice
Expand All @@ -2195,6 +2231,19 @@ private Optional<OutputDataSizeEstimateResult> getEstimatedSmallStageOutputDataS
return Optional.of(new OutputDataSizeEstimateResult(estimateBuilder.build(), OutputDataSizeEstimateStatus.ESTIMATED_BY_SMALL_INPUT));
}

/**
 * Produces the fallback output size estimate handed to a parent stage that is scheduled eagerly.
 * <p>
 * The estimate reports zero bytes for every output partition. This mirrors the way eagerness is
 * currently decided: eager task execution is used only for stages with a small FINAL LIMIT, which
 * implies small input from child stages (the children enforce that via PARTIAL LIMIT).
 *
 * @return an always-present result carrying an all-zero estimate and the ESTIMATED_FOR_EAGER_PARENT status
 */
private Optional<OutputDataSizeEstimateResult> getEstimatedStageOutputSizeForEagerParent()
{
    // A newly allocated long[] is zero-filled, giving one empty entry per output partition.
    long[] zeroSizes = new long[sinkPartitioningScheme.getPartitionCount()];
    OutputDataSizeEstimateResult emptyEstimate = new OutputDataSizeEstimateResult(
            ImmutableLongArray.copyOf(zeroSizes),
            OutputDataSizeEstimateStatus.ESTIMATED_FOR_EAGER_PARENT);
    return Optional.of(emptyEstimate);
}

public ExchangeSourceOutputSelector getSinkOutputSelector()
{
if (finalSinkOutputSelector != null) {
Expand Down Expand Up @@ -2270,7 +2319,8 @@ private StagePartition getStagePartition(int partitionId)
/**
 * Describes how the estimate carried by an OutputDataSizeEstimateResult was obtained.
 */
private enum OutputDataSizeEstimateStatus {
// reported by getOutputDataSize when the stage state is FINISHED
FINISHED,
// NOTE(review): presumably produced by getEstimatedOutputDataSize(), whose body is not visible here -- confirm
ESTIMATED_BY_PROGRESS,
// produced by getEstimatedSmallStageOutputDataSize
ESTIMATED_BY_SMALL_INPUT
ESTIMATED_BY_SMALL_INPUT,
// all-zero fallback produced by getEstimatedStageOutputSizeForEagerParent
ESTIMATED_FOR_EAGER_PARENT
}

private static class OutputDataSizeEstimateResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void testDefaults()
.setFaultTolerantExecutionSmallStageEstimationThreshold(DataSize.of(20, GIGABYTE))
.setFaultTolerantExecutionSmallStageSourceSizeMultiplier(1.2)
.setFaultTolerantExecutionSmallStageRequireNoMorePartitions(false)
.setFaultTolerantExecutionStageEstimationForEagerParentEnabled(true)
.setMaxWriterTasksCount(100));
}

Expand Down Expand Up @@ -182,6 +183,7 @@ public void testExplicitPropertyMappings()
.put("fault-tolerant-execution-small-stage-estimation-threshold", "6GB")
.put("fault-tolerant-execution-small-stage-source-size-multiplier", "1.6")
.put("fault-tolerant-execution-small-stage-require-no-more-partitions", "true")
.put("fault-tolerant-execution-stage-estimation-for-eager-parent-enabled", "false")
.buildOrThrow();

QueryManagerConfig expected = new QueryManagerConfig()
Expand Down Expand Up @@ -252,6 +254,7 @@ public void testExplicitPropertyMappings()
.setFaultTolerantExecutionSmallStageEstimationThreshold(DataSize.of(6, GIGABYTE))
.setFaultTolerantExecutionSmallStageSourceSizeMultiplier(1.6)
.setFaultTolerantExecutionSmallStageRequireNoMorePartitions(true)
.setFaultTolerantExecutionStageEstimationForEagerParentEnabled(false)
.setMaxWriterTasksCount(101);

assertFullMapping(properties, expected);
Expand Down

0 comments on commit 67643a8

Please sign in to comment.