Skip to content

Commit

Permalink
[FLINK-21627][table-planner-blink] The digest of TableScan should con…
Browse files Browse the repository at this point in the history
…sider table hints

This closes apache#15559
  • Loading branch information
godfreyhe committed Apr 18, 2021
1 parent d32ca07 commit c6147f2
Show file tree
Hide file tree
Showing 40 changed files with 960 additions and 193 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.calcite.rel.logical;

import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.schema.Table;

import java.util.List;

/**
* A <code>LogicalTableScan</code> reads all the rows from a {@link RelOptTable}.
*
* <p>This class is copied from Calcite because the {@link #explainTerms} should consider hints.
*
* <p>If the table is a <code>net.sf.saffron.ext.JdbcTable</code>, then this is literally possible.
* But for other kinds of tables, there may be many ways to read the data from the table. For some
* kinds of table, it may not even be possible to read all of the rows unless some narrowing
* constraint is applied.
*
* <p>In the example of the <code>net.sf.saffron.ext.ReflectSchema</code> schema,
*
* <blockquote>
*
* <pre>select from fields</pre>
*
* </blockquote>
*
* <p>cannot be implemented, but
*
* <blockquote>
*
* <pre>select from fields as f
* where f.getClass().getName().equals("java.lang.String")</pre>
*
* </blockquote>
*
* <p>can. It is the optimizer's responsibility to find these ways, by applying transformation
* rules.
*/
public final class LogicalTableScan extends TableScan {
// ~ Constructors -----------------------------------------------------------

/**
* Creates a LogicalTableScan.
*
* <p>Use {@link #create} unless you know what you're doing.
*/
public LogicalTableScan(
RelOptCluster cluster, RelTraitSet traitSet, List<RelHint> hints, RelOptTable table) {
super(cluster, traitSet, hints, table);
}

@Deprecated // to be removed before 2.0
public LogicalTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
this(cluster, traitSet, ImmutableList.of(), table);
}

@Deprecated // to be removed before 2.0
public LogicalTableScan(RelOptCluster cluster, RelOptTable table) {
this(cluster, cluster.traitSetOf(Convention.NONE), ImmutableList.of(), table);
}

/** Creates a LogicalTableScan by parsing serialized output. */
public LogicalTableScan(RelInput input) {
super(input);
}

@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
assert traitSet.containsIfApplicable(Convention.NONE);
assert inputs.isEmpty();
return this;
}

// BEGIN FLINK MODIFICATION
// {@link #explainTerms} method should consider hints due to CALCITE-4581.
// This file should be remove once CALCITE-4581 is fixed.
@Override
public RelWriter explainTerms(RelWriter pw) {
return super.explainTerms(pw).itemIf("hints", getHints(), !getHints().isEmpty());
}
// END FLINK MODIFICATION

/**
* Creates a LogicalTableScan.
*
* @param cluster Cluster
* @param relOptTable Table
* @param hints The hints
*/
public static LogicalTableScan create(
RelOptCluster cluster, final RelOptTable relOptTable, List<RelHint> hints) {
final Table table = relOptTable.unwrap(Table.class);
final RelTraitSet traitSet =
cluster.traitSetOf(Convention.NONE)
.replaceIfs(
RelCollationTraitDef.INSTANCE,
() -> {
if (table != null) {
return table.getStatistic().getCollations();
}
return ImmutableList.of();
});
return new LogicalTableScan(cluster, traitSet, hints, relOptTable);
}

@Override
public RelNode withHints(List<RelHint> hintList) {
return new LogicalTableScan(getCluster(), traitSet, hintList, table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ public static RelNode convertCollectToRel(
return convertSinkToRel(
relBuilder,
input,
Collections.emptyMap(), // dynamicOptions
collectModifyOperation.getTableIdentifier(),
Collections.emptyMap(),
Collections.emptyMap(), // staticPartitions
false,
tableSink,
catalogTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ private void pushFilterIntoScan(
TableSourceTable tableSourceTable = pushdownResultWithScan._2;

FlinkLogicalTableSourceScan newScan =
FlinkLogicalTableSourceScan.create(scan.getCluster(), tableSourceTable);
FlinkLogicalTableSourceScan.create(
scan.getCluster(), scan.getHints(), tableSourceTable);

// build new calc program
RexProgramBuilder programBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public void onMatch(RelOptRuleCall call) {

TableSourceTable newTableSourceTable = applyLimit(limit, scan);
FlinkLogicalTableSourceScan newScan =
FlinkLogicalTableSourceScan.create(scan.getCluster(), newTableSourceTable);
FlinkLogicalTableSourceScan.create(
scan.getCluster(), scan.getHints(), newTableSourceTable);
Sort newSort = sort.copy(sort.getTraitSet(), Collections.singletonList(newScan));
call.transformTo(newSort);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ protected FlinkLogicalTableSourceScan getNewScan(
newType,
new String[] {digest},
new SourceAbilitySpec[] {abilitySpec});
return FlinkLogicalTableSourceScan.create(scan.getCluster(), newTableSourceTable);
return FlinkLogicalTableSourceScan.create(
scan.getCluster(), scan.getHints(), newTableSourceTable);
}

protected boolean supportsWatermarkPushDown(FlinkLogicalTableSourceScan scan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,12 @@ object FlinkLogicalRelFactories {
case s: LogicalTableScan if FlinkLogicalLegacyTableSourceScan.isTableSourceScan(s) =>
FlinkLogicalLegacyTableSourceScan.create(
cluster,
hints,
s.getTable.asInstanceOf[FlinkPreparingTableBase])
case s: LogicalTableScan if FlinkLogicalDataStreamTableScan.isDataStreamTableScan(s) =>
FlinkLogicalDataStreamTableScan.create(
cluster,
hints,
s.getTable.asInstanceOf[FlinkPreparingTableBase])
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.logical

import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.schema.DataStreamTable
import org.apache.flink.table.planner.plan.utils.RelExplainUtil

import com.google.common.collect.ImmutableList
import org.apache.calcite.plan._
Expand All @@ -28,11 +29,11 @@ import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter}

import java.util
import java.util.Collections
import java.util.function.Supplier
import scala.collection.JavaConverters._

/**
* Sub-class of [[TableScan]] that is a relational operator
Expand All @@ -41,12 +42,13 @@ import java.util.function.Supplier
class FlinkLogicalDataStreamTableScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
hints: util.List[RelHint],
table: RelOptTable)
extends TableScan(cluster, traitSet, Collections.emptyList[RelHint](), table)
extends TableScan(cluster, traitSet, hints, table)
with FlinkLogicalRel {

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new FlinkLogicalDataStreamTableScan(cluster, traitSet, table)
new FlinkLogicalDataStreamTableScan(cluster, traitSet, getHints, table)
}

override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
Expand All @@ -55,6 +57,12 @@ class FlinkLogicalDataStreamTableScan(
planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
}

override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
.item("fields", getRowType.getFieldNames.asScala.mkString(", "))
.itemIf("hints", RelExplainUtil.hintsToString(getHints), !(getHints.isEmpty));
}

}

class FlinkLogicalDataStreamTableScanConverter
Expand All @@ -72,7 +80,7 @@ class FlinkLogicalDataStreamTableScanConverter

def convert(rel: RelNode): RelNode = {
val scan = rel.asInstanceOf[TableScan]
FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getTable)
FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getHints, scan.getTable)
}
}

Expand All @@ -84,7 +92,10 @@ object FlinkLogicalDataStreamTableScan {
dataStreamTable != null
}

def create(cluster: RelOptCluster, relOptTable: RelOptTable): FlinkLogicalDataStreamTableScan = {
def create(
cluster: RelOptCluster,
hints: util.List[RelHint],
relOptTable: RelOptTable): FlinkLogicalDataStreamTableScan = {
val dataStreamTable = relOptTable.unwrap(classOf[DataStreamTable[_]])
val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).replaceIfs(
RelCollationTraitDef.INSTANCE, new Supplier[util.List[RelCollation]]() {
Expand All @@ -96,6 +107,6 @@ object FlinkLogicalDataStreamTableScan {
}
}
}).simplify()
new FlinkLogicalDataStreamTableScan(cluster, traitSet, dataStreamTable)
new FlinkLogicalDataStreamTableScan(cluster, traitSet, hints, dataStreamTable)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.nodes.logical
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalLegacyTableSourceScan.isTableSourceScan
import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, LegacyTableSourceTable}
import org.apache.flink.table.planner.plan.utils.RelExplainUtil
import org.apache.flink.table.sources._

import com.google.common.collect.ImmutableList
Expand All @@ -34,7 +35,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter}

import java.util
import java.util.Collections
import java.util.function.Supplier

/**
Expand All @@ -44,8 +44,9 @@ import java.util.function.Supplier
class FlinkLogicalLegacyTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
hints: util.List[RelHint],
relOptTable: LegacyTableSourceTable[_])
extends TableScan(cluster, traitSet, Collections.emptyList[RelHint](), relOptTable)
extends TableScan(cluster, traitSet, hints, relOptTable)
with FlinkLogicalRel {

lazy val tableSource: TableSource[_] = tableSourceTable.tableSource
Expand All @@ -55,11 +56,11 @@ class FlinkLogicalLegacyTableSourceScan(
def copy(
traitSet: RelTraitSet,
tableSourceTable: LegacyTableSourceTable[_]): FlinkLogicalLegacyTableSourceScan = {
new FlinkLogicalLegacyTableSourceScan(cluster, traitSet, tableSourceTable)
new FlinkLogicalLegacyTableSourceScan(cluster, traitSet, getHints, tableSourceTable)
}

override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new FlinkLogicalLegacyTableSourceScan(cluster, traitSet, relOptTable)
new FlinkLogicalLegacyTableSourceScan(cluster, traitSet, getHints, relOptTable)
}

override def deriveRowType(): RelDataType = {
Expand All @@ -77,6 +78,7 @@ class FlinkLogicalLegacyTableSourceScan(
override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
.item("fields", tableSource.getTableSchema.getFieldNames.mkString(", "))
.itemIf("hints", RelExplainUtil.hintsToString(getHints), !getHints.isEmpty)
}

}
Expand All @@ -96,7 +98,7 @@ class FlinkLogicalLegacyTableSourceScanConverter
def convert(rel: RelNode): RelNode = {
val scan = rel.asInstanceOf[TableScan]
val table = scan.getTable.asInstanceOf[FlinkPreparingTableBase]
FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, table)
FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, scan.getHints, table)
}
}

Expand All @@ -108,8 +110,10 @@ object FlinkLogicalLegacyTableSourceScan {
tableSourceTable != null
}

def create(cluster: RelOptCluster, relOptTable: FlinkPreparingTableBase)
: FlinkLogicalLegacyTableSourceScan = {
def create(
cluster: RelOptCluster,
hints: util.List[RelHint],
relOptTable: FlinkPreparingTableBase): FlinkLogicalLegacyTableSourceScan = {
val table = relOptTable.unwrap(classOf[LegacyTableSourceTable[_]])
val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).replaceIfs(
RelCollationTraitDef.INSTANCE, new Supplier[util.List[RelCollation]]() {
Expand All @@ -121,6 +125,6 @@ object FlinkLogicalLegacyTableSourceScan {
}
}
}).simplify()
new FlinkLogicalLegacyTableSourceScan(cluster, traitSet, table)
new FlinkLogicalLegacyTableSourceScan(cluster, traitSet, hints, table)
}
}
Loading

0 comments on commit c6147f2

Please sign in to comment.