Skip to content

Commit

Permalink
[multistage] Initial (phase 1) Query runtime for window functions - e…
Browse files Browse the repository at this point in the history
…mpty OVER() and OVER(PARTITION BY) (#10286)

* Add query execution support for empty OVER() and OVER(PARTITION BY) window functions
* Fixes for supporting OVER(PARTITION BY k1 ORDER BY k1) and add some tests for this scenario
* Address review comments, add some WITH statement + window function tests
* Address review comments, move Accumulator to separate AggregationUtils class
* Add support for BOOL_OR and BOOL_AND window functions (also supported by PostgreSQL)
  • Loading branch information
somandal authored Mar 6, 2023
1 parent f28525b commit e342b1f
Show file tree
Hide file tree
Showing 15 changed files with 3,528 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@
* Special rule for Pinot, this rule is fixed to always insert an exchange or sort exchange below the WINDOW node.
* TODO:
* 1. Add support for more than one window group
* 2. Add support for functions other than aggregation functions (AVG, COUNT, MAX, MIN, SUM)
* 2. Add support for functions other than aggregation functions (AVG, COUNT, MAX, MIN, SUM, BOOL_AND, BOOL_OR)
* 3. Add support for custom frames
*/
public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
public static final PinotWindowExchangeNodeInsertRule INSTANCE =
new PinotWindowExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY);

// Supported window functions
// OTHER_FUNCTION supported are: BOOL_AND, BOOL_OR
private static final Set<SqlKind> SUPPORTED_WINDOW_FUNCTION_KIND = ImmutableSet.of(SqlKind.SUM, SqlKind.SUM0,
SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT);
SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT, SqlKind.OTHER_FUNCTION);

public PinotWindowExchangeNodeInsertRule(RelBuilderFactory factory) {
super(operand(LogicalWindow.class, any()), factory, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.planner.stage.WindowNode;
import org.apache.pinot.query.routing.VirtualServer;


Expand Down Expand Up @@ -81,6 +82,14 @@ public void attach(StageNode stageNode) {
_requiresSingletonInstance = _requiresSingletonInstance || (sortNode.getCollationKeys().size() > 0
&& sortNode.getOffset() != -1);
}
if (stageNode instanceof WindowNode) {
WindowNode windowNode = (WindowNode) stageNode;
// TODO: Figure out a way to parallelize Empty OVER() and OVER(ORDER BY) so the computation can be done across
// multiple nodes.
// Empty OVER() and OVER(ORDER BY) need to be processed on a singleton node. OVER() with PARTITION BY can be
// distributed as no global ordering is required across partitions.
_requiresSingletonInstance = _requiresSingletonInstance || (windowNode.getGroupSet().size() == 0);
}
}

public List<String> getScannedTables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,46 @@

import com.clearspring.analytics.util.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexWindowBound;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.serde.ProtoProperties;


public class WindowNode extends AbstractStageNode {
@ProtoProperties
public List<RexExpression> _groupSet;
private List<RexExpression> _groupSet;
@ProtoProperties
public List<RexExpression> _orderSet;
private List<RexExpression> _orderSet;
@ProtoProperties
public List<RelFieldCollation.Direction> _orderSetDirection;
private List<RelFieldCollation.Direction> _orderSetDirection;
@ProtoProperties
public List<RelFieldCollation.NullDirection> _orderSetNullDirection;
private List<RelFieldCollation.NullDirection> _orderSetNullDirection;
@ProtoProperties
public List<RexExpression> _aggCalls;
private List<RexExpression> _aggCalls;
@ProtoProperties
public int _lowerBound;
private int _lowerBound;
@ProtoProperties
public int _upperBound;
@ProtoProperties
public boolean _isRows;
private int _upperBound;
@ProtoProperties
private List<RexExpression> _constants;
@ProtoProperties
private WindowFrameType _windowFrameType;

/**
* Enum to denote the type of window frame carried by a {@link WindowNode}.
* The constructor derives the value from the window group's {@code isRows} flag.
* ROW - ROW type window frame (chosen when {@code isRows} is true)
* RANGE - RANGE type window frame (chosen when {@code isRows} is false)
*/
public enum WindowFrameType {
ROW,
RANGE
}

public WindowNode(int stageId) {
super(stageId);
Expand All @@ -62,7 +72,7 @@ public WindowNode(int stageId, List<Window.Group> windowGroups, List<RexLiteral>
String.format("Only a single window group is allowed! Number of window groups: %d", windowGroups.size()));
Window.Group windowGroup = windowGroups.get(0);

_groupSet = windowGroup.keys == null ? new ArrayList<>() : RexExpression.toRexInputRefs(windowGroup.keys);
_groupSet = windowGroup.keys == null ? Collections.emptyList() : RexExpression.toRexInputRefs(windowGroup.keys);
List<RelFieldCollation> relFieldCollations = windowGroup.orderKeys == null ? new ArrayList<>()
: windowGroup.orderKeys.getFieldCollations();
_orderSet = new ArrayList<>(relFieldCollations.size());
Expand All @@ -79,12 +89,12 @@ public WindowNode(int stageId, List<Window.Group> windowGroups, List<RexLiteral>
// Frame literals come in the constants from the LogicalWindow and the bound.getOffset() stores the
// InputRef to the constants array offset by the input array length. These need to be extracted here and
// set to the bounds.
validateFrameBounds(windowGroup.lowerBound, windowGroup.upperBound, windowGroup.isRows);

// Lower bound can only be unbounded preceding for now, set to Integer.MIN_VALUE
_lowerBound = Integer.MIN_VALUE;
// Upper bound can only be unbounded following or current row for now
_upperBound = windowGroup.upperBound.isUnbounded() ? Integer.MAX_VALUE : 0;
_isRows = windowGroup.isRows;
_windowFrameType = windowGroup.isRows ? WindowFrameType.ROW : WindowFrameType.RANGE;

// TODO: Constants are used to store constants needed such as the frame literals. For now just save this, need to
// extract the constant values into bounds as a part of frame support.
Expand Down Expand Up @@ -132,26 +142,11 @@ public int getUpperBound() {
return _upperBound;
}

public boolean isRows() {
return _isRows;
public WindowFrameType getWindowFrameType() {
return _windowFrameType;
}

/**
* Returns the constants stored on this window node.
* Per the constructor's TODO, these are kept for constants such as the frame literals and are not yet
* extracted into the frame bounds.
*
* @return the value of the {@code _constants} field
*/
public List<RexExpression> getConstants() {
return _constants;
}

/**
 * Verifies that the window frame is the default frame, which is the only one supported.
 *
 * <p>The checks run in this order, each failing via {@code Preconditions.checkState}:
 * <ol>
 *   <li>the frame must not be a ROWS frame;</li>
 *   <li>the lower bound must be preceding, unbounded and carry no offset;</li>
 *   <li>the upper bound must be following, unbounded and offset-free when {@code _orderSet} is empty,
 *       otherwise it must be the current row with no offset.</li>
 * </ol>
 *
 * @param lowerBound lower bound of the window frame
 * @param upperBound upper bound of the window frame
 * @param isRows whether the frame was declared as a ROWS frame
 */
private void validateFrameBounds(RexWindowBound lowerBound, RexWindowBound upperBound, boolean isRows) {
  Preconditions.checkState(!isRows, "Only default frame is supported which must be RANGE and not ROWS");

  boolean lowerIsDefault =
      lowerBound.isPreceding() && lowerBound.isUnbounded() && lowerBound.getOffset() == null;
  Preconditions.checkState(lowerIsDefault,
      String.format("Only default frame is supported, actual lower bound frame provided: %s", lowerBound));

  // The expected upper bound depends on whether any ORDER BY keys were recorded for the window.
  boolean upperIsDefault = _orderSet.isEmpty()
      ? upperBound.isFollowing() && upperBound.isUnbounded() && upperBound.getOffset() == null
      : upperBound.isCurrentRow() && upperBound.getOffset() == null;
  Preconditions.checkState(upperIsDefault,
      String.format("Only default frame is supported, actual upper bound frame provided: %s", upperBound));
}
}
Loading

0 comments on commit e342b1f

Please sign in to comment.