diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/logical/LogicalTableScan.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/logical/LogicalTableScan.java new file mode 100644 index 0000000000000..4de2003afeda7 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/logical/LogicalTableScan.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.calcite.rel.logical; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.schema.Table; + +import java.util.List; + +/** + * A LogicalTableScan reads all the rows from a {@link RelOptTable}. + * + * <p>This class is copied from Calcite because {@link #explainTerms} should consider hints. + * + * <p>If the table is a net.sf.saffron.ext.JdbcTable, then this is literally possible. + * But for other kinds of tables, there may be many ways to read the data from the table. For some + * kinds of table, it may not even be possible to read all of the rows unless some narrowing + * constraint is applied. + * + * <p>In the example of the net.sf.saffron.ext.ReflectSchema schema, + * + * <blockquote> + * <pre>select from fields</pre> + * </blockquote> + * + * <p>cannot be implemented, but + * + * <blockquote> + * <pre>select from fields as f + * where f.getClass().getName().equals("java.lang.String")</pre> + * </blockquote> + * + * <p>can. It is the optimizer's responsibility to find these ways, by applying transformation + * rules. + */ +public final class LogicalTableScan extends TableScan { + // ~ Constructors ----------------------------------------------------------- + + /** + * Creates a LogicalTableScan. + * + * <p>Use {@link #create} unless you know what you're doing. + */ + public LogicalTableScan( + RelOptCluster cluster, RelTraitSet traitSet, List<RelHint> hints, RelOptTable table) { + super(cluster, traitSet, hints, table); + } + + @Deprecated // to be removed before 2.0 + public LogicalTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) { + this(cluster, traitSet, ImmutableList.of(), table); + } + + @Deprecated // to be removed before 2.0 + public LogicalTableScan(RelOptCluster cluster, RelOptTable table) { + this(cluster, cluster.traitSetOf(Convention.NONE), ImmutableList.of(), table); + } + + /** Creates a LogicalTableScan by parsing serialized output. */ + public LogicalTableScan(RelInput input) { + super(input); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + assert traitSet.containsIfApplicable(Convention.NONE); + assert inputs.isEmpty(); + return this; + } + + // BEGIN FLINK MODIFICATION + // The {@link #explainTerms} method should consider hints due to CALCITE-4581. + // This file should be removed once CALCITE-4581 is fixed. + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw).itemIf("hints", getHints(), !getHints().isEmpty()); + } + // END FLINK MODIFICATION + + /** + * Creates a LogicalTableScan. + * + * @param cluster Cluster + * @param relOptTable Table + * @param hints The hints + */ + public static LogicalTableScan create( + RelOptCluster cluster, final RelOptTable relOptTable, List<RelHint> hints) { + final Table table = relOptTable.unwrap(Table.class); + final RelTraitSet traitSet = + cluster.traitSetOf(Convention.NONE) + .replaceIfs( + RelCollationTraitDef.INSTANCE, + () -> { + if (table != null) { + return table.getStatistic().getCollations(); + } + return ImmutableList.of(); + }); + return new LogicalTableScan(cluster, traitSet, hints, relOptTable); + } + + @Override + public RelNode withHints(List<RelHint> hintList) { + return new LogicalTableScan(getCluster(), traitSet, hintList, table); + } +}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index acd398f08260b..59703498cbd8a 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -103,8 +103,9 @@ public static RelNode convertCollectToRel( return convertSinkToRel( relBuilder, input, + Collections.emptyMap(), // dynamicOptions collectModifyOperation.getTableIdentifier(), - Collections.emptyMap(), + Collections.emptyMap(), // staticPartitions false, tableSink, catalogTable);
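Note: the copied class only changes explainTerms — hints become part of the node's digest, so two scans of the same table that differ only in their hints are no longer collapsed into one. A minimal sketch of what the added item renders, built with Calcite's RelHint builder (class name is illustrative; the printed form follows RelHint#toString):

```java
import org.apache.calcite.rel.hint.RelHint;

import java.util.Collections;

public class HintDigestSketch {
    public static void main(String[] args) {
        RelHint hint = RelHint.builder("OPTIONS").hintOption("k5", "v5").build();
        // itemIf("hints", getHints(), !getHints().isEmpty()) renders the hint list,
        // printing: hints=[[OPTIONS inheritPath:[] options:{k5=v5}]]
        System.out.println("hints=" + Collections.singletonList(hint));
    }
}
```

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java index 04fef0671f5ff..8bd61ba26cfc0 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java @@ -109,7 +109,8 @@ private void pushFilterIntoScan(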
TableSourceTable tableSourceTable = pushdownResultWithScan._2; FlinkLogicalTableSourceScan newScan = - FlinkLogicalTableSourceScan.create(scan.getCluster(), tableSourceTable); + FlinkLogicalTableSourceScan.create( + scan.getCluster(), scan.getHints(), tableSourceTable); // build new calc program RexProgramBuilder programBuilder = diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.java index 1aa01ccb868ea..f277ecc59eb12 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.java @@ -83,7 +83,8 @@ public void onMatch(RelOptRuleCall call) { TableSourceTable newTableSourceTable = applyLimit(limit, scan); FlinkLogicalTableSourceScan newScan = - FlinkLogicalTableSourceScan.create(scan.getCluster(), newTableSourceTable); + FlinkLogicalTableSourceScan.create( + scan.getCluster(), scan.getHints(), newTableSourceTable); Sort newSort = sort.copy(sort.getTraitSet(), Collections.singletonList(newScan)); call.transformTo(newSort); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase.java index 1c4382ec09e84..ec2c44989c9e2 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase.java @@ -134,7 +134,8 @@ protected FlinkLogicalTableSourceScan getNewScan( newType, new String[] {digest}, new SourceAbilitySpec[] {abilitySpec}); - return FlinkLogicalTableSourceScan.create(scan.getCluster(), newTableSourceTable); + return FlinkLogicalTableSourceScan.create( + scan.getCluster(), scan.getHints(), newTableSourceTable); } protected boolean supportsWatermarkPushDown(FlinkLogicalTableSourceScan scan) { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala index e750c6a4dcbfb..236120216eb01 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala @@ -218,10 +218,12 @@ object FlinkLogicalRelFactories { case s: LogicalTableScan if FlinkLogicalLegacyTableSourceScan.isTableSourceScan(s) => FlinkLogicalLegacyTableSourceScan.create( cluster, + hints, s.getTable.asInstanceOf[FlinkPreparingTableBase]) case s: LogicalTableScan if FlinkLogicalDataStreamTableScan.isDataStreamTableScan(s) => FlinkLogicalDataStreamTableScan.create( cluster, + hints, s.getTable.asInstanceOf[FlinkPreparingTableBase]) } } diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala index d630e8569f30c..f60b4337231e1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.logical import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.schema.DataStreamTable +import org.apache.flink.table.planner.plan.utils.RelExplainUtil import com.google.common.collect.ImmutableList import org.apache.calcite.plan._ @@ -28,11 +29,11 @@ import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.logical.LogicalTableScan import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode} +import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter} import java.util -import java.util.Collections import java.util.function.Supplier +import scala.collection.JavaConverters._ /** * Sub-class of [[TableScan]] that is a relational operator @@ -41,12 +42,13 @@ class FlinkLogicalDataStreamTableScan( cluster: RelOptCluster, traitSet: RelTraitSet, + hints: util.List[RelHint], table: RelOptTable) - extends TableScan(cluster, traitSet, Collections.emptyList[RelHint](), table) + extends TableScan(cluster, traitSet, hints, table) with FlinkLogicalRel { override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new FlinkLogicalDataStreamTableScan(cluster, traitSet, table) + new FlinkLogicalDataStreamTableScan(cluster, traitSet, getHints, table) } override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { @@ -55,6 +57,12 @@ class FlinkLogicalDataStreamTableScan( planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize) } + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw) + .item("fields", getRowType.getFieldNames.asScala.mkString(", ")) + .itemIf("hints", RelExplainUtil.hintsToString(getHints), !getHints.isEmpty) + } + } class FlinkLogicalDataStreamTableScanConverter @@ -72,7 +80,7 @@ class FlinkLogicalDataStreamTableScanConverter def convert(rel: RelNode): RelNode = { val scan = rel.asInstanceOf[TableScan] - FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getTable) + FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getHints, scan.getTable) } } @@ -84,7 +92,10 @@ object FlinkLogicalDataStreamTableScan { dataStreamTable != null } - def create(cluster: RelOptCluster, relOptTable: RelOptTable): FlinkLogicalDataStreamTableScan = { + def create( + cluster: RelOptCluster, + hints: util.List[RelHint], + relOptTable: RelOptTable): FlinkLogicalDataStreamTableScan = { val dataStreamTable = relOptTable.unwrap(classOf[DataStreamTable[_]]) val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).replaceIfs( RelCollationTraitDef.INSTANCE, new Supplier[util.List[RelCollation]]() { @@ -96,6 +107,6 @@ object FlinkLogicalDataStreamTableScan { } } }).simplify() - new
FlinkLogicalDataStreamTableScan(cluster, traitSet, dataStreamTable) + new FlinkLogicalDataStreamTableScan(cluster, traitSet, hints, dataStreamTable) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacyTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacyTableSourceScan.scala index 47581d7ad0a59..98331a0472dc8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacyTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalLegacyTableSourceScan.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.nodes.logical import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalLegacyTableSourceScan.isTableSourceScan import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, LegacyTableSourceTable} +import org.apache.flink.table.planner.plan.utils.RelExplainUtil import org.apache.flink.table.sources._ import com.google.common.collect.ImmutableList @@ -34,7 +35,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter} import java.util -import java.util.Collections import java.util.function.Supplier /** @@ -44,8 +44,9 @@ import java.util.function.Supplier class FlinkLogicalLegacyTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, + hints: util.List[RelHint], relOptTable: LegacyTableSourceTable[_]) - extends TableScan(cluster, traitSet, Collections.emptyList[RelHint](), relOptTable) + extends TableScan(cluster, traitSet, hints, relOptTable) with FlinkLogicalRel { lazy val tableSource: TableSource[_] = tableSourceTable.tableSource @@ -55,11 +56,11 @@ class FlinkLogicalLegacyTableSourceScan( def copy( traitSet: RelTraitSet, tableSourceTable: LegacyTableSourceTable[_]): FlinkLogicalLegacyTableSourceScan = { - new FlinkLogicalLegacyTableSourceScan(cluster, traitSet, tableSourceTable) + new FlinkLogicalLegacyTableSourceScan(cluster, traitSet, getHints, tableSourceTable) } override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new FlinkLogicalLegacyTableSourceScan(cluster, traitSet, relOptTable) + new FlinkLogicalLegacyTableSourceScan(cluster, traitSet, getHints, relOptTable) } override def deriveRowType(): RelDataType = { @@ -77,6 +78,7 @@ class FlinkLogicalLegacyTableSourceScan( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) .item("fields", tableSource.getTableSchema.getFieldNames.mkString(", ")) + .itemIf("hints", RelExplainUtil.hintsToString(getHints), !getHints.isEmpty) } } @@ -96,7 +98,7 @@ class FlinkLogicalLegacyTableSourceScanConverter def convert(rel: RelNode): RelNode = { val scan = rel.asInstanceOf[TableScan] val table = scan.getTable.asInstanceOf[FlinkPreparingTableBase] - FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, table) + FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, scan.getHints, table) } } @@ -108,8 +110,10 @@ object FlinkLogicalLegacyTableSourceScan { tableSourceTable != null } - def create(cluster: RelOptCluster, relOptTable: FlinkPreparingTableBase) - : FlinkLogicalLegacyTableSourceScan = { + def create( + cluster: RelOptCluster, + hints: util.List[RelHint], + 
relOptTable: FlinkPreparingTableBase): FlinkLogicalLegacyTableSourceScan = { val table = relOptTable.unwrap(classOf[LegacyTableSourceTable[_]]) val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).replaceIfs( RelCollationTraitDef.INSTANCE, new Supplier[util.List[RelCollation]]() { @@ -121,6 +125,6 @@ object FlinkLogicalLegacyTableSourceScan { } } }).simplify() - new FlinkLogicalLegacyTableSourceScan(cluster, traitSet, table) + new FlinkLogicalLegacyTableSourceScan(cluster, traitSet, hints, table) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala index 697e4575989a1..1119ed0e8a69e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala @@ -22,6 +22,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan.isTableSourceScan import org.apache.flink.table.planner.plan.schema.TableSourceTable +import org.apache.flink.table.planner.plan.utils.RelExplainUtil import com.google.common.collect.ImmutableList import org.apache.calcite.plan._ @@ -34,7 +35,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter} import java.util -import java.util.Collections import java.util.function.Supplier import scala.collection.JavaConversions._ @@ -46,8 +46,9 @@ import scala.collection.JavaConversions._ class FlinkLogicalTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, + hints: util.List[RelHint], relOptTable: TableSourceTable) - extends TableScan(cluster, traitSet, Collections.emptyList[RelHint](), relOptTable) + extends TableScan(cluster, traitSet, hints, relOptTable) with FlinkLogicalRel { lazy val tableSource: DynamicTableSource = relOptTable.tableSource @@ -55,11 +56,11 @@ class FlinkLogicalTableSourceScan( def copy( traitSet: RelTraitSet, tableSourceTable: TableSourceTable): FlinkLogicalTableSourceScan = { - new FlinkLogicalTableSourceScan(cluster, traitSet, tableSourceTable) + new FlinkLogicalTableSourceScan(cluster, traitSet, getHints, tableSourceTable) } override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new FlinkLogicalTableSourceScan(cluster, traitSet, relOptTable) + new FlinkLogicalTableSourceScan(cluster, traitSet, getHints, relOptTable) } override def deriveRowType(): RelDataType = { @@ -77,6 +78,7 @@ class FlinkLogicalTableSourceScan( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) .item("fields", getRowType.getFieldNames.mkString(", ")) + .itemIf("hints", RelExplainUtil.hintsToString(getHints), !getHints.isEmpty) } } @@ -96,7 +98,7 @@ class FlinkLogicalTableSourceScanConverter def convert(rel: RelNode): RelNode = { val scan = rel.asInstanceOf[TableScan] val table = scan.getTable.unwrap(classOf[TableSourceTable]) - FlinkLogicalTableSourceScan.create(rel.getCluster, table) + FlinkLogicalTableSourceScan.create(rel.getCluster, scan.getHints, table) } } @@ -108,7 +110,10 @@ object 
FlinkLogicalTableSourceScan { tableSourceTable != null } - def create(cluster: RelOptCluster, table: TableSourceTable): FlinkLogicalTableSourceScan = { + def create( + cluster: RelOptCluster, + hints: util.List[RelHint], + table: TableSourceTable): FlinkLogicalTableSourceScan = { val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).replaceIfs( RelCollationTraitDef.INSTANCE, new Supplier[util.List[RelCollation]]() { def get: util.List[RelCollation] = { @@ -119,6 +124,6 @@ object FlinkLogicalTableSourceScan { } } }).simplify() - new FlinkLogicalTableSourceScan(cluster, traitSet, table) + new FlinkLogicalTableSourceScan(cluster, traitSet, hints, table) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalBoundedStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalBoundedStreamScan.scala index 71e1e063c6e3c..12282936c2f0b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalBoundedStreamScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalBoundedStreamScan.scala @@ -22,13 +22,17 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.exec.ExecNode import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecBoundedStreamScan import org.apache.flink.table.planner.plan.schema.DataStreamTable +import org.apache.flink.table.planner.plan.utils.RelExplainUtil import org.apache.calcite.plan._ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} +import java.util + import scala.collection.JavaConverters._ /** @@ -39,9 +43,10 @@ import scala.collection.JavaConverters._ class BatchPhysicalBoundedStreamScan( cluster: RelOptCluster, traitSet: RelTraitSet, + hints: util.List[RelHint], table: RelOptTable, outputRowType: RelDataType) - extends TableScan(cluster, traitSet, table) + extends TableScan(cluster, traitSet, hints, table) with BatchPhysicalRel { val boundedStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]]) @@ -49,7 +54,7 @@ class BatchPhysicalBoundedStreamScan( override def deriveRowType(): RelDataType = outputRowType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new BatchPhysicalBoundedStreamScan(cluster, traitSet, getTable, getRowType) + new BatchPhysicalBoundedStreamScan(cluster, traitSet, getHints, getTable, getRowType) } override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { @@ -61,6 +66,7 @@ class BatchPhysicalBoundedStreamScan( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) .item("fields", getRowType.getFieldNames.asScala.mkString(", ")) + .itemIf("hints", RelExplainUtil.hintsToString(getHints), !getHints.isEmpty) } override def translateToExecNode(): ExecNode[_] = {
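Note: the logical and physical scans above render their hints through RelExplainUtil.hintsToString, which (unlike Calcite's RelHint#toString used by the copied LogicalTableScan) omits an empty inheritPath — hence the physical plans in the test expectations show hints=[[OPTIONS options:{...}]] without the inheritPath item. A direct Java port of that formatting, for illustration only (class name is hypothetical):

```java
import org.apache.calcite.rel.hint.RelHint;

import java.util.Collections;
import java.util.List;

public class HintsToStringSketch {

    // Java port of RelExplainUtil.hintsToString: each hint renders as
    // [NAME inheritPath:[...] options:{...}] (empty inheritPaths omitted),
    // and the whole list is wrapped in one outer [...] pair.
    static String hintsToString(List<RelHint> hints) {
        StringBuilder sb = new StringBuilder("[");
        for (RelHint hint : hints) {
            sb.append("[").append(hint.hintName);
            if (!hint.inheritPath.isEmpty()) {
                sb.append(" inheritPath:").append(hint.inheritPath);
            }
            if (hint.listOptions.size() > 0 || hint.kvOptions.size() > 0) {
                sb.append(" options:");
                sb.append(hint.listOptions.size() > 0
                        ? hint.listOptions.toString()
                        : hint.kvOptions.toString());
            }
            sb.append("]");
        }
        return sb.append("]").toString();
    }

    public static void main(String[] args) {
        RelHint hint = RelHint.builder("OPTIONS").hintOption("k5", "v5").build();
        // Prints: [[OPTIONS options:{k5=v5}]]
        System.out.println(hintsToString(Collections.singletonList(hint)));
    }
}
```

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLegacyTableSourceScan.scala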
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLegacyTableSourceScan.scala index 85d2f22845450..8778505f8a1ab 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLegacyTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalLegacyTableSourceScan.scala @@ -27,6 +27,7 @@ import org.apache.flink.table.sources.StreamTableSource import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.metadata.RelMetadataQuery import java.util @@ -38,12 +39,13 @@ import java.util class BatchPhysicalLegacyTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, + hints: util.List[RelHint], tableSourceTable: LegacyTableSourceTable[_]) - extends CommonPhysicalLegacyTableSourceScan(cluster, traitSet, tableSourceTable) + extends CommonPhysicalLegacyTableSourceScan(cluster, traitSet, hints, tableSourceTable) with BatchPhysicalRel { override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new BatchPhysicalLegacyTableSourceScan(cluster, traitSet, tableSourceTable) + new BatchPhysicalLegacyTableSourceScan(cluster, traitSet, getHints, tableSourceTable) } override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala index 9509edc33ef67..3021002c83449 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalTableSourceScan.scala @@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.metadata.RelMetadataQuery import java.util @@ -39,12 +40,13 @@ import java.util class BatchPhysicalTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, + hints: util.List[RelHint], tableSourceTable: TableSourceTable) - extends CommonPhysicalTableSourceScan(cluster, traitSet, tableSourceTable) + extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable) with BatchPhysicalRel { override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new BatchPhysicalTableSourceScan(cluster, traitSet, tableSourceTable) + new BatchPhysicalTableSourceScan(cluster, traitSet, getHints, tableSourceTable) } override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLegacyTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLegacyTableSourceScan.scala index 47f9b6bde22ab..ad38ae751de99 100644 --- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLegacyTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLegacyTableSourceScan.scala @@ -20,12 +20,16 @@ package org.apache.flink.table.planner.plan.nodes.physical.common import org.apache.flink.api.dag.Transformation import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable +import org.apache.flink.table.planner.plan.utils.RelExplainUtil import org.apache.flink.table.sources.TableSource import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelWriter import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.hint.RelHint + +import java.util import scala.collection.JavaConverters._ @@ -35,8 +39,9 @@ import scala.collection.JavaConverters._ abstract class CommonPhysicalLegacyTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, + hints: util.List[RelHint], relOptTable: LegacyTableSourceTable[_]) - extends TableScan(cluster, traitSet, relOptTable) { + extends TableScan(cluster, traitSet, hints, relOptTable) { // cache table source transformation. protected var sourceTransform: Transformation[_] = _ @@ -55,5 +60,6 @@ abstract class CommonPhysicalLegacyTableSourceScan( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) .item("fields", getRowType.getFieldNames.asScala.mkString(", ")) + .itemIf("hints", RelExplainUtil.hintsToString(getHints), !getHints.isEmpty) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalTableSourceScan.scala index 2dce72e3f865a..cd8704d6b7b56 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalTableSourceScan.scala @@ -20,11 +20,15 @@ package org.apache.flink.table.planner.plan.nodes.physical.common import org.apache.flink.table.connector.source.ScanTableSource import org.apache.flink.table.planner.plan.schema.TableSourceTable +import org.apache.flink.table.planner.plan.utils.RelExplainUtil import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelWriter import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.hint.RelHint + +import java.util import scala.collection.JavaConverters._ @@ -34,8 +38,9 @@ abstract class CommonPhysicalTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, + hints: util.List[RelHint], relOptTable: TableSourceTable) - extends TableScan(cluster, traitSet, relOptTable) { + extends TableScan(cluster, traitSet, hints, relOptTable) { protected val tableSourceTable: TableSourceTable = relOptTable.unwrap(classOf[TableSourceTable]) @@ -50,5 +55,6 @@ abstract class CommonPhysicalTableSourceScan( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw).item("fields", getRowType.getFieldNames.asScala.mkString(", ")) + .itemIf("hints",
RelExplainUtil.hintsToString(getHints), !getHints.isEmpty) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDataStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDataStreamScan.scala index b7b45a459cc56..c80cc0d40fbbb 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDataStreamScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDataStreamScan.scala @@ -22,13 +22,17 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.exec.ExecNode import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDataStreamScan import org.apache.flink.table.planner.plan.schema.DataStreamTable +import org.apache.flink.table.planner.plan.utils.RelExplainUtil import org.apache.calcite.plan._ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} +import java.util + import scala.collection.JavaConverters._ /** @@ -39,9 +43,10 @@ import scala.collection.JavaConverters._ class StreamPhysicalDataStreamScan( cluster: RelOptCluster, traitSet: RelTraitSet, + hints: util.List[RelHint], table: RelOptTable, outputRowType: RelDataType) - extends TableScan(cluster, traitSet, table) + extends TableScan(cluster, traitSet, hints, table) with StreamPhysicalRel { val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]]) @@ -51,7 +56,7 @@ class StreamPhysicalDataStreamScan( override def deriveRowType(): RelDataType = outputRowType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new StreamPhysicalDataStreamScan(cluster, traitSet, getTable, getRowType) + new StreamPhysicalDataStreamScan(cluster, traitSet, getHints, getTable, getRowType) } override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { @@ -63,6 +68,7 @@ class StreamPhysicalDataStreamScan( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) .item("fields", getRowType.getFieldNames.asScala.mkString(", ")) + .itemIf("hints", RelExplainUtil.hintsToString(getHints), !getHints.isEmpty) } override def translateToExecNode(): ExecNode[_] = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLegacyTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLegacyTableSourceScan.scala index 2642aa831bc44..39ae30021508e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLegacyTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLegacyTableSourceScan.scala @@ -27,22 +27,26 @@ import org.apache.flink.table.sources.StreamTableSource import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.hint.RelHint import 
org.apache.calcite.rel.metadata.RelMetadataQuery +import java.util + /** * Stream physical RelNode to read data from an external source defined by a [[StreamTableSource]]. */ class StreamPhysicalLegacyTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, + hints: util.List[RelHint], tableSourceTable: LegacyTableSourceTable[_]) - extends CommonPhysicalLegacyTableSourceScan(cluster, traitSet, tableSourceTable) + extends CommonPhysicalLegacyTableSourceScan(cluster, traitSet, hints, tableSourceTable) with StreamPhysicalRel { override def requireWatermark: Boolean = false override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new StreamPhysicalLegacyTableSourceScan(cluster, traitSet, tableSourceTable) + new StreamPhysicalLegacyTableSourceScan(cluster, traitSet, getHints, tableSourceTable) } override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTableSourceScan.scala index d5d49167b773a..992547576f66a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTableSourceScan.scala @@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.metadata.RelMetadataQuery import java.util @@ -39,14 +40,15 @@ import java.util class StreamPhysicalTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, + hints: util.List[RelHint], tableSourceTable: TableSourceTable) - extends CommonPhysicalTableSourceScan(cluster, traitSet, tableSourceTable) + extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable) with StreamPhysicalRel { override def requireWatermark: Boolean = false override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new StreamPhysicalTableSourceScan(cluster, traitSet, tableSourceTable) + new StreamPhysicalTableSourceScan(cluster, traitSet, getHints, tableSourceTable) } override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalBoundedStreamScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalBoundedStreamScanRule.scala index 9113f5238d3fe..b4389782f30d4 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalBoundedStreamScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalBoundedStreamScanRule.scala @@ -49,7 +49,12 @@ class BatchPhysicalBoundedStreamScanRule def convert(rel: RelNode): RelNode = { val scan = rel.asInstanceOf[FlinkLogicalDataStreamTableScan] val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL) - new 
BatchPhysicalBoundedStreamScan(rel.getCluster, newTrait, scan.getTable, rel.getRowType) + new BatchPhysicalBoundedStreamScan( + rel.getCluster, + newTrait, + scan.getHints, + scan.getTable, + rel.getRowType) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacyTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacyTableSourceScanRule.scala index 65b4a4110a696..2cb6ce3c73776 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacyTableSourceScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacyTableSourceScanRule.scala @@ -60,6 +60,7 @@ class BatchPhysicalLegacyTableSourceScanRule new BatchPhysicalLegacyTableSourceScan( rel.getCluster, newTrait, + scan.getHints, scan.getTable.asInstanceOf[LegacyTableSourceTable[_]] ) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalTableSourceScanRule.scala index 2ce9a1da2458a..037251123a62a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalTableSourceScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalTableSourceScanRule.scala @@ -61,6 +61,7 @@ class BatchPhysicalTableSourceScanRule new BatchPhysicalTableSourceScan( rel.getCluster, newTrait, + scan.getHints, scan.getTable.asInstanceOf[TableSourceTable] ) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDataStreamScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDataStreamScanRule.scala index 2621fda5fa5f1..4d7d1ffceffc5 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDataStreamScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDataStreamScanRule.scala @@ -50,6 +50,7 @@ class StreamPhysicalDataStreamScanRule new StreamPhysicalDataStreamScan( rel.getCluster, traitSet, + scan.getHints, scan.getTable, rel.getRowType ) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacyTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacyTableSourceScanRule.scala index 580f8b7b1f6a0..3173f703971fc 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacyTableSourceScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacyTableSourceScanRule.scala @@ -61,6 +61,7 @@ class StreamPhysicalLegacyTableSourceScanRule new 
StreamPhysicalLegacyTableSourceScan( rel.getCluster, traitSet, + scan.getHints, scan.getTable.asInstanceOf[LegacyTableSourceTable[_]] ) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala index d2c81529973fa..56a8ec7d973d4 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala @@ -69,6 +69,7 @@ class StreamPhysicalTableSourceScanRule val newScan = new StreamPhysicalTableSourceScan( rel.getCluster, traitSet, + scan.getHints, table) if (isUpsertSource(table.catalogTable, table.tableSource) || diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala index fad78bf8936f9..4d65de021731f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala @@ -25,10 +25,11 @@ import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregat import org.apache.flink.table.planner.plan.nodes.ExpressionFormat import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat -import com.google.common.collect.ImmutableMap +import com.google.common.collect.{ImmutableList, ImmutableMap} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Window.Group import org.apache.calcite.rel.core.{AggregateCall, Window} +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.{RelCollation, RelWriter} import org.apache.calcite.rex._ import org.apache.calcite.sql.SqlKind @@ -858,4 +859,28 @@ object RelExplainUtil { case (k, v) => s"$k = (${v.mkString(", ")})" }.mkString(", ") + /** + * Converts [[RelHint]]s to String. 
+ */ + def hintsToString(hints: ImmutableList[RelHint]): String = { + val sb = new StringBuilder + sb.append("[") + hints.foreach { hint => + sb.append("[").append(hint.hintName) + if (!hint.inheritPath.isEmpty) { + sb.append(" inheritPath:").append(hint.inheritPath) + } + if (hint.listOptions.size() > 0 || hint.kvOptions.size() > 0) { + sb.append(" options:") + if (hint.listOptions.size() > 0) { + sb.append(hint.listOptions.toString) + } else { + sb.append(hint.kvOptions.toString) + } + } + sb.append("]") + } + sb.append("]") + sb.toString + } } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java index ee57e9f8f81a0..ea90ba6040899 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java @@ -284,6 +284,11 @@ private static RowKind parseRowKind(String rowKindShortString) { .booleanType() .defaultValue(false) .withDeprecatedKeys("Option to determine whether to discard the late event."); + private static final ConfigOption<Integer> SOURCE_NUM_ELEMENT_TO_SKIP = + ConfigOptions.key("source.num-element-to-skip") + .intType() + .defaultValue(-1) + .withDescription("Option to define the number of elements to skip."); /** * Parse partition list from Options with the format as @@ -319,6 +324,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { boolean nestedProjectionSupported = helper.getOptions().get(NESTED_PROJECTION_SUPPORTED); boolean enableWatermarkPushDown = helper.getOptions().get(ENABLE_WATERMARK_PUSH_DOWN); boolean failingSource = helper.getOptions().get(FAILING_SOURCE); + int numElementToSkip = helper.getOptions().get(SOURCE_NUM_ELEMENT_TO_SKIP); Optional<Set<String>> filterableFields = helper.getOptions().getOptional(FILTERABLE_FIELDS); @@ -360,6 +366,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { null, Collections.emptyList(), filterableFieldsSet, + numElementToSkip, Long.MAX_VALUE, partitions, readableMetadata, @@ -376,6 +383,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { null, Collections.emptyList(), filterableFieldsSet, + numElementToSkip, Long.MAX_VALUE, partitions, readableMetadata, @@ -395,6 +403,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { null, Collections.emptyList(), filterableFieldsSet, + numElementToSkip, Long.MAX_VALUE, partitions, readableMetadata, @@ -495,7 +504,8 @@ public Set<ConfigOption<?>> optionalOptions() { SINK_CHANGELOG_MODE_ENFORCED, WRITABLE_METADATA, ENABLE_WATERMARK_PUSH_DOWN, - SINK_DROP_LATE_EVENT)); + SINK_DROP_LATE_EVENT, + SOURCE_NUM_ELEMENT_TO_SKIP)); } private static int validateAndExtractRowtimeIndex( @@ -644,6 +654,7 @@ private static class TestValuesScanTableSource protected List<ResolvedExpression> filterPredicates; protected final Set<String> filterableFields; protected long limit; + protected int numElementToSkip; protected List<Map<String, String>> allPartitions; protected final Map<String, DataType> readableMetadata; protected @Nullable int[] projectedMetadataFields; @@ -659,6 +670,7 @@ private TestValuesScanTableSource( @Nullable int[][] projectedPhysicalFields, List<ResolvedExpression> filterPredicates, Set<String> filterableFields, + int numElementToSkip, long limit, List<Map<String, String>> allPartitions, Map<String, DataType> readableMetadata,
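Note: a sketch of how the new option is meant to be exercised from a test; it assumes the test-only 'values' connector registered by TestValuesTableFactory and a planner test classpath (class name is illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SkipOptionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
        // 'values' is the test connector defined by TestValuesTableFactory;
        // with the option below the source drops the first two registered rows.
        tEnv.executeSql(
                "CREATE TABLE src (a INT) WITH ("
                        + " 'connector' = 'values',"
                        + " 'source.num-element-to-skip' = '2'"
                        + ")");
    }
}
```

@@ -673,6 +685,7 @@ private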
TestValuesScanTableSource( this.projectedPhysicalFields = projectedPhysicalFields; this.filterPredicates = filterPredicates; this.filterableFields = filterableFields; + this.numElementToSkip = numElementToSkip; this.limit = limit; this.allPartitions = allPartitions; this.readableMetadata = readableMetadata; @@ -790,6 +803,7 @@ public DynamicTableSource copy() { projectedPhysicalFields, filterPredicates, filterableFields, + numElementToSkip, limit, allPartitions, readableMetadata, @@ -807,6 +821,7 @@ protected Collection<RowData> convertToRowData(DataStructureConverter converter) allPartitions.isEmpty() ? Collections.singletonList(Collections.emptyMap()) : allPartitions; + int numRetained = 0; for (Map<String, String> partition : keys) { for (Row row : data.get(partition)) { if (result.size() >= limit) { @@ -819,8 +834,11 @@ protected Collection<RowData> convertToRowData(DataStructureConverter converter) final Row projectedRow = projectRow(row); final RowData rowData = (RowData) converter.toInternal(projectedRow); if (rowData != null) { - rowData.setRowKind(row.getKind()); - result.add(rowData); + if (numRetained >= numElementToSkip) { + rowData.setRowKind(row.getKind()); + result.add(rowData); + } + numRetained++; } } } @@ -928,6 +946,7 @@ private TestValuesScanTableSourceWithWatermarkPushDown( @Nullable int[][] projectedPhysicalFields, List<ResolvedExpression> filterPredicates, Set<String> filterableFields, + int numElementToSkip, long limit, List<Map<String, String>> allPartitions, Map<String, DataType> readableMetadata, @@ -943,6 +962,7 @@ private TestValuesScanTableSourceWithWatermarkPushDown( projectedPhysicalFields, filterPredicates, filterableFields, + numElementToSkip, limit, allPartitions, readableMetadata, @@ -994,6 +1014,7 @@ public DynamicTableSource copy() { projectedPhysicalFields, filterPredicates, filterableFields, + numElementToSkip, limit, allPartitions, readableMetadata, @@ -1028,6 +1049,7 @@ private TestValuesScanLookupTableSource( int[][] projectedFields, List<ResolvedExpression> filterPredicates, Set<String> filterableFields, + int numElementToSkip, long limit, List<Map<String, String>> allPartitions, Map<String, DataType> readableMetadata, @@ -1043,6 +1065,7 @@ private TestValuesScanLookupTableSource( projectedFields, filterPredicates, filterableFields, + numElementToSkip, limit, allPartitions, readableMetadata, @@ -1080,7 +1103,17 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { allPartitions.forEach( key -> rows.addAll(data.getOrDefault(key, new ArrayList<>()))); } - rows.forEach( + + List<Row> data = new ArrayList<>(rows); + if (numElementToSkip > 0) { + if (numElementToSkip >= data.size()) { + data = Collections.emptyList(); + } else { + data = data.subList(numElementToSkip, data.size()); + } + } + + data.forEach( record -> { Row key = Row.of( @@ -1118,6 +1151,7 @@ public DynamicTableSource copy() { projectedPhysicalFields, filterPredicates, filterableFields, + numElementToSkip, limit, allPartitions, readableMetadata,
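Note: the two skip paths above behave slightly differently — the scan source counts a row only after it survives conversion, while the lookup source simply drops a prefix of the materialized row list. A self-contained sketch of the scan-side counting (names are illustrative, not from the patch):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SkipSemanticsSketch {

    // Mirrors the scan-side loop: a row is counted once it survives conversion,
    // and emitted only when its position is at or past numElementToSkip.
    static List<String> emit(List<String> convertedRows, int numElementToSkip) {
        List<String> result = new ArrayList<>();
        int numRetained = 0;
        for (String row : convertedRows) {
            if (numRetained >= numElementToSkip) {
                result.add(row);
            }
            numRetained++;
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [c]; with the default of -1, nothing is skipped.
        System.out.println(emit(Arrays.asList("a", "b", "c"), 2));
    }
}
```

diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LegacyTableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LegacyTableSourceTest.xml index 36bcf5a65727b..e9738e30eec18 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LegacyTableSourceTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LegacyTableSourceTest.xml @@ -254,6 +254,22 @@ HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]) +- 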
Exchange(distribution=[single]) +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, ProjectableTable, source: [TestSource(physical fields: )]]], fields=[]) +]]> + + + + + + + + + + + @@ -280,19 +296,27 @@ Calc(select=[id]) ]]> - - - - + diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml index a6db72296d9ee..fb60f04ca8954 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml @@ -117,6 +117,33 @@ HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]) +- Exchange(distribution=[single]) +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +- TableSourceScan(table=[[default_catalog, default_database, ProjectableTable, project=[]]], fields=[]) +]]> + + + + + + + + @@ -134,6 +161,52 @@ LogicalProject(a=[$0], c=[$2], EXPR$2=[PROCTIME()]) + + + + + + + + + + + + + + + + diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml index 18ebe8f7440df..c7bf2f397cf0f 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml @@ -407,35 +407,6 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, id]) +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, id]) +- Calc(select=[a, b, c, PROCTIME() AS proctime]) +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], fields=[a, b, c]) -]]> - - - - - 1000 - ]]> - - - ($2, 1000)]) - +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}]) - :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()]) - : +- LogicalTableScan(table=[[default_catalog, default_database, T0]]) - +- LogicalSnapshot(period=[$cor0.proctime]) - +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]]) -]]> - - - 1000)]) - +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], fields=[a, b, c]) ]]> @@ -463,7 +434,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age] ]]> - + @@ -501,39 +472,55 @@ FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3= +- FlinkLogicalJoin(condition=[=($1, $4)], joinType=[inner]) :- FlinkLogicalCalc(select=[b, a, c, d]) : +- FlinkLogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)]) - : +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, T1]]) + : +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, T1]], fields=[a, b, c, d]) +- FlinkLogicalSnapshot(period=[$cor0.proctime]) +- FlinkLogicalCalc(select=[id], where=[>(age, 10)]) - +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LookupTable, source: 
[TestTemporalTable(id, name, age)]]], fields=[id, name, age]) + +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age]) ]]> - + 1000 +ON T.a = D.id +WHERE D.age > 10 + ) AS T +GROUP BY b ]]> ($2, 1000)]) - +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}]) - :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()]) - : +- LogicalTableScan(table=[[default_catalog, default_database, T0]]) - +- LogicalSnapshot(period=[$cor0.proctime]) - +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]]) +LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM($3)]) ++- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3]) + +- LogicalFilter(condition=[>($7, 10)]) + +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}]) + :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()]) + : +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalFilter(condition=[=($cor0.a, $0)]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]]) ]]> - + 1000)]) - +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], fields=[a, b, c]) +FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM($3)]) ++- FlinkLogicalCalc(select=[b, a, c, d]) + +- FlinkLogicalJoin(condition=[=($1, $4)], joinType=[inner]) + :- FlinkLogicalCalc(select=[b, a, c, d]) + : +- FlinkLogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)]) + : +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, T1]], fields=[a, b, c, d]) + +- FlinkLogicalSnapshot(period=[$cor0.proctime]) + +- FlinkLogicalCalc(select=[id], where=[>(age, 10)]) + +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]], fields=[id, name, age]) ]]> @@ -610,51 +597,6 @@ Calc(select=[EXPR$0, EXPR$1, EXPR$2]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[id AS a, b]) +- Reused(reference_id=[1]) -]]> - - - - - 10 - ) AS T -GROUP BY b - ]]> - - - ($7, 10)]) - +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}]) - :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()]) - : +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)]) - : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) - +- LogicalFilter(condition=[=($cor0.a, $0)]) - +- LogicalSnapshot(period=[$cor0.proctime]) - +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]]) -]]> - - - (age, 10)]) - +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/hint/OptionsHintTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/hint/OptionsHintTest.xml index f43b8bdcd5985..fd4e91ca7ed9e 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/hint/OptionsHintTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/hint/OptionsHintTest.xml @@ -23,13 +23,13 @@ 
limitations under the License. @@ -40,13 +40,13 @@ Calc(select=[a, b, (a + 1) AS c]) @@ -88,17 +88,17 @@ on t1.a = t2.d LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5]) +- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) :- LogicalProject(a=[$0], b=[$1], c=[+($0, 1)]) - : +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=v1, k2=v2, a.b.c=fakeVal, k5=v5})], dynamic options: {a.b.c=fakeVal, k5=v5}]]) - +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={d.e.f=fakeVal, k3=v3, k4=v4, k6=v6})], dynamic options: {d.e.f=fakeVal, k6=v6}]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=v1, k2=v2, a.b.c=fakeVal, k5=v5})], dynamic options: {a.b.c=fakeVal, k5=v5}]], hints=[[[OPTIONS inheritPath:[] options:{a.b.c=fakeVal, k5=v5}]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={d.e.f=fakeVal, k3=v3, k4=v4, k6=v6})], dynamic options: {d.e.f=fakeVal, k6=v6}]], hints=[[[OPTIONS inheritPath:[] options:{d.e.f=fakeVal, k6=v6}]]]) ]]> @@ -117,8 +117,8 @@ on t1.a = t2.d LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5]) +- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) :- LogicalProject(a=[$0], b=[$1], c=[+($0, 1)]) - : +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=v1, k2=v2, a.b.c=fakeVal, k5=v5})], dynamic options: {a.b.c=fakeVal, k5=v5}]]) - +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={d.e.f=fakeVal, k3=v3, k4=v4, k6=v6})], dynamic options: {d.e.f=fakeVal, k6=v6}]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=v1, k2=v2, a.b.c=fakeVal, k5=v5})], dynamic options: {a.b.c=fakeVal, k5=v5}]], hints=[[[OPTIONS inheritPath:[] options:{a.b.c=fakeVal, k5=v5}]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={d.e.f=fakeVal, k3=v3, k4=v4, k6=v6})], dynamic options: {d.e.f=fakeVal, k6=v6}]], hints=[[[OPTIONS inheritPath:[] options:{d.e.f=fakeVal, k6=v6}]]]) ]]> @@ -126,9 +126,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5]) Join(joinType=[InnerJoin], where=[(a = d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a, b, (a + 1) AS c]) -: +- LegacyTableSourceScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=v1, k2=v2, a.b.c=fakeVal, k5=v5})], dynamic options: {a.b.c=fakeVal, k5=v5}]], fields=[a, b]) +: +- LegacyTableSourceScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=v1, k2=v2, a.b.c=fakeVal, k5=v5})], dynamic options: {a.b.c=fakeVal, k5=v5}]], fields=[a, b], hints=[[[OPTIONS options:{a.b.c=fakeVal, k5=v5}]]]) +- Exchange(distribution=[hash[d]]) - +- LegacyTableSourceScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={d.e.f=fakeVal, k3=v3, k4=v4, k6=v6})], dynamic options: {d.e.f=fakeVal, k6=v6}]], fields=[d, e, f]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={d.e.f=fakeVal, k3=v3, k4=v4, k6=v6})], dynamic options: {d.e.f=fakeVal, k6=v6}]], fields=[d, e, f], hints=[[[OPTIONS options:{d.e.f=fakeVal, k6=v6}]]]) ]]> @@ -147,17 +147,17 @@ on t1.a 
= t2.d LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5]) +- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) :- LogicalProject(a=[$0], b=[$1], c=[+($0, 1)]) - : +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=#v1, k2=#v2})], dynamic options: {k1=#v1, k2=#v2}]]) - +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={k3=#v3, k4=#v4})], dynamic options: {k3=#v3, k4=#v4}]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=#v1, k2=#v2})], dynamic options: {k1=#v1, k2=#v2}]], hints=[[[OPTIONS inheritPath:[] options:{k1=#v1, k2=#v2}]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={k3=#v3, k4=#v4})], dynamic options: {k3=#v3, k4=#v4}]], hints=[[[OPTIONS inheritPath:[] options:{k3=#v3, k4=#v4}]]]) ]]> @@ -176,8 +176,8 @@ on t1.a = t2.d LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5]) +- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) :- LogicalProject(a=[$0], b=[$1], c=[+($0, 1)]) - : +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=#v1, k2=#v2})], dynamic options: {k1=#v1, k2=#v2}]]) - +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={k3=#v3, k4=#v4})], dynamic options: {k3=#v3, k4=#v4}]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=#v1, k2=#v2})], dynamic options: {k1=#v1, k2=#v2}]], hints=[[[OPTIONS inheritPath:[] options:{k1=#v1, k2=#v2}]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={k3=#v3, k4=#v4})], dynamic options: {k3=#v3, k4=#v4}]], hints=[[[OPTIONS inheritPath:[] options:{k3=#v3, k4=#v4}]]]) ]]> @@ -185,9 +185,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5]) Join(joinType=[InnerJoin], where=[(a = d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a, b, (a + 1) AS c]) -: +- LegacyTableSourceScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=#v1, k2=#v2})], dynamic options: {k1=#v1, k2=#v2}]], fields=[a, b]) +: +- LegacyTableSourceScan(table=[[default_catalog, default_database, t1, source: [OptionsTableSource(props={k1=#v1, k2=#v2})], dynamic options: {k1=#v1, k2=#v2}]], fields=[a, b], hints=[[[OPTIONS options:{k1=#v1, k2=#v2}]]]) +- Exchange(distribution=[hash[d]]) - +- LegacyTableSourceScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={k3=#v3, k4=#v4})], dynamic options: {k3=#v3, k4=#v4}]], fields=[d, e, f]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, t2, source: [OptionsTableSource(props={k3=#v3, k4=#v4})], dynamic options: {k3=#v3, k4=#v4}]], fields=[d, e, f], hints=[[[OPTIONS options:{k3=#v3, k4=#v4}]]]) ]]> @@ -222,13 +222,13 @@ Join(joinType=[InnerJoin], where=[(a = d)], select=[a, b, c, d, e, f], leftInput @@ -286,13 +286,13 @@ Join(joinType=[InnerJoin], where=[(a = d)], select=[a, b, c, d, e, f], leftInput diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRuleTest.xml 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRuleTest.xml index 0f1ea7a65ce87..282ffd96762bf 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRuleTest.xml @@ -71,7 +71,7 @@ LogicalProject(rank_num=[$4], rowtime=[$3], a=[$0], rank_num0=[$4], a0=[$0], ran FlinkLogicalCalc(select=[rank_num1 AS rank_num, rowtime, a, rank_num1 AS rank_num0, a AS a0, rank_num1]) +- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, rank_num1]) +- FlinkLogicalCalc(select=[a, rowtime]) - +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]]) + +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) ]]> @@ -98,7 +98,7 @@ LogicalProject(a=[$0]) FlinkLogicalCalc(select=[a]) +- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime]) +- FlinkLogicalCalc(select=[a, rowtime]) - +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]]) + +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) ]]> @@ -125,7 +125,7 @@ LogicalProject(rowtime=[$3]) FlinkLogicalCalc(select=[rowtime]) +- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime]) +- FlinkLogicalCalc(select=[a, rowtime]) - +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]]) + +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) ]]> @@ -152,7 +152,7 @@ LogicalProject(rowtime=[$3], c=[$2]) FlinkLogicalCalc(select=[rowtime, c], where=[>(a, 10)]) +- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, c, rowtime]) +- FlinkLogicalCalc(select=[a, c, rowtime]) - +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]]) + +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) ]]> @@ -178,7 +178,7 @@ LogicalProject(a=[$0], rowtime=[$3]) @@ -270,7 +270,7 @@ LogicalProject(a=[$0], rowtime=[$3], rank_num=[$4]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml index 80d3e38f05e34..bd4ed0a690a71 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml @@ -39,7 +39,7 @@ LogicalProject(a=[$0], rank_num=[$4]) FlinkLogicalCalc(select=[a, w0$o0]) +- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, w0$o0]) +- 
FlinkLogicalCalc(select=[a, rowtime]) - +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]]) + +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) ]]> @@ -66,7 +66,7 @@ LogicalProject(a=[$0], rank_num=[$4]) FlinkLogicalCalc(select=[a, w0$o0]) +- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, w0$o0]) +- FlinkLogicalCalc(select=[a, rowtime]) - +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]]) + +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) ]]> @@ -93,7 +93,7 @@ LogicalProject(a=[$0]) FlinkLogicalCalc(select=[a]) +- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime]) +- FlinkLogicalCalc(select=[a, rowtime]) - +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]]) + +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) ]]> @@ -120,7 +120,7 @@ LogicalProject(a=[$0], rank_num=[$4]) FlinkLogicalCalc(select=[a, 1:BIGINT AS w0$o0]) +- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime]) +- FlinkLogicalCalc(select=[a, rowtime]) - +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]]) + +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacyTableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacyTableSourceTest.scala index 3faa0d20f497a..33d9c6e905eb9 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacyTableSourceTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LegacyTableSourceTest.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.batch.sql import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.config.TableConfigOptions import org.apache.flink.table.api.internal.TableEnvironmentInternal import org.apache.flink.table.api.{DataTypes, TableSchema, Types, ValidationException} import org.apache.flink.table.planner.expressions.utils.Func1 @@ -212,4 +213,49 @@ class LegacyTableSourceTest extends TableTestBase { """.stripMargin util.verifyExecPlan(sqlQuery) } + + @Test + def testTableHint(): Unit = { + util.tableEnv.getConfig.getConfiguration.setBoolean( + TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true) + val ddl = + s""" + |CREATE TABLE MyTable1 ( + | name STRING, + | a bigint, + | b int, + | c double + |) with ( + | 'connector.type' = 'TestFilterableSource', + | 'is-bounded' = 'true' + |) + """.stripMargin + util.tableEnv.executeSql(ddl) + util.tableEnv.executeSql( + s""" + |CREATE TABLE MySink ( + | `a` BIGINT, + | `b` INT, + | `c` DOUBLE + |) WITH ( + | 'connector' = 'filesystem', + | 'format' = 'testcsv', + | 'path' = '/tmp/test' + |) + """.stripMargin) + + val stmtSet = util.tableEnv.createStatementSet() + 
stmtSet.addInsertSql( + """ + |insert into MySink select a,b,c from MyTable1 + | /*+ OPTIONS('source.num-element-to-skip'='31') */ + |""".stripMargin) + stmtSet.addInsertSql( + """ + |insert into MySink select a,b,c from MyTable1 + | /*+ OPTIONS('source.num-element-to-skip'='32') */ + |""".stripMargin) + + util.verifyExecPlan(stmtSet) + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala index 4a0841df8d04f..3bd19e2b3f567 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala @@ -18,6 +18,8 @@ package org.apache.flink.table.planner.plan.batch.sql +import org.apache.flink.table.api.config.TableConfigOptions +import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder import org.apache.flink.table.planner.utils._ import org.junit.{Before, Test} @@ -89,6 +91,17 @@ class TableSourceTest extends TableTestBase { |) |""".stripMargin util.tableEnv.executeSql(ddl4) + util.tableEnv.executeSql( + s""" + |CREATE TABLE MyTable ( + | `a` INT, + | `b` BIGINT, + | `c` STRING + |) WITH ( + | 'connector' = 'values', + | 'bounded' = 'true' + |) + |""".stripMargin) } @Test @@ -144,4 +157,104 @@ class TableSourceTest extends TableTestBase { |""".stripMargin ) } + + @Test + def testTableHintWithDifferentOptions(): Unit = { + util.tableEnv.getConfig.getConfiguration.setBoolean( + TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true) + util.tableEnv.executeSql( + s""" + |CREATE TABLE MySink ( + | `a` INT, + | `b` BIGINT, + | `c` STRING + |) WITH ( + | 'connector' = 'filesystem', + | 'format' = 'testcsv', + | 'path' = '/tmp/test' + |) + """.stripMargin) + + val stmtSet = util.tableEnv.createStatementSet() + stmtSet.addInsertSql( + """ + |insert into MySink select a,b,c from MyTable + | /*+ OPTIONS('source.num-element-to-skip'='1') */ + |""".stripMargin) + stmtSet.addInsertSql( + """ + |insert into MySink select a,b,c from MyTable + | /*+ OPTIONS('source.num-element-to-skip'='2') */ + |""".stripMargin) + + util.verifyExecPlan(stmtSet) + } + + @Test + def testTableHintWithSameOptions(): Unit = { + util.tableEnv.getConfig.getConfiguration.setBoolean( + TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true) + util.tableEnv.executeSql( + s""" + |CREATE TABLE MySink ( + | `a` INT, + | `b` BIGINT, + | `c` STRING + |) WITH ( + | 'connector' = 'filesystem', + | 'format' = 'testcsv', + | 'path' = '/tmp/test' + |) + """.stripMargin) + + val stmtSet = util.tableEnv.createStatementSet() + stmtSet.addInsertSql( + """ + |insert into MySink select a,b,c from MyTable + | /*+ OPTIONS('source.num-element-to-skip'='1') */ + |""".stripMargin) + stmtSet.addInsertSql( + """ + |insert into MySink select a,b,c from MyTable + | /*+ OPTIONS('source.num-element-to-skip'='1') */ + |""".stripMargin) + + util.verifyExecPlan(stmtSet) + } + + @Test + def testTableHintWithDigestReuseForLogicalTableScan(): Unit = { + util.tableEnv.getConfig.getConfiguration.setBoolean( + TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true) + util.tableEnv.getConfig.getConfiguration.setBoolean( + RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true) + util.tableEnv.executeSql( + s""" + 
|CREATE TABLE MySink ( + | `a` INT, + | `b` BIGINT, + | `c` STRING + |) WITH ( + | 'connector' = 'filesystem', + | 'format' = 'testcsv', + | 'path' = '/tmp/test' + |) + """.stripMargin) + + val stmtSet = util.tableEnv.createStatementSet() + stmtSet.addInsertSql( + """ + |insert into MySink + |select a,b,c from MyTable /*+ OPTIONS('source.num-element-to-skip'='0') */ + |union all + |select a,b,c from MyTable /*+ OPTIONS('source.num-element-to-skip'='1') */ + |""".stripMargin) + stmtSet.addInsertSql( + """ + |insert into MySink select a,b,c from MyTable + | /*+ OPTIONS('source.num-element-to-skip'='2') */ + |""".stripMargin) + + util.verifyExecPlan(stmtSet) + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index d3a524a7f2e35..bf91ebf20e280 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -56,6 +56,7 @@ import org.apache.calcite.prepare.CalciteCatalogReader import org.apache.calcite.rel._ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl} import org.apache.calcite.rel.core._ +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.logical._ import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQuery, RelMetadataQueryBase} import org.apache.calcite.rex._ @@ -71,6 +72,7 @@ import org.junit.{Before, BeforeClass} import java.math.BigDecimal import java.util +import java.util.Collections import scala.collection.JavaConversions._ @@ -2358,11 +2360,14 @@ class FlinkRelMdHandlerTestBase { val scan = relBuilder.scan(tableNames).build() scan.copy(traitSet, scan.getInputs) case FlinkConventions.LOGICAL => - new FlinkLogicalDataStreamTableScan(cluster, traitSet, table) + new FlinkLogicalDataStreamTableScan( + cluster, traitSet, Collections.emptyList[RelHint](), table) case FlinkConventions.BATCH_PHYSICAL => - new BatchPhysicalBoundedStreamScan(cluster, traitSet, table, table.getRowType) + new BatchPhysicalBoundedStreamScan( + cluster, traitSet, Collections.emptyList[RelHint](), table, table.getRowType) case FlinkConventions.STREAM_PHYSICAL => - new StreamPhysicalDataStreamScan(cluster, traitSet, table, table.getRowType) + new StreamPhysicalDataStreamScan( + cluster, traitSet, Collections.emptyList[RelHint](), table, table.getRowType) case _ => throw new TableException(s"Unsupported convention trait: $conventionTrait") } scan.asInstanceOf[T] diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala index e17b37d1e2566..9ebee0dd9fa6b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala @@ -25,11 +25,14 @@ import org.apache.flink.table.planner.plan.utils.ExpandUtil import com.google.common.collect.{ImmutableList, ImmutableSet} import 
org.apache.calcite.prepare.CalciteCatalogReader +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.sql.fun.SqlStdOperatorTable.{EQUALS, LESS_THAN} import org.apache.calcite.util.ImmutableBitSet import org.junit.Assert._ import org.junit.Test +import java.util.Collections + import scala.collection.JavaConversions._ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase { @@ -52,6 +55,7 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase { val tableSourceScan = new StreamPhysicalTableSourceScan( cluster, streamPhysicalTraits, + Collections.emptyList[RelHint](), table) assertEquals(uniqueKeys(Array(0, 2)), mq.getUniqueKeys(tableSourceScan).toSet) } @@ -66,6 +70,7 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase { val tableSourceScan = new StreamPhysicalTableSourceScan( cluster, streamPhysicalTraits, + Collections.emptyList[RelHint](), table) assertNull(mq.getUniqueKeys(tableSourceScan)) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyTableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyTableSourceITCase.scala index 582d0c8d7e70a..4e73cc87784f1 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyTableSourceITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LegacyTableSourceITCase.scala @@ -20,16 +20,17 @@ package org.apache.flink.table.planner.runtime.batch.sql import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.config.TableConfigOptions import org.apache.flink.table.api.internal.TableEnvironmentInternal import org.apache.flink.table.api.{DataTypes, TableSchema, Types} import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData} -import org.apache.flink.table.planner.utils.{TestDataTypeTableSource, TestFileInputFormatTableSource, TestLegacyFilterableTableSource, TestInputFormatTableSource, TestNestedProjectableTableSource, TestPartitionableSourceFactory, TestLegacyProjectableTableSource, TestTableSourceSinks} +import org.apache.flink.table.planner.utils.{TableTestUtil, TestDataTypeTableSource, TestFileInputFormatTableSource, TestInputFormatTableSource, TestLegacyFilterableTableSource, TestLegacyProjectableTableSource, TestNestedProjectableTableSource, TestPartitionableSourceFactory, TestTableSourceSinks} import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter import org.apache.flink.types.Row -import org.junit.{Before, Test} +import org.junit.{Assert, Before, Test} import java.io.FileWriter import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong} @@ -332,4 +333,53 @@ class LegacyTableSourceITCase extends BatchTestBase { ) ) } + + @Test + def testTableHint(): Unit = { + tEnv.getConfig.getConfiguration.setBoolean( + TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true) + val ddl = + s""" + |CREATE TABLE MyTable1 ( + | name STRING, + | a bigint, + | b int, + | c double + |) with ( + | 'connector.type' = 'TestFilterableSource', + | 'is-bounded' = 'true' + |) + """.stripMargin + tEnv.executeSql(ddl) + val resultPath = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath + tEnv.executeSql( + s""" + |CREATE TABLE MySink ( + | `a` BIGINT, + | `b` INT, + | `c` DOUBLE + |) WITH ( + | 'connector' = 'filesystem', + | 'format' = 'testcsv', + | 'path' = '$resultPath' + |) + """.stripMargin) + + val stmtSet= tEnv.createStatementSet() + stmtSet.addInsertSql( + """ + |insert into MySink select a,b,c from MyTable1 + | /*+ OPTIONS('source.num-element-to-skip'='31') */ + |""".stripMargin) + stmtSet.addInsertSql( + """ + |insert into MySink select a,b,c from MyTable1 + | /*+ OPTIONS('source.num-element-to-skip'='32') */ + |""".stripMargin) + stmtSet.execute().await() + + val result = TableTestUtil.readFromFile(resultPath) + val expected = Seq("31,31,31.0", "32,32,32.0", "32,32,32.0") + Assert.assertEquals(expected.sorted, result.sorted) + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala index 631a3ca789831..410f0034756bf 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala @@ -18,14 +18,16 @@ package org.apache.flink.table.planner.runtime.batch.sql +import org.apache.flink.table.api.config.TableConfigOptions import org.apache.flink.table.planner.factories.TestValuesTableFactory +import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData} import org.apache.flink.table.planner.utils._ import org.apache.flink.util.FileUtils -import org.junit.{Before, Test} +import org.junit.{Assert, Before, Test} class TableSourceITCase extends BatchTestBase { @@ -303,4 +305,81 @@ class TableSourceITCase extends BatchTestBase { row("6")) ) } + + @Test + def testTableHint(): Unit = { + tEnv.getConfig.getConfiguration.setBoolean( + TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true) + val resultPath = TEMPORARY_FOLDER.newFolder().getAbsolutePath + tEnv.executeSql( + s""" + |CREATE TABLE MySink ( + | `a` INT, + | `b` BIGINT, + | `c` STRING + |) WITH ( + | 'connector' = 'filesystem', + | 'format' = 'testcsv', + | 'path' = '$resultPath' + |) + """.stripMargin) + + val stmtSet= tEnv.createStatementSet() + stmtSet.addInsertSql( + """ + |insert into MySink select a,b,c from MyTable + | /*+ OPTIONS('source.num-element-to-skip'='1') */ + |""".stripMargin) + stmtSet.addInsertSql( + """ + |insert into MySink select a,b,c from MyTable + | /*+ OPTIONS('source.num-element-to-skip'='2') */ + |""".stripMargin) + stmtSet.execute().await() + + val result = TableTestUtil.readFromFile(resultPath) + val expected = Seq("2,2,Hello", "3,2,Hello world", "3,2,Hello world") + Assert.assertEquals(expected.sorted, result.sorted) + } + + @Test + def testTableHintWithLogicalTableScanReuse(): Unit = { + tEnv.getConfig.getConfiguration.setBoolean( + TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true) + tEnv.getConfig.getConfiguration.setBoolean( + RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, true) + val resultPath = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath + tEnv.executeSql( + s""" + |CREATE TABLE MySink ( + | `a` INT, + | `b` BIGINT, + | `c` STRING + |) WITH ( + | 'connector' = 'filesystem', + | 'format' = 'testcsv', + | 'path' = '$resultPath' + |) + """.stripMargin) + + val stmtSet = tEnv.createStatementSet() + stmtSet.addInsertSql( + """ + |insert into MySink + |select a,b,c from MyTable /*+ OPTIONS('source.num-element-to-skip'='0') */ + |union all + |select a,b,c from MyTable /*+ OPTIONS('source.num-element-to-skip'='1') */ + |""".stripMargin) + stmtSet.addInsertSql( + """ + |insert into MySink select a,b,c from MyTable + | /*+ OPTIONS('source.num-element-to-skip'='2') */ + |""".stripMargin) + stmtSet.execute().await() + + val result = TableTestUtil.readFromFile(resultPath) + val expected = Seq( + "1,1,Hi", "2,2,Hello", "2,2,Hello", "3,2,Hello world", "3,2,Hello world", "3,2,Hello world") + Assert.assertEquals(expected.sorted, result.sorted) + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 73477db9b384a..14b696973dbfa 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -1649,8 +1649,17 @@ object TableTestUtil { str } - def readFromResourceAndRemoveLastLinkBreak(path: String): String = { - readFromResource(path).stripSuffix("\n") + def readFromFile(path: String): Seq[String] = { + val file = new File(path) + if (file.isDirectory) { + file.listFiles().foldLeft(Seq.empty[String]) { + (lines, p) => lines ++ readFromFile(p.getAbsolutePath) + } + } else if (file.isHidden) { + Seq.empty[String] + } else { + Files.readAllLines(Paths.get(file.toURI)).toSeq + } } @throws[IOException] diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala index 1ff8c3899400e..ba871db27c18e 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala @@ -48,8 +48,8 @@ import org.apache.flink.table.sources._ import org.apache.flink.table.sources.tsextractors.ExistingField import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks} import org.apache.flink.table.types.DataType +import org.apache.flink.table.utils.EncodingUtils import org.apache.flink.table.utils.TableSchemaUtils.getPhysicalSchema -import org.apache.flink.table.utils.{EncodingUtils} import org.apache.flink.types.Row import _root_.java.io.{File, FileOutputStream, OutputStreamWriter} @@ -495,13 +495,24 @@ class TestLegacyFilterableTableSource( data: Seq[Row], filterableFields: Set[String] = Set(), filterPredicates: Seq[Expression] = Seq(), - val filterPushedDown: Boolean = false) + val filterPushedDown: Boolean = false, + val numElementToSkip: Int = -1) extends StreamTableSource[Row] with FilterableTableSource[Row] { override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { + val records = if (numElementToSkip > 0) { + if (numElementToSkip >= 
data.size) { + Seq.empty[Row] + } else { + data.slice(numElementToSkip, data.size) + } + } else { + data + } + execEnv.fromCollection[Row]( - applyPredicatesToRows(data).asJava, + applyPredicatesToRows(records).asJava, fromDataTypeToTypeInfo(getProducedDataType).asInstanceOf[RowTypeInfo]) .setParallelism(1).setMaxParallelism(1) } @@ -682,6 +693,8 @@ class TestLegacyFilterableTableSourceFactory extends StreamTableSourceFactory[Ro val isBounded = descriptorProps.getOptionalBoolean("is-bounded").orElse(false) val schema = descriptorProps.getTableSchema(Schema.SCHEMA) val serializedRows = descriptorProps.getOptionalString("data").orElse(null) + val numElementToSkip: Int = + descriptorProps.getOptionalInt("source.num-element-to-skip").orElse(-1) val rows = if (serializedRows != null) { EncodingUtils.decodeStringToObject(serializedRows, classOf[List[Row]]) } else { @@ -694,7 +707,12 @@ class TestLegacyFilterableTableSourceFactory extends StreamTableSourceFactory[Ro } else { TestLegacyFilterableTableSource.defaultFilterableFields } - new TestLegacyFilterableTableSource(isBounded, schema, rows, filterableFields) + new TestLegacyFilterableTableSource( + isBounded, + schema, + rows, + filterableFields, + numElementToSkip = numElementToSkip) } override def requiredContext(): JMap[String, String] = { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalTableScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalTableScan.java new file mode 100644 index 0000000000000..4de2003afeda7 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalTableScan.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.calcite.rel.logical; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.schema.Table; + +import java.util.List; + +/** + * A LogicalTableScan reads all the rows from a {@link RelOptTable}. + * + *

<p>This class is copied from Calcite because {@link #explainTerms} should consider hints.
+ *
+ * <p>If the table is a <code>net.sf.saffron.ext.JdbcTable</code>, then this is literally possible.
+ * But for other kinds of tables, there may be many ways to read the data from the table. For some
+ * kinds of table, it may not even be possible to read all of the rows unless some narrowing
+ * constraint is applied.
+ *
+ * <p>In the example of the <code>net.sf.saffron.ext.ReflectSchema</code> schema,
+ *
+ * <blockquote>
+ *
+ * <pre>select from fields</pre>
+ *
+ * </blockquote>
+ *
+ * <p>cannot be implemented, but
+ *
+ * <blockquote>
+ *
+ * <pre>
+ * select from fields as f
+ * where f.getClass().getName().equals("java.lang.String")</pre>
+ *
+ * </blockquote>
+ *
+ * <p>can. It is the optimizer's responsibility to find these ways, by applying transformation
+ * rules.
+ */
+public final class LogicalTableScan extends TableScan {
+    // ~ Constructors -----------------------------------------------------------
+
+    /**
+     * Creates a LogicalTableScan.
+     *
+     * <p>Use {@link #create} unless you know what you're doing.
+     */
+    public LogicalTableScan(
+            RelOptCluster cluster, RelTraitSet traitSet, List<RelHint> hints, RelOptTable table) {
+        super(cluster, traitSet, hints, table);
+    }
+
+    @Deprecated // to be removed before 2.0
+    public LogicalTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+        this(cluster, traitSet, ImmutableList.of(), table);
+    }
+
+    @Deprecated // to be removed before 2.0
+    public LogicalTableScan(RelOptCluster cluster, RelOptTable table) {
+        this(cluster, cluster.traitSetOf(Convention.NONE), ImmutableList.of(), table);
+    }
+
+    /** Creates a LogicalTableScan by parsing serialized output. */
+    public LogicalTableScan(RelInput input) {
+        super(input);
+    }
+
+    @Override
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        assert traitSet.containsIfApplicable(Convention.NONE);
+        assert inputs.isEmpty();
+        return this;
+    }
+
+    // BEGIN FLINK MODIFICATION
+    // The {@link #explainTerms} method should consider hints due to CALCITE-4581.
+    // This file should be removed once CALCITE-4581 is fixed.
+    @Override
+    public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw).itemIf("hints", getHints(), !getHints().isEmpty());
+    }
+    // END FLINK MODIFICATION
+
+    /**
+     * Creates a LogicalTableScan.
+     *
+     * @param cluster Cluster
+     * @param relOptTable Table
+     * @param hints The hints
+     */
+    public static LogicalTableScan create(
+            RelOptCluster cluster, final RelOptTable relOptTable, List<RelHint> hints) {
+        final Table table = relOptTable.unwrap(Table.class);
+        final RelTraitSet traitSet =
+                cluster.traitSetOf(Convention.NONE)
+                        .replaceIfs(
+                                RelCollationTraitDef.INSTANCE,
+                                () -> {
+                                    if (table != null) {
+                                        return table.getStatistic().getCollations();
+                                    }
+                                    return ImmutableList.of();
+                                });
+        return new LogicalTableScan(cluster, traitSet, hints, relOptTable);
+    }
+
+    @Override
+    public RelNode withHints(List<RelHint> hintList) {
+        return new LogicalTableScan(getCluster(), traitSet, hintList, table);
+    }
+}
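
Why the copied class matters: once explainTerms appends the attached hints, two scans of the same table that differ only in their hints also differ in their plan digests, so digest-based reuse can no longer merge them. The standalone sketch below is illustrative only and not part of the patch; it assumes a Calcite 1.26-era classpath, and the schema, class, and option names are made up. ReflectiveSchema is used purely to obtain a scannable table.

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.hint.Hintable;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;

public class HintDigestSketch {
    /** Tiny POJO row type; ReflectiveSchema exposes the 'emps' field as a table. */
    public static class Emp {
        public final int id;

        public Emp(int id) {
            this.id = id;
        }
    }

    public static class Hr {
        public final Emp[] emps = {new Emp(1)};
    }

    public static void main(String[] args) {
        SchemaPlus root = Frameworks.createRootSchema(true);
        root.add("hr", new ReflectiveSchema(new Hr()));
        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(root).build();
        RelBuilder builder = RelBuilder.create(config);

        // RelBuilder.scan(...) yields a LogicalTableScan with no hints attached.
        RelNode scan = builder.scan("hr", "emps").build();

        // Attach an OPTIONS-style hint; withHints(...) returns a new scan node.
        RelHint hint =
                RelHint.builder("OPTIONS").hintOptions(ImmutableMap.of("k1", "v1")).build();
        RelNode hinted = ((Hintable) scan).withHints(ImmutableList.of(hint));

        // With the stock Calcite class (pre-CALCITE-4581) both plan strings are
        // identical; with the copied class above, the second one carries a
        // 'hints=[...]' item and therefore a distinct digest.
        System.out.println(RelOptUtil.toString(scan));
        System.out.println(RelOptUtil.toString(hinted));
    }
}
```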
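
At the SQL level, the behavior that the new testTableHint* cases pin down can be reproduced with a short program. A minimal sketch, assuming a Flink 1.13-era runtime with the built-in 'datagen' connector; the table name and option values are illustrative:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;

public class OptionsHintExample {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        // OPTIONS hints are disabled by default; enable them explicitly, as the
        // tests in this diff do via TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED.
        env.getConfig()
                .getConfiguration()
                .setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
        env.executeSql(
                "CREATE TABLE src (a INT) WITH ("
                        + " 'connector' = 'datagen', 'number-of-rows' = '5')");
        // The hint overrides the declared option for this one query. With hints
        // now part of the scan digest, this scan can no longer be confused with
        // an unhinted (or differently hinted) scan of the same table.
        env.executeSql("SELECT * FROM src /*+ OPTIONS('number-of-rows' = '3') */").print();
    }
}
```

Without hints in the digest, the two statements in the StatementSet tests above could be planned against a single reused scan, silently dropping one of the OPTIONS overrides; the XML plan changes in this diff show the hints now surfacing in the LogicalTableScan and LegacyTableSourceScan digests.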