[FLINK-20857][table-planner-blink] Introduce BatchPhysicalPythonGroupWindowAggregate, and make BatchExecPythonGroupWindowAggregate only extended from ExecNode

This closes apache#14574
godfreyhe committed Jan 9, 2021
1 parent 36352a3 commit 8184934
Showing 4 changed files with 148 additions and 95 deletions.
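
The change follows the planner's split between physical RelNodes, which live in the Calcite planning world and carry traits and cost, and ExecNodes, which only know how to translate themselves into runtime Transformations: the new BatchPhysicalPythonGroupWindowAggregate keeps the Calcite side, while BatchExecPythonGroupWindowAggregate is reduced to an ExecNode handed over via translateToExecNode(). Below is a minimal sketch of that hand-off pattern; the class names, parameters, and method bodies are illustrative stand-ins, not the actual Flink classes.

// Illustrative sketch of the physical-node / exec-node split (not Flink's classes).
trait SketchExecNode[T] {
  // in the real stack this builds a Transformation[T] for the runtime
  def translateToPlan(): T
}

// The physical node owns planning-time state (grouping keys, window, traits, cost)...
class SketchPhysicalGroupWindowAggregate(grouping: Array[Int], windowName: String) {
  // ...and only hands a slim exec node to the translation phase.
  def translateToExecNode(): SketchExecNode[String] = new SketchExecNode[String] {
    override def translateToPlan(): String =
      s"python-group-window-agg(window=$windowName, grouping=${grouping.mkString(",")})"
  }
}

// Usage: planning produces the physical node, translation consumes the exec node.
object SketchUsage extends App {
  val physical = new SketchPhysicalGroupWindowAggregate(Array(0, 1), "TumblingGroupWindow")
  println(physical.translateToExecNode().translateToPlan())
}
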
@@ -29,7 +29,7 @@
import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWindowAggregate;
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonGroupWindowAggregate;
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalPythonGroupWindowAggregate;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
@@ -56,17 +56,17 @@

/**
* The physical rule is responsible for converting {@link FlinkLogicalWindowAggregate} to {@link
* BatchExecPythonGroupWindowAggregate}.
* BatchPhysicalPythonGroupWindowAggregate}.
*/
public class BatchExecPythonWindowAggregateRule extends RelOptRule {
public class BatchPhysicalPythonWindowAggregateRule extends RelOptRule {

public static final RelOptRule INSTANCE = new BatchExecPythonWindowAggregateRule();
public static final RelOptRule INSTANCE = new BatchPhysicalPythonWindowAggregateRule();

private BatchExecPythonWindowAggregateRule() {
private BatchPhysicalPythonWindowAggregateRule() {
super(
operand(FlinkLogicalWindowAggregate.class, operand(RelNode.class, any())),
FlinkRelFactories.LOGICAL_BUILDER_WITHOUT_AGG_INPUT_PRUNE(),
"BatchExecPythonWindowAggregateRule");
"BatchPhysicalPythonWindowAggregateRule");
}

@Override
@@ -146,8 +146,8 @@ public void onMatch(RelOptRuleCall call) {
requiredTraitSet = requiredTraitSet.replace(sortCollation);

RelNode newInput = RelOptRule.convert(input, requiredTraitSet);
BatchExecPythonGroupWindowAggregate windowAgg =
new BatchExecPythonGroupWindowAggregate(
BatchPhysicalPythonGroupWindowAggregate windowAgg =
new BatchPhysicalPythonGroupWindowAggregate(
agg.getCluster(),
traitSet,
newInput,
@@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.physical.batch
package org.apache.flink.table.planner.plan.nodes.exec.batch

import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.Configuration
@@ -25,115 +25,55 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.data.RowData
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.functions.python.PythonFunctionInfo
import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator
import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.delegation.PlannerBase
import org.apache.flink.table.planner.expressions.{PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart}
import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.planner.plan.logical.LogicalWindow
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonGroupWindowAggregate.ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAggregate
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, LegacyBatchExecNode}
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonGroupWindowAggregate.ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode, ExecNodeBase}
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.flink.table.types.logical.RowType

import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.metadata.RelMetadataQuery

import java.util

import scala.collection.JavaConversions._
import java.util.Collections

/**
* Batch physical RelNode for group window aggregate (Python user defined aggregate function).
*/
* Batch [[ExecNode]] for group window aggregate (Python user defined aggregate function).
*
* <p>Note: This class can't be ported to Java,
* because a Java class can't extend a Scala interface with default implementations.
* FLINK-20858 will port this class to Java.
*/
class BatchExecPythonGroupWindowAggregate(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
outputRowType: RelDataType,
inputRowType: RelDataType,
grouping: Array[Int],
auxGrouping: Array[Int],
aggCalls: Seq[AggregateCall],
aggFunctions: Array[UserDefinedFunction],
aggCalls: Array[AggregateCall],
window: LogicalWindow,
inputTimeFieldIndex: Int,
inputTimeIsDate: Boolean,
namedWindowProperties: Seq[PlannerNamedWindowProperty])
extends BatchPhysicalWindowAggregateBase(
cluster,
traitSet,
inputRel,
outputRowType,
grouping,
auxGrouping,
aggCalls.zip(aggFunctions),
window,
namedWindowProperties,
false,
false,
true)
with LegacyBatchExecNode[RowData]
namedWindowProperties: Array[PlannerNamedWindowProperty],
inputEdge: ExecEdge,
outputType: RowType,
description: String)
extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), outputType, description)
with BatchExecNode[RowData]
with CommonExecPythonAggregate {

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new BatchExecPythonGroupWindowAggregate(
cluster,
traitSet,
inputs.get(0),
outputRowType,
inputRowType,
grouping,
auxGrouping,
aggCalls,
aggFunctions,
window,
inputTimeFieldIndex,
inputTimeIsDate,
namedWindowProperties)
}

override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
val inputRowCnt = mq.getRowCount(getInput)
if (inputRowCnt == null) {
return null
}
// does not take pane optimization into consideration here
// sort is not done here
val aggCallToAggFunction = aggCalls.zip(aggFunctions)
val cpu = FlinkCost.FUNC_CPU_COST * inputRowCnt * aggCallToAggFunction.size
val averageRowSize: Double = mq.getAverageRowSize(this)
val memCost = averageRowSize
val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory]
costFactory.makeCost(mq.getRowCount(this), cpu, 0, 0, memCost)
}

//~ ExecNode methods -----------------------------------------------------------

override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT)

override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[RowData] = {
val input = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)
val inputType = FlinkTypeFactory.toLogicalRowType(inputRowType)
override protected def translateToPlanInternal(planner: PlannerBase): Transformation[RowData] = {
val inputNode = getInputNodes.get(0).asInstanceOf[ExecNode[RowData]]
val inputTransform = inputNode.translateToPlan(planner)

val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window)

val groupBufferLimitSize = planner.getTableConfig.getConfiguration.getInteger(
ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT)

val ret = createPythonOneInputTransformation(
input,
inputType,
val transform = createPythonOneInputTransformation(
inputTransform,
inputNode.getOutputType.asInstanceOf[RowType],
outputType,
inputTimeFieldIndex,
groupBufferLimitSize,
@@ -142,9 +82,9 @@ class BatchExecPythonGroupWindowAggregate(
getConfig(planner.getExecEnv, planner.getTableConfig))

if (isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) {
ret.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON)
transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON)
}
ret
transform
}

private[this] def createPythonOneInputTransformation(
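
The scaladoc note above points at the CommonExecPythonAggregate mixin: the exec node inherits concrete helper methods from a Scala trait, which is what keeps this class in Scala until FLINK-20858 ports it. Below is a minimal sketch of that mixin pattern; the trait, class, and method names are illustrative, not the actual Flink API.

// Illustrative sketch of a Scala trait carrying a concrete ("default") implementation
// that gets mixed into an exec-node-like class (names are not Flink's).
trait SketchPythonAggregateHelper {
  // concrete helper provided by the trait itself
  def pythonWorkerUsesManagedMemory(flag: String): Boolean = flag.equalsIgnoreCase("true")
}

class SketchExecPythonAggregate extends SketchPythonAggregateHelper {
  // the class picks the helper up for free via mixin composition
  def memoryMode(flag: String): String =
    if (pythonWorkerUsesManagedMemory(flag)) "managed" else "unmanaged"
}

A Java subclass of the compiled interface does not get this composition for free, which is the limitation the scaladoc note refers to.
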
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.physical.batch

import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.planner.plan.logical.LogicalWindow
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonGroupWindowAggregate
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}

import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.metadata.RelMetadataQuery

import java.util

/**
* Batch physical RelNode for group window aggregate (Python user defined aggregate function).
*/
class BatchPhysicalPythonGroupWindowAggregate(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputRel: RelNode,
outputRowType: RelDataType,
inputRowType: RelDataType,
grouping: Array[Int],
auxGrouping: Array[Int],
aggCalls: Seq[AggregateCall],
aggFunctions: Array[UserDefinedFunction],
window: LogicalWindow,
inputTimeFieldIndex: Int,
inputTimeIsDate: Boolean,
namedWindowProperties: Seq[PlannerNamedWindowProperty])
extends BatchPhysicalWindowAggregateBase(
cluster,
traitSet,
inputRel,
outputRowType,
grouping,
auxGrouping,
aggCalls.zip(aggFunctions),
window,
namedWindowProperties,
false,
false,
true) {

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new BatchPhysicalPythonGroupWindowAggregate(
cluster,
traitSet,
inputs.get(0),
outputRowType,
inputRowType,
grouping,
auxGrouping,
aggCalls,
aggFunctions,
window,
inputTimeFieldIndex,
inputTimeIsDate,
namedWindowProperties)
}

override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
val inputRowCnt = mq.getRowCount(getInput)
if (inputRowCnt == null) {
return null
}
// does not take pane optimization into consideration here
// sort is not done here
val aggCallToAggFunction = aggCalls.zip(aggFunctions)
val cpu = FlinkCost.FUNC_CPU_COST * inputRowCnt * aggCallToAggFunction.size
val averageRowSize: Double = mq.getAverageRowSize(this)
val memCost = averageRowSize
val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory]
costFactory.makeCost(mq.getRowCount(this), cpu, 0, 0, memCost)
}

override def translateToExecNode(): ExecNode[_] = {
new BatchExecPythonGroupWindowAggregate(
grouping,
auxGrouping,
aggCalls.toArray,
window,
inputTimeFieldIndex,
namedWindowProperties.toArray,
ExecEdge.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
)
}
}
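
The computeSelfCost above charges CPU proportionally to the input row count times the number of aggregate calls, and charges memory equal to one average row; pane optimization and the required sort are deliberately left out, per the inline comments. A small worked sketch of that formula with made-up numbers follows; the value used for FlinkCost.FUNC_CPU_COST is an assumption here, not Flink's constant.

// Worked sketch of the self-cost formula with made-up inputs.
object SketchCost extends App {
  val funcCpuCost = 0.01         // stand-in for FlinkCost.FUNC_CPU_COST (assumed value)
  val inputRowCount = 1000000.0  // mq.getRowCount(getInput)
  val numAggCalls = 3            // aggCalls.zip(aggFunctions).size
  val averageRowSize = 48.0      // mq.getAverageRowSize(this)

  val cpu = funcCpuCost * inputRowCount * numAggCalls // 30000.0
  val memory = averageRowSize                         // one buffered row on average
  println(s"rowCount=$inputRowCount cpu=$cpu io=0 network=0 memory=$memory")
}
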
@@ -424,7 +424,7 @@ object FlinkBatchRuleSets {
BatchExecOverAggregateRule.INSTANCE,
// window agg
BatchPhysicalWindowAggregateRule.INSTANCE,
BatchExecPythonWindowAggregateRule.INSTANCE,
BatchPhysicalPythonWindowAggregateRule.INSTANCE,
// join
BatchExecHashJoinRule.INSTANCE,
BatchExecSortMergeJoinRule.INSTANCE,