From cacfef212793bce01fce928e38d07070ebd35009 Mon Sep 17 00:00:00 2001
From: godfreyhe
Date: Sat, 23 Jan 2021 14:22:12 +0800
Subject: [PATCH] [minor][table-planner-blink] Minor code cleanup for some exec nodes and physical nodes

This closes #14733
---
 .../batch/BatchExecPythonGroupAggregate.java    | 17 ++++++++---------
 .../BatchExecPythonGroupWindowAggregate.java    | 11 ++++++-----
 .../batch/BatchExecPythonOverAggregate.java     | 15 ++++++++-------
 .../nodes/exec/common/CommonExecPythonCalc.java |  4 ++--
 .../stream/StreamExecPythonGroupAggregate.java  |  6 +++---
 .../StreamExecPythonGroupTableAggregate.java    |  6 +++---
 .../StreamExecPythonGroupWindowAggregate.java   |  5 +++--
 .../stream/StreamExecPythonOverAggregate.java   | 11 ++++++-----
 .../batch/BatchPhysicalNestedLoopJoin.scala     | 10 ++--------
 .../batch/BatchPhysicalPythonCalc.scala         |  1 -
 .../batch/BatchPhysicalSortMergeJoin.scala      | 16 +---------------
 .../common/CommonPhysicalExchange.scala         |  6 +++---
 .../physical/stream/StreamPhysicalSort.scala    |  2 --
 13 files changed, 45 insertions(+), 65 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
index 884e039da4a98..b8fc133e88e28 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
@@ -41,7 +41,7 @@
 import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;

-/** Batch [[ExecNode]] for Python unbounded group aggregate. */
+/** Batch {@link ExecNode} for Python unbounded group aggregate. */
 public class BatchExecPythonGroupAggregate extends ExecNodeBase
         implements BatchExecNode {

@@ -50,19 +50,19 @@ public class BatchExecPythonGroupAggregate extends ExecNodeBase
                     + "BatchArrowPythonGroupAggregateFunctionOperator";

     private final int[] grouping;
-    private final int[] groupingSet;
+    private final int[] auxGrouping;
     private final AggregateCall[] aggCalls;

     public BatchExecPythonGroupAggregate(
             int[] grouping,
-            int[] groupingSet,
+            int[] auxGrouping,
             AggregateCall[] aggCalls,
             ExecEdge inputEdge,
             RowType outputType,
             String description) {
         super(Collections.singletonList(inputEdge), outputType, description);
         this.grouping = grouping;
-        this.groupingSet = groupingSet;
+        this.auxGrouping = auxGrouping;
         this.aggCalls = aggCalls;
     }

@@ -85,7 +85,6 @@ protected Transformation translateToPlanInternal(PlannerBase planner) {
         return transform;
     }

-    @SuppressWarnings("unchecked")
     private OneInputTransformation createPythonOneInputTransformation(
             Transformation inputTransform,
             RowType inputRowType,
@@ -102,7 +101,7 @@ private OneInputTransformation createPythonOneInputTransformat
                         outputRowType,
                         pythonUdafInputOffsets,
                         pythonFunctionInfos);
-        return new OneInputTransformation(
+        return new OneInputTransformation<>(
                 inputTransform,
                 getDescription(),
                 pythonOperator,
@@ -117,10 +116,10 @@ private OneInputStreamOperator getPythonAggregateFunctionOpera
             RowType outputRowType,
             int[] udafInputOffsets,
             PythonFunctionInfo[] pythonFunctionInfos) {
-        final Class clazz =
+        final Class clazz =
                 CommonPythonUtil.loadClass(ARROW_PYTHON_AGGREGATE_FUNCTION_OPERATOR_NAME);
         try {
-            Constructor ctor =
+            Constructor ctor =
                     clazz.getConstructor(
                             Configuration.class,
                             PythonFunctionInfo[].class,
@@ -136,7 +135,7 @@ private OneInputStreamOperator getPythonAggregateFunctionOpera
                             inputRowType,
                             outputRowType,
                             grouping,
-                            groupingSet,
+                            auxGrouping,
                             udafInputOffsets);
         } catch (NoSuchMethodException
                 | IllegalAccessException
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
index ad38470d12ebc..6f1179b0968df 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
@@ -51,7 +51,7 @@
 import java.util.Arrays;
 import java.util.Collections;

-/** Batch [[ExecNode]] for group widow aggregate (Python user defined aggregate function). */
+/** Batch {@link ExecNode} for group widow aggregate (Python user defined aggregate function). */
 public class BatchExecPythonGroupWindowAggregate extends ExecNodeBase
         implements BatchExecNode {

@@ -60,7 +60,7 @@ public class BatchExecPythonGroupWindowAggregate extends ExecNodeBase
                     + "BatchArrowPythonGroupWindowAggregateFunctionOperator";

     private final int[] grouping;
-    private final int[] groupingSet;
+    private final int[] auxGrouping;
     private final AggregateCall[] aggCalls;
     private final LogicalWindow window;
     private final int inputTimeFieldIndex;
@@ -68,7 +68,7 @@ public class BatchExecPythonGroupWindowAggregate extends ExecNodeBase

     public BatchExecPythonGroupWindowAggregate(
             int[] grouping,
-            int[] groupingSet,
+            int[] auxGrouping,
             AggregateCall[] aggCalls,
             LogicalWindow window,
             int inputTimeFieldIndex,
@@ -78,13 +78,14 @@ public BatchExecPythonGroupWindowAggregate(
             String description) {
         super(Collections.singletonList(inputEdge), outputType, description);
         this.grouping = grouping;
-        this.groupingSet = groupingSet;
+        this.auxGrouping = auxGrouping;
         this.aggCalls = aggCalls;
         this.window = window;
         this.inputTimeFieldIndex = inputTimeFieldIndex;
         this.namedWindowProperties = namedWindowProperties;
     }

+    @SuppressWarnings("unchecked")
     @Override
     protected Transformation translateToPlanInternal(PlannerBase planner) {
         final ExecNode inputNode = (ExecNode) getInputNodes().get(0);
@@ -203,7 +204,7 @@ private OneInputStreamOperator getPythonGroupWindowAggregateFu
                             slideSize,
                             namePropertyTypeArray,
                             grouping,
-                            groupingSet,
+                            auxGrouping,
                             udafInputOffsets);
         } catch (NoSuchMethodException
                 | InstantiationException
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
index 725b68dab90ed..2b4bace9a2e09 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
@@ -46,7 +46,8 @@
 import java.util.List;

 /**
- * Batch [[ExecNode]] for sort-based over window aggregate (Python user defined aggregate function).
+ * Batch {@link ExecNode} for sort-based over window aggregate (Python user defined aggregate
+ * function).
  */
 public class BatchExecPythonOverAggregate extends BatchExecOverAggregateBase {

@@ -54,10 +55,10 @@ public class BatchExecPythonOverAggregate extends BatchExecOverAggregateBase {
             "org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch."
+ "BatchArrowPythonOverWindowAggregateFunctionOperator"; - private List lowerBoundary; - private List upperBoundary; - private List aggCalls; - private List aggWindowIndex; + private final List lowerBoundary; + private final List upperBoundary; + private final List aggCalls; + private final List aggWindowIndex; public BatchExecPythonOverAggregate( OverSpec over, ExecEdge inputEdge, RowType outputType, String description) { @@ -179,11 +180,11 @@ private OneInputStreamOperator getPythonOverWindowAggregateFun boolean[] isRangeWindows, int[] udafInputOffsets, PythonFunctionInfo[] pythonFunctionInfos) { - Class clazz = + Class clazz = CommonPythonUtil.loadClass( ARROW_PYTHON_OVER_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME); try { - Constructor ctor = + Constructor ctor = clazz.getConstructor( Configuration.class, PythonFunctionInfo[].class, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java index 6fe1d3074cc0a..e0ea58ce99166 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java @@ -188,7 +188,7 @@ private OneInputStreamOperator getPythonScalarFunctionOperator PythonFunctionInfo[] pythonFunctionInfos, int[] forwardedFields, boolean isArrow) { - Class clazz; + Class clazz; if (isArrow) { clazz = CommonPythonUtil.loadClass(ARROW_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME); } else { @@ -196,7 +196,7 @@ private OneInputStreamOperator getPythonScalarFunctionOperator } try { - Constructor ctor = + Constructor ctor = clazz.getConstructor( Configuration.class, PythonFunctionInfo[].class, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java index 5d134b0df1970..b39bde6273e78 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java @@ -125,8 +125,8 @@ protected Transformation translateToPlanInternal(PlannerBase planner) { inputCountIndex, countStarInserted); // partitioned aggregation - OneInputTransformation transform = - new OneInputTransformation( + OneInputTransformation transform = + new OneInputTransformation<>( inputTransform, getDescription(), operator, @@ -163,7 +163,7 @@ private OneInputStreamOperator getPythonAggregateFunctionOpera boolean countStarInserted) { Class clazz = CommonPythonUtil.loadClass(PYTHON_STREAM_AGGREAGTE_OPERATOR_NAME); try { - Constructor ctor = + Constructor ctor = clazz.getConstructor( Configuration.class, RowType.class, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java index 
index f9be3e43bf2aa..5c6aa73194fd0 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
@@ -51,7 +51,7 @@
 import java.util.Arrays;
 import java.util.Collections;

-/** Stream [[ExecNode]] for unbounded python group table aggregate. */
+/** Stream {@link ExecNode} for unbounded python group table aggregate. */
 public class StreamExecPythonGroupTableAggregate extends ExecNodeBase
         implements StreamExecNode {
     private static final Logger LOG =
@@ -162,9 +162,9 @@ private OneInputStreamOperator getPythonTableAggregateFunction
             long maxIdleStateRetentionTime,
             boolean generateUpdateBefore,
             int indexOfCountStar) {
-        Class clazz = CommonPythonUtil.loadClass(PYTHON_STREAM_TABLE_AGGREGATE_OPERATOR_NAME);
+        Class clazz = CommonPythonUtil.loadClass(PYTHON_STREAM_TABLE_AGGREGATE_OPERATOR_NAME);
         try {
-            Constructor ctor =
+            Constructor ctor =
                     clazz.getConstructor(
                             Configuration.class,
                             RowType.class,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
index 3e7a3826c50ef..0e3e3133a936f 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
@@ -77,7 +77,7 @@
 import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toDuration;
 import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toLong;

-/** Stream [[ExecNode]] for group widow aggregate (Python user defined aggregate function). */
+/** Stream {@link ExecNode} for group widow aggregate (Python user defined aggregate function). */
 public class StreamExecPythonGroupWindowAggregate extends ExecNodeBase
         implements StreamExecNode {
     private static final Logger LOGGER =
@@ -110,6 +110,7 @@ public StreamExecPythonGroupWindowAggregate(
         this.emitStrategy = emitStrategy;
     }

+    @SuppressWarnings("unchecked")
     @Override
     protected Transformation translateToPlanInternal(PlannerBase planner) {
         final boolean isCountWindow;
@@ -289,7 +290,7 @@ private Tuple2, Trigger> generateWindowAssignerAndTrigger()
                 inputTransform.getParallelism());
     }

-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "rawtypes"})
     private OneInputStreamOperator
             getPythonStreamGroupWindowAggregateFunctionOperator(
                     Configuration config,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
index 7f71c49b133ed..4bdb66b85354f 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
@@ -52,7 +52,7 @@
 import java.math.BigDecimal;
 import java.util.Collections;

-/** Stream [[ExecNode]] for python time-based over operator. */
+/** Stream {@link ExecNode} for python time-based over operator. */
 public class StreamExecPythonOverAggregate extends ExecNodeBase
         implements StreamExecNode {
     private static final Logger LOG = LoggerFactory.getLogger(StreamExecPythonOverAggregate.class);
@@ -82,6 +82,7 @@ public StreamExecPythonOverAggregate(
         this.overSpec = overSpec;
     }

+    @SuppressWarnings("unchecked")
     @Override
     protected Transformation translateToPlanInternal(PlannerBase planner) {
         if (overSpec.getGroups().size() > 1) {
@@ -230,9 +231,9 @@ private OneInputStreamOperator getPythonOverWindowAggregateFun
             className = ARROW_PYTHON_OVER_WINDOW_ROWS_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME;
         }

-        Class clazz = CommonPythonUtil.loadClass(className);
+        Class clazz = CommonPythonUtil.loadClass(className);
         try {
-            Constructor ctor =
+            Constructor ctor =
                     clazz.getConstructor(
                             Configuration.class,
                             long.class,
@@ -272,9 +273,9 @@ private OneInputStreamOperator getPythonOverWindowAggregateFun
             className = ARROW_PYTHON_OVER_WINDOW_RANGE_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME;
         }

-        Class clazz = CommonPythonUtil.loadClass(className);
+        Class clazz = CommonPythonUtil.loadClass(className);
         try {
-            Constructor ctor =
+            Constructor ctor =
                     clazz.getConstructor(
                             Configuration.class,
                             PythonFunctionInfo[].class,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalNestedLoopJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalNestedLoopJoin.scala
index 0649878adbfc5..ccb526fb40e11 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalNestedLoopJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalNestedLoopJoin.scala
@@ -20,10 +20,10 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch

 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNestedLoopJoin
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
 import org.apache.flink.table.planner.plan.utils.JoinTypeUtil
-import org.apache.flink.table.runtime.typeutils.{BinaryRowDataSerializer}
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer

 import org.apache.calcite.plan._
 import org.apache.calcite.rel.core._
@@ -31,10 +31,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexNode

-import java.util
-
-import scala.collection.JavaConversions._
-
 /**
  * Batch physical RelNode for nested-loop [[Join]].
  */
@@ -115,8 +111,6 @@ class BatchPhysicalNestedLoopJoin(
     satisfyTraitsOnBroadcastJoin(requiredTraitSet, leftIsBuild)
   }

-  //~ ExecNode methods -----------------------------------------------------------
-
   override def translateToExecNode(): ExecNode[_] = {
     val (leftEdge, rightEdge) = getInputEdges
     new BatchExecNestedLoopJoin(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCalc.scala
index 5d7d72c1db030..22f717ec45e79 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCalc.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch

 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonCalc
-import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCalc
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}

 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortMergeJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortMergeJoin.scala
index 6228fe927cc54..d840291aac201 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortMergeJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSortMergeJoin.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortMergeJoin
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
 import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, FlinkRelOptUtil, JoinTypeUtil,
   JoinUtil}
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType
@@ -32,8 +32,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelCollationTraitDef, RelNode, RelWriter}
 import org.apache.calcite.rex.RexNode

-import java.util
-
 import scala.collection.JavaConversions._

 /**
@@ -168,19 +166,7 @@ class BatchPhysicalSortMergeJoin(
     Some(copy(newProvidedTraitSet, Seq(newLeft, newRight)))
   }

-  //~ ExecNode methods -----------------------------------------------------------
-
-  // this method must be in sync with the behavior of SortMergeJoinOperator.
-  def getInputEdges: util.List[ExecEdge] = List(
-    ExecEdge.builder()
-      .damBehavior(ExecEdge.DamBehavior.END_INPUT)
-      .build(),
-    ExecEdge.builder()
-      .damBehavior(ExecEdge.DamBehavior.END_INPUT)
-      .build())
-
   override def translateToExecNode(): ExecNode[_] = {
-
     JoinUtil.validateJoinSpec(
       joinSpec,
       FlinkTypeFactory.toLogicalRowType(left.getRowType),
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalExchange.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalExchange.scala
index 6245bf64e1536..76e4719f0fd35 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalExchange.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalExchange.scala
@@ -39,10 +39,10 @@ import scala.collection.JavaConverters._
 abstract class CommonPhysicalExchange(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    relNode: RelNode,
+    inputRel: RelNode,
     relDistribution: RelDistribution)
-    extends Exchange(cluster, traitSet, relNode, relDistribution)
-    with FlinkPhysicalRel {
+  extends Exchange(cluster, traitSet, inputRel, relDistribution)
+  with FlinkPhysicalRel {

   override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
     val inputRows = mq.getRowCount(input)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSort.scala
index e86f476d997a3..f073f91d2492d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSort.scala
@@ -69,8 +69,6 @@ class StreamPhysicalSort(
       .item("orderBy", RelExplainUtil.collationToString(sortCollation, getRowType))
   }

-  //~ ExecNode methods -----------------------------------------------------------
-
   override def translateToExecNode(): ExecNode[_] = {
     new StreamExecSort(
       SortUtil.getSortSpec(sortCollation.getFieldCollations),