Skip to content

Commit

Permalink
Move LimitNode and TopNNode to SPI
Browse files Browse the repository at this point in the history
Move LimitNode, TopNNode, and OrderingScheme to presto-spi module. This
will allow visiting and thus pushdown of limits at the connector level.
  • Loading branch information
AkshayPall authored and highker committed Aug 15, 2019
1 parent ac1953b commit 87d1783
Show file tree
Hide file tree
Showing 76 changed files with 654 additions and 454 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.facebook.presto.Session;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.ValuesNode;
Expand All @@ -28,7 +29,6 @@
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.LimitNode;
import com.facebook.presto.sql.planner.plan.OutputNode;
import com.facebook.presto.sql.planner.plan.ProjectNode;
import com.facebook.presto.sql.planner.plan.RowNumberNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

import com.facebook.presto.Session;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.iterative.Lookup;
import com.facebook.presto.sql.planner.plan.LimitNode;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TopNNode;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.optimizations.JoinNodeUtils;
import com.facebook.presto.sql.planner.plan.AggregationNode;
Expand All @@ -25,12 +27,10 @@
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.LimitNode;
import com.facebook.presto.sql.planner.plan.ProjectNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
import com.facebook.presto.sql.planner.plan.SortNode;
import com.facebook.presto.sql.planner.plan.SpatialJoinNode;
import com.facebook.presto.sql.planner.plan.TopNNode;
import com.facebook.presto.sql.planner.plan.UnionNode;
import com.facebook.presto.sql.planner.plan.WindowNode;
import com.facebook.presto.sql.relational.OriginalExpressionUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.sql.planner;

import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.OrderingScheme;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.ValuesNode;
import com.facebook.presto.spi.relation.RowExpression;
Expand Down Expand Up @@ -94,7 +95,7 @@ public Void visitAggregation(AggregationNode node, ImmutableList.Builder<RowExpr
aggregation.getArguments().forEach(context::add);
aggregation.getFilter().ifPresent(context::add);
aggregation.getOrderBy()
.map(OrderingScheme::getOrderBy)
.map(OrderingScheme::getOrderByVariables)
.orElse(ImmutableList.of())
.forEach(context::add);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,12 @@
import com.facebook.presto.spi.function.FunctionMetadata;
import com.facebook.presto.spi.function.OperatorType;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.OrderingScheme;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TopNNode;
import com.facebook.presto.spi.plan.ValuesNode;
import com.facebook.presto.spi.relation.CallExpression;
import com.facebook.presto.spi.relation.ConstantExpression;
Expand Down Expand Up @@ -151,7 +154,6 @@
import com.facebook.presto.sql.planner.plan.IndexSourceNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.LimitNode;
import com.facebook.presto.sql.planner.plan.MarkDistinctNode;
import com.facebook.presto.sql.planner.plan.MetadataDeleteNode;
import com.facebook.presto.sql.planner.plan.OutputNode;
Expand All @@ -167,7 +169,6 @@
import com.facebook.presto.sql.planner.plan.TableFinishNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode.DeleteHandle;
import com.facebook.presto.sql.planner.plan.TopNNode;
import com.facebook.presto.sql.planner.plan.TopNRowNumberNode;
import com.facebook.presto.sql.planner.plan.UnionNode;
import com.facebook.presto.sql.planner.plan.UnnestNode;
Expand Down Expand Up @@ -722,8 +723,8 @@ private PhysicalOperation createMergeSource(RemoteSourceNode node, LocalExecutio

OrderingScheme orderingScheme = node.getOrderingScheme().get();
ImmutableMap<VariableReferenceExpression, Integer> layout = makeLayout(node);
List<Integer> sortChannels = getChannelsForVariables(orderingScheme.getOrderBy(), layout);
List<SortOrder> sortOrder = orderingScheme.getOrderingList();
List<Integer> sortChannels = getChannelsForVariables(orderingScheme.getOrderByVariables(), layout);
List<SortOrder> sortOrder = getOrderingList(orderingScheme);

List<Type> types = getSourceOperatorTypes(node, context.getTypes());
ImmutableList<Integer> outputChannels = IntStream.range(0, types.size())
Expand Down Expand Up @@ -829,7 +830,7 @@ public PhysicalOperation visitTopNRowNumber(TopNRowNumberNode node, LocalExecuti
.map(channel -> source.getTypes().get(channel))
.collect(toImmutableList());

List<VariableReferenceExpression> orderByVariables = node.getOrderingScheme().getOrderBy();
List<VariableReferenceExpression> orderByVariables = node.getOrderingScheme().getOrderByVariables();
List<Integer> sortChannels = getChannelsForVariables(orderByVariables, source.getLayout());
List<SortOrder> sortOrder = orderByVariables.stream()
.map(variable -> node.getOrderingScheme().getOrdering(variable))
Expand Down Expand Up @@ -882,8 +883,8 @@ public PhysicalOperation visitWindow(WindowNode node, LocalExecutionPlanContext

if (node.getOrderingScheme().isPresent()) {
OrderingScheme orderingScheme = node.getOrderingScheme().get();
sortChannels = getChannelsForVariables(orderingScheme.getOrderBy(), source.getLayout());
sortOrder = orderingScheme.getOrderingList();
sortChannels = getChannelsForVariables(orderingScheme.getOrderByVariables(), source.getLayout());
sortOrder = getOrderingList(orderingScheme);
}

ImmutableList.Builder<Integer> outputChannels = ImmutableList.builder();
Expand Down Expand Up @@ -959,7 +960,7 @@ public PhysicalOperation visitTopN(TopNNode node, LocalExecutionPlanContext cont
{
PhysicalOperation source = node.getSource().accept(this, context);

List<VariableReferenceExpression> orderByVariables = node.getOrderingScheme().getOrderBy();
List<VariableReferenceExpression> orderByVariables = node.getOrderingScheme().getOrderByVariables();

List<Integer> sortChannels = new ArrayList<>();
List<SortOrder> sortOrders = new ArrayList<>();
Expand All @@ -984,7 +985,7 @@ public PhysicalOperation visitSort(SortNode node, LocalExecutionPlanContext cont
{
PhysicalOperation source = node.getSource().accept(this, context);

List<VariableReferenceExpression> orderByVariables = node.getOrderingScheme().getOrderBy();
List<VariableReferenceExpression> orderByVariables = node.getOrderingScheme().getOrderByVariables();

List<Integer> orderByChannels = getChannelsForVariables(orderByVariables, source.getLayout());

Expand Down Expand Up @@ -2381,8 +2382,8 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan

OrderingScheme orderingScheme = node.getOrderingScheme().get();
ImmutableMap<VariableReferenceExpression, Integer> layout = makeLayout(node);
List<Integer> sortChannels = getChannelsForVariables(orderingScheme.getOrderBy(), layout);
List<SortOrder> orderings = orderingScheme.getOrderingList();
List<Integer> sortChannels = getChannelsForVariables(orderingScheme.getOrderByVariables(), layout);
List<SortOrder> orderings = getOrderingList(orderingScheme);
OperatorFactory operatorFactory = new LocalMergeSourceOperatorFactory(
context.getNextOperatorId(),
node.getId(),
Expand Down Expand Up @@ -2526,8 +2527,8 @@ private AccumulatorFactory buildAccumulatorFactory(
List<VariableReferenceExpression> sortKeys = ImmutableList.of();
if (aggregation.getOrderBy().isPresent()) {
OrderingScheme orderBy = aggregation.getOrderBy().get();
sortKeys = orderBy.getOrderBy();
sortOrders = orderBy.getOrderingList();
sortKeys = orderBy.getOrderByVariables();
sortOrders = getOrderingList(orderBy);
}

return internalAggregationFunction.bind(
Expand Down Expand Up @@ -2771,6 +2772,15 @@ private static Function<VariableReferenceExpression, Integer> variableChannelGet
};
}

/**
* List of sort orders in the same order as the list of variables returned from `getOrderByVariables()`. This means for
* index i, variable `getOrderByVariables().get(i)` has order `getOrderingList().get(i)`.
*/
private static List<SortOrder> getOrderingList(OrderingScheme orderingScheme)
{
return orderingScheme.getOrderByVariables().stream().map(orderingScheme.getOrderingsMap()::get).collect(toImmutableList());
}

/**
* Encapsulates an physical operator plus the mapping of logical variables to channel/field
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.TableScanNode;
Expand All @@ -54,7 +55,6 @@
import com.facebook.presto.sql.planner.plan.DeleteNode;
import com.facebook.presto.sql.planner.plan.ExplainAnalyzeNode;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.LimitNode;
import com.facebook.presto.sql.planner.plan.OutputNode;
import com.facebook.presto.sql.planner.plan.ProjectNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
Expand Down Expand Up @@ -92,6 +92,7 @@
import static com.facebook.presto.SystemSessionProperties.isPrintStatsForNonJoinQuery;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.plan.LimitNode.Step.FINAL;
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;
Expand Down Expand Up @@ -409,7 +410,7 @@ private RelationPlan createTableWriterPlan(
PlanNode source = plan.getRoot();

if (!analysis.isCreateTableAsSelectWithData()) {
source = new LimitNode(idAllocator.getNextId(), source, 0L, false);
source = new LimitNode(idAllocator.getNextId(), source, 0L, FINAL);
}

// todo this should be checked in analysis
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,23 @@
package com.facebook.presto.sql.planner;

import com.facebook.presto.spi.block.SortOrder;
import com.facebook.presto.spi.plan.Ordering;
import com.facebook.presto.spi.plan.OrderingScheme;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.OrderBy;
import com.facebook.presto.sql.tree.SortItem;
import com.facebook.presto.sql.tree.SymbolReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;

import java.util.LinkedHashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.facebook.presto.sql.relational.Expressions.variable;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Streams.forEachPair;

public class PlannerUtils
{
Expand Down Expand Up @@ -64,10 +66,19 @@ public static OrderingScheme toOrderingScheme(OrderBy orderBy, TypeProvider type

public static OrderingScheme toOrderingScheme(List<VariableReferenceExpression> orderingSymbols, List<SortOrder> sortOrders)
{
Map<VariableReferenceExpression, SortOrder> orderings = new LinkedHashMap<>();
ImmutableList.Builder<Ordering> builder = ImmutableList.builder();

// don't override existing keys, i.e. when "ORDER BY a ASC, a DESC" is specified
Streams.forEachPair(orderingSymbols.stream(), sortOrders.stream(), orderings::putIfAbsent);
return new OrderingScheme(ImmutableList.copyOf(orderings.keySet()), orderings);
Set<VariableReferenceExpression> keysSeen = new HashSet<>();

forEachPair(orderingSymbols.stream(), sortOrders.stream(), (variable, sortOrder) -> {
if (!keysSeen.contains(variable)) {
keysSeen.add(variable);
builder.add(new Ordering(variable, sortOrder));
}
});

return new OrderingScheme(builder.build());
}

public static VariableReferenceExpression toVariableReference(Expression expression, TypeProvider types)
Expand Down
Loading

0 comments on commit 87d1783

Please sign in to comment.