[FLINK-26518][table-planner] Port FlinkRelBuilder to Java
twalthr committed Mar 23, 2022
1 parent 3a2b8da commit f8cb19e
Showing 16 changed files with 267 additions and 268 deletions.
@@ -0,0 +1,235 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.calcite;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.calcite.FlinkRelFactories.ExpandFactory;
import org.apache.flink.table.planner.calcite.FlinkRelFactories.RankFactory;
import org.apache.flink.table.planner.hint.FlinkHints;
import org.apache.flink.table.planner.plan.QueryOperationConverter;
import org.apache.flink.table.planner.plan.logical.LogicalWindow;
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalTableAggregate;
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate;
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggregate;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;

import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable.ToRelContext;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

import static org.apache.flink.table.planner.plan.utils.AggregateUtil.isTableAggregate;

/** Flink-specific {@link RelBuilder}. */
@Internal
public final class FlinkRelBuilder extends RelBuilder {

    private final QueryOperationConverter toRelNodeConverter;

    private final ExpandFactory expandFactory;

    private final RankFactory rankFactory;

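    // The Expand and Rank factories can be supplied via the planner context; otherwise the
    // default implementations from FlinkRelFactories are used.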
    private FlinkRelBuilder(Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
        super(context, cluster, relOptSchema);

        this.toRelNodeConverter =
                new QueryOperationConverter(this, context.unwrap(FlinkContext.class).isBatchMode());
        this.expandFactory =
                Util.first(
                        context.unwrap(ExpandFactory.class),
                        FlinkRelFactories.DEFAULT_EXPAND_FACTORY());
        this.rankFactory =
                Util.first(
                        context.unwrap(RankFactory.class),
                        FlinkRelFactories.DEFAULT_RANK_FACTORY());
    }

    public static FlinkRelBuilder of(
            Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
        return new FlinkRelBuilder(Preconditions.checkNotNull(context), cluster, relOptSchema);
    }

    public static FlinkRelBuilder of(RelOptCluster cluster, RelOptSchema relOptSchema) {
        return FlinkRelBuilder.of(cluster.getPlanner().getContext(), cluster, relOptSchema);
    }

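    // Returns a RelBuilderFactory that creates FlinkRelBuilder instances, chaining the given
    // context with the cluster's planner context so that both remain unwrappable.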
    public static RelBuilderFactory proto(Context context) {
        return (cluster, schema) -> {
            final Context clusterContext = cluster.getPlanner().getContext();
            final Context chain = Contexts.chain(context, clusterContext);
            return FlinkRelBuilder.of(chain, cluster, schema);
        };
    }

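    // Pushes an Expand node (one output row per projection list, distinguished by the expand_id
    // column at the given index) created by the configured ExpandFactory.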
    public RelBuilder expand(List<List<RexNode>> projects, int expandIdIndex) {
        final RelNode input = build();
        final RelNode expand = expandFactory.createExpand(input, projects, expandIdIndex);
        return push(expand);
    }

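    // Pushes a Rank node created by the configured RankFactory (used e.g. for Top-N queries).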
    public RelBuilder rank(
            ImmutableBitSet partitionKey,
            RelCollation orderKey,
            RankType rankType,
            RankRange rankRange,
            RelDataTypeField rankNumberType,
            boolean outputRankNumber) {
        final RelNode input = build();
        final RelNode rank =
                rankFactory.createRank(
                        input,
                        partitionKey,
                        orderKey,
                        rankType,
                        rankRange,
                        rankNumberType,
                        outputRankNumber);
        return push(rank);
    }

    /** Build non-window aggregate for either aggregate or table aggregate. */
    @Override
    public RelBuilder aggregate(
            RelBuilder.GroupKey groupKey, Iterable<RelBuilder.AggCall> aggCalls) {
        // build a RelNode; build() may also return a Project on top of the aggregate
        RelNode relNode = super.aggregate(groupKey, aggCalls).build();

        if (relNode instanceof LogicalAggregate) {
            final LogicalAggregate logicalAggregate = (LogicalAggregate) relNode;
            if (isTableAggregate(logicalAggregate.getAggCallList())) {
                relNode = LogicalTableAggregate.create(logicalAggregate);
            } else if (isCountStarAgg(logicalAggregate)) {
                // a global COUNT(*) references no input fields; project a constant so that the
                // aggregate gets a minimal, well-defined input
                final RelNode newAggInput =
                        push(logicalAggregate.getInput(0)).project(literal(0)).build();
                relNode =
                        logicalAggregate.copy(
                                logicalAggregate.getTraitSet(), ImmutableList.of(newAggInput));
            }
        }

        return push(relNode);
    }

    /** Build window aggregate for either aggregate or table aggregate. */
    public RelBuilder windowAggregate(
            LogicalWindow window,
            GroupKey groupKey,
            List<NamedWindowProperty> namedProperties,
            Iterable<AggCall> aggCalls) {
        // build the logical aggregate

        // Because of
        // [CALCITE-3763] "RelBuilder.aggregate should prune unused fields from the input,
        // if the input is a Project"
        // a field must not be pruned if it is referenced by other expressions of the window
        // aggregation (i.e. TUMBLE_START/END).
        // To solve this, we configure the RelBuilder to disable input pruning.
        final LogicalAggregate aggregate =
                (LogicalAggregate)
                        super.transform(t -> t.withPruneInputOfAggregate(false))
                                .push(build())
                                .aggregate(groupKey, aggCalls)
                                .build();

        // build the logical window aggregate from it
        final RelNode windowAggregate;
        if (isTableAggregate(aggregate.getAggCallList())) {
            windowAggregate =
                    LogicalWindowTableAggregate.create(window, namedProperties, aggregate);
        } else {
            windowAggregate = LogicalWindowAggregate.create(window, namedProperties, aggregate);
        }
        return push(windowAggregate);
    }

    /** Build watermark assigner relational node. */
    public RelBuilder watermark(int rowtimeFieldIndex, RexNode watermarkExpr) {
        final RelNode input = build();
        final RelNode relNode =
                LogicalWatermarkAssigner.create(cluster, input, rowtimeFieldIndex, watermarkExpr);
        return push(relNode);
    }

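    // Converts a Table API QueryOperation tree into a RelNode and pushes it onto the stack.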
    public RelBuilder queryOperation(QueryOperation queryOperation) {
        final RelNode relNode = queryOperation.accept(toRelNodeConverter);
        return push(relNode);
    }

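    // Scans the given table and attaches the dynamic options as an OPTIONS hint, analogous to
    // SQL's /*+ OPTIONS(...) */ syntax.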
    public RelBuilder scan(ObjectIdentifier identifier, Map<String, String> dynamicOptions) {
        final List<RelHint> hints = new ArrayList<>();
        hints.add(
                RelHint.builder(FlinkHints.HINT_NAME_OPTIONS).hintOptions(dynamicOptions).build());
        final ToRelContext toRelContext = ViewExpanders.simpleContext(cluster, hints);
        final RelNode relNode =
                relOptSchema.getTableForMember(identifier.toList()).toRel(toRelContext);
        return push(relNode);
    }

    @Override
    public FlinkTypeFactory getTypeFactory() {
        return (FlinkTypeFactory) super.getTypeFactory();
    }

    @Override
    public RelBuilder transform(UnaryOperator<Config> transform) {
        // Override in order to return a FlinkRelBuilder.
        final Context mergedContext =
                Contexts.of(transform.apply(Config.DEFAULT), cluster.getPlanner().getContext());
        return FlinkRelBuilder.of(mergedContext, cluster, relOptSchema);
    }

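    // Returns true for a parameterless, unfiltered COUNT aggregate without GROUP BY, i.e. a
    // global COUNT(*) that references no input fields.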
    private static boolean isCountStarAgg(LogicalAggregate agg) {
        if (agg.getGroupCount() != 0 || agg.getAggCallList().size() != 1) {
            return false;
        }
        final AggregateCall call = agg.getAggCallList().get(0);
        return call.getAggregation().getKind() == SqlKind.COUNT
                && call.filterArg == -1
                && call.getArgList().isEmpty();
    }
}
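For orientation, a minimal usage sketch of the ported builder follows. The helper name and its
parameters are hypothetical and not part of this commit; a configured FlinkRelBuilder is assumed
to be obtained elsewhere, for example from the createRelBuilder(...) call touched further below.

    // Hypothetical helper, not part of this commit: pushes a Table API operation tree and wraps
    // it with a watermark assigner before returning the finished plan.
    static RelNode toWatermarkedPlan(
            FlinkRelBuilder builder, QueryOperation op, int rowtimeIndex, RexNode watermarkExpr) {
        builder.queryOperation(op); // translate the operation tree and push it onto the stack
        builder.watermark(rowtimeIndex, watermarkExpr); // wrap it with a LogicalWatermarkAssigner
        return builder.build(); // pop the resulting RelNode
    }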
@@ -25,6 +25,9 @@
 import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelNode;
@@ -70,8 +73,9 @@ public List<String> getQualifiedName() {

     @Override
     public RelNode convertToRel(RelOptTable.ToRelContext context) {
-        FlinkRelBuilder relBuilder =
-                FlinkRelBuilder.of(context, context.getCluster(), this.getRelOptSchema());
+        final RelOptCluster cluster = context.getCluster();
+        final Context chain = Contexts.of(context, cluster.getPlanner().getContext());
+        final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(chain, cluster, getRelOptSchema());
 
         return relBuilder.queryOperation(catalogView.getQueryOperation()).build();
     }
@@ -184,7 +184,7 @@ public FlinkRelBuilder createRelBuilder(String currentCatalog, String currentDatabase) {
                         context,
                         // Sets up the ViewExpander explicitly for FlinkRelBuilder.
                         createFlinkPlanner(currentCatalog, currentDatabase).createToRelContext());
-        return new FlinkRelBuilder(chain, cluster, relOptSchema);
+        return FlinkRelBuilder.of(chain, cluster, relOptSchema);
     }
 
     /**
@@ -134,8 +134,7 @@ public static Result decorrelateQuery(RelNode rootRel) {
         }
 
         RelOptCluster cluster = rootRel.getCluster();
-        RelBuilder relBuilder =
-                new FlinkRelBuilder(cluster.getPlanner().getContext(), cluster, null);
+        RelBuilder relBuilder = FlinkRelBuilder.of(cluster, null);
         RexBuilder rexBuilder = cluster.getRexBuilder();
 
         final SubQueryDecorrelator decorrelator =
@@ -34,6 +34,7 @@
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
 import org.apache.flink.table.planner.plan.utils.PythonUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.types.DataType;
 
 import org.apache.calcite.plan.RelOptRule;
@@ -160,7 +161,7 @@ public void onMatch(RelOptRuleCall call) {
                         window,
                         inputTimeFieldIndex,
                         inputTimeIsDate,
-                        agg.getNamedProperties());
+                        JavaScalaConversionUtil.toScala(agg.getNamedProperties()));
         call.transformTo(windowAgg);
     }
 
@@ -143,7 +143,7 @@ public RelNode convert(RelNode rel) {
                 agg.getGroupSet().toArray(),
                 JavaScalaConversionUtil.toScala(aggCalls),
                 agg.getWindow(),
-                agg.getNamedProperties(),
+                JavaScalaConversionUtil.toScala(agg.getNamedProperties()),
                 emitStrategy);
     }
 }
[diffs of the remaining changed files not shown]
