[FLINK-26843][table-planner] Use ShortcutUtils.unwrap[TableConfig/Context]

Clean up code by using the utility methods of `ShortcutUtils`:
`unwrapTableConfig` and `unwrapContext`.
matriv authored and twalthr committed Mar 24, 2022
1 parent 33ce844 commit 075b790
Showing 42 changed files with 157 additions and 139 deletions.
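
As a minimal sketch of the pattern this cleanup applies (the `UnwrapExample` class is hypothetical and for illustration only; the before/after bodies mirror call sites changed in this diff):

    import org.apache.calcite.plan.Context;
    import org.apache.calcite.rel.RelNode;
    import org.apache.flink.table.api.TableConfig;
    import org.apache.flink.table.planner.calcite.FlinkContext;
    import org.apache.flink.table.planner.utils.ShortcutUtils;

    class UnwrapExample {
        // Before: each call site unwrapped the Calcite Context by hand.
        static TableConfig before(Context context) {
            return context.unwrap(FlinkContext.class).getTableConfig();
        }

        // After: ShortcutUtils hides the unwrap chain behind a single call.
        static TableConfig after(RelNode relNode) {
            return ShortcutUtils.unwrapTableConfig(relNode);
        }
    }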
@@ -73,6 +73,7 @@
import java.util.stream.Collectors;

import static org.apache.flink.table.planner.plan.utils.AggregateUtil.isTableAggregate;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;

/** Flink-specific {@link RelBuilder}. */
@Internal
@@ -88,7 +89,7 @@ private FlinkRelBuilder(Context context, RelOptCluster cluster, RelOptSchema rel
super(context, cluster, relOptSchema);

this.toRelNodeConverter =
-new QueryOperationConverter(this, context.unwrap(FlinkContext.class).isBatchMode());
+new QueryOperationConverter(this, unwrapContext(context).isBatchMode());
this.expandFactory =
Util.first(
context.unwrap(ExpandFactory.class),
@@ -19,7 +19,7 @@
package org.apache.flink.table.planner.utils;

import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.FunctionDefinition;
@@ -31,6 +31,7 @@
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexCall;
@@ -84,14 +85,22 @@ public static FlinkContext unwrapContext(RelOptPlanner planner) {
return unwrapContext(planner.getContext());
}

+public static FlinkContext unwrapContext(RelOptRuleCall relOptRuleCall) {
+    return unwrapContext(relOptRuleCall.getPlanner());
+}
+
public static FlinkContext unwrapContext(Context context) {
return context.unwrap(FlinkContext.class);
}

-public static ReadableConfig unwrapTableConfig(RelNode relNode) {
+public static TableConfig unwrapTableConfig(RelNode relNode) {
return unwrapContext(relNode).getTableConfig();
}

+public static TableConfig unwrapTableConfig(RelOptRuleCall relOptRuleCall) {
+    return unwrapContext(relOptRuleCall.getPlanner()).getTableConfig();
+}
+
public static @Nullable FunctionDefinition unwrapFunctionDefinition(
ResolvedExpression expression) {
// Table API expression
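
The two new `RelOptRuleCall` overloads above are aimed at planner rules. A minimal usage sketch, assuming a hypothetical rule class (`MyRuleSketch` is not part of this commit):

    import org.apache.calcite.plan.RelOptRuleCall;
    import org.apache.flink.table.api.TableConfig;

    import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;

    class MyRuleSketch {
        // Inside a RelOptRule.onMatch, the overload replaces the longer
        // call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig() chain.
        void onMatch(RelOptRuleCall call) {
            TableConfig tableConfig = unwrapTableConfig(call);
            // ...read optimizer options from tableConfig before transforming the match.
        }
    }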
@@ -20,7 +20,8 @@ package org.apache.flink.table.planner.plan.metadata

import org.apache.flink.table.planner.plan.metadata.FlinkMetadata.ColumnNullCount
import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase
-import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, FlinkRexUtil}
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.flink.table.planner.{JDouble, JList}
import org.apache.flink.util.Preconditions

@@ -157,7 +158,7 @@ class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount]
} else {
// If predicate has $index is not null, null count of index is must be 0 after predicate.
val rexBuilder = rel.getCluster.getRexBuilder
-val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(rel)
+val tableConfig = unwrapTableConfig(rel)
val maxCnfNodeCount = tableConfig.get(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)
val cnf = FlinkRexUtil.toCnf(rexBuilder, maxCnfNodeCount, predicate)
val conjunctions = RelOptUtil.conjunctions(cnf)
@@ -22,7 +22,8 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, WindowAggregate}
import org.apache.flink.table.planner.plan.nodes.physical.batch._
import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase
-import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, FlinkRelOptUtil, FlinkRexUtil, RankUtil}
+import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, FlinkRexUtil, RankUtil}
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.flink.table.planner.{JArrayList, JDouble}

import org.apache.calcite.plan.RelOptUtil
@@ -203,7 +204,7 @@ class FlinkRelMdDistinctRowCount private extends MetadataHandler[BuiltInMetadata
null
} else {
val rexBuilder = rel.getCluster.getRexBuilder
-val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(rel)
+val tableConfig = unwrapTableConfig(rel)
val maxCnfNodeCount = tableConfig.get(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)
val cnf = FlinkRexUtil.toCnf(rexBuilder, maxCnfNodeCount, predicate)
val conjunctions = RelOptUtil.conjunctions(cnf)
@@ -21,7 +21,7 @@ import org.apache.flink.table.planner.JHashMap
import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.planner.plan.metadata.FlinkMetadata.FlinkDistribution
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalSortRule
-import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig

import org.apache.calcite.rel._
import org.apache.calcite.rel.core.{Calc, Sort, TableScan}
@@ -71,7 +71,7 @@ class FlinkRelMdDistribution private extends MetadataHandler[FlinkDistribution]
}

def flinkDistribution(sort: Sort, mq: RelMetadataQuery): FlinkRelDistribution = {
-val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(sort)
+val tableConfig = unwrapTableConfig(sort)
val enableRangeSort = tableConfig.get(BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED)
if ((sort.getCollation.getFieldCollations.nonEmpty &&
sort.fetch == null && sort.offset == null) && enableRangeSort) {
@@ -21,13 +21,13 @@ package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.annotation.Experimental
import org.apache.flink.configuration.ConfigOption
import org.apache.flink.configuration.ConfigOptions.key
-import org.apache.flink.table.planner.calcite.FlinkContext
import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow}
import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, WindowAggregate}
import org.apache.flink.table.planner.plan.nodes.physical.batch._
import org.apache.flink.table.planner.plan.stats.ValueInterval
import org.apache.flink.table.planner.plan.utils.AggregateUtil.{hasTimeIntervalType, toLong}
import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, SortUtil}
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig

import org.apache.calcite.adapter.enumerable.EnumerableLimit
import org.apache.calcite.plan.volcano.RelSubset
@@ -174,9 +174,9 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun
ndvOfGroupKeysOnGlobalAgg
} else {
val inputRowCnt = mq.getRowCount(input)
-val config = rel.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
+val tableConfig = unwrapTableConfig(rel)
val parallelism = (inputRowCnt /
-config.get(FlinkRelMdRowCount.TABLE_OPTIMIZER_ROWS_PER_LOCALAGG) + 1).toInt
+tableConfig.get(FlinkRelMdRowCount.TABLE_OPTIMIZER_ROWS_PER_LOCALAGG) + 1).toInt
if (parallelism == 1) {
ndvOfGroupKeysOnGlobalAgg
} else if (grouping.isEmpty) {
@@ -20,7 +20,8 @@ package org.apache.flink.table.planner.plan.metadata

import org.apache.flink.table.planner.plan.metadata.SelectivityEstimator._
import org.apache.flink.table.planner.plan.stats._
-import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, FlinkRexUtil}
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.flink.table.planner.{JArrayList, JDouble, JHashMap, JHashSet}

import org.apache.calcite.avatica.util.DateTimeUtils
@@ -56,7 +57,7 @@ class SelectivityEstimator(rel: RelNode, mq: FlinkRelMetadataQuery)
extends RexVisitorImpl[Option[Double]](true) {

private val rexBuilder = rel.getCluster.getRexBuilder
-private val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(rel)
+private val tableConfig = unwrapTableConfig(rel)
private val maxCnfNodeCount = tableConfig.get(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)

// these default values is referred to RelMdUtil#guessSelectivity
@@ -19,10 +19,10 @@
package org.apache.flink.table.planner.plan.nodes.logical

import org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT
-import org.apache.flink.table.planner.calcite.FlinkContext
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED
import org.apache.flink.table.planner.plan.utils.SortUtil
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig

import org.apache.calcite.plan._
import org.apache.calcite.rel.convert.ConverterRule
@@ -104,9 +104,9 @@ class FlinkLogicalSortBatchConverter extends ConverterRule(
override def convert(rel: RelNode): RelNode = {
val sort = rel.asInstanceOf[LogicalSort]
val newInput = RelOptRule.convert(sort.getInput, FlinkConventions.LOGICAL)
-val config = sort.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
-val enableRangeSort = config.get(TABLE_EXEC_RANGE_SORT_ENABLED)
-val limitValue = config.get(TABLE_EXEC_SORT_DEFAULT_LIMIT)
+val tableConfig = unwrapTableConfig(sort)
+val enableRangeSort = tableConfig.get(TABLE_EXEC_RANGE_SORT_ENABLED)
+val limitValue = tableConfig.get(TABLE_EXEC_SORT_DEFAULT_LIMIT)
val (offset, fetch) = if (sort.fetch == null && sort.offset == null
&& !enableRangeSort && limitValue > 0) {
//force the sort add limit
@@ -20,8 +20,8 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch

import org.apache.flink.table.api.TableException
import org.apache.flink.table.functions.UserDefinedFunction
-import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.flink.table.planner.utils.TableConfigUtils.getAggPhaseStrategy

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -68,7 +68,7 @@ abstract class BatchPhysicalGroupAggregateBase(
def getAggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)] = aggCallToAggFunction

protected def isEnforceTwoStageAgg: Boolean = {
-val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+val tableConfig = unwrapTableConfig(this)
getAggPhaseStrategy(tableConfig) == AggregatePhaseStrategy.TWO_PHASE
}

@@ -22,9 +22,10 @@ import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashAggregate
-import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode}
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalJoinRuleBase
-import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, RelExplainUtil}
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig

import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelDistribution.Type.{HASH_DISTRIBUTED, SINGLETON}
@@ -109,7 +110,7 @@ class BatchPhysicalHashAggregate(
true
} else {
// If partialKey is enabled, try to use partial key to satisfy the required distribution
-val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+val tableConfig = unwrapTableConfig(this)
val partialKeyEnabled = tableConfig.get(
BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED)
partialKeyEnabled && groupKeysList.containsAll(shuffleKeys)
@@ -24,7 +24,8 @@ import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkR
import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalJoinRuleBase
import org.apache.flink.table.planner.plan.utils.OverAggregateUtil.splitOutOffsetOrInsensitiveGroup
-import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, OverAggregateUtil, RelExplainUtil}
+import org.apache.flink.table.planner.plan.utils.{OverAggregateUtil, RelExplainUtil}
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig

import org.apache.calcite.plan._
import org.apache.calcite.rel.RelDistribution.Type._
@@ -136,7 +137,7 @@ abstract class BatchPhysicalOverAggregateBase(
} else {
val isAllFieldsFromInput = requiredDistribution.getKeys.forall(_ < inputFieldCnt)
if (isAllFieldsFromInput) {
-val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+val tableConfig = unwrapTableConfig(this)
if (tableConfig.get(
BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED)) {
ImmutableIntList.of(partitionKeyIndices: _*).containsAll(requiredDistribution.getKeys)
@@ -25,6 +25,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonGroupAggregate
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalJoinRuleBase
import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, RelExplainUtil}
+import org.apache.flink.table.planner.utils.ShortcutUtils

import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelDistribution.Type.{HASH_DISTRIBUTED, SINGLETON}
@@ -92,7 +93,7 @@ class BatchPhysicalPythonGroupAggregate(
true
} else {
// If partialKey is enabled, try to use partial key to satisfy the required distribution
-val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+val tableConfig = ShortcutUtils.unwrapTableConfig(this)
val partialKeyEnabled = tableConfig.get(
BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED)
partialKeyEnabled && groupKeysList.containsAll(shuffleKeys)
@@ -23,10 +23,11 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.planner.plan.cost.{FlinkCost, FlinkCostFactory}
import org.apache.flink.table.planner.plan.nodes.calcite.Rank
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecRank
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalJoinRuleBase
-import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, RelExplainUtil}
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankRange, RankType}

import org.apache.calcite.plan._
@@ -134,7 +135,7 @@ class BatchPhysicalRank(
true
} else {
// If partialKey is enabled, try to use partial key to satisfy the required distribution
-val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+val tableConfig = unwrapTableConfig(this)
val partialKeyEnabled = tableConfig.get(
BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED)
partialKeyEnabled && partitionKeyList.containsAll(shuffleKeys)
@@ -22,9 +22,10 @@ import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortAggregate
-import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode}
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalJoinRuleBase
-import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, RelExplainUtil}
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig

import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelDistribution.Type.{HASH_DISTRIBUTED, SINGLETON}
@@ -110,7 +111,7 @@ class BatchPhysicalSortAggregate(
true
} else {
// If partialKey is enabled, try to use partial key to satisfy the required distribution
-val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+val tableConfig = unwrapTableConfig(this)
val partialKeyEnabled = tableConfig.get(
BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED)
partialKeyEnabled && groupKeysList.containsAll(shuffleKeys)
@@ -24,7 +24,8 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWin
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.rules.physical.stream.TwoStageOptimizedWindowAggregateRule
import org.apache.flink.table.planner.plan.utils.WindowUtil.checkEmitConfiguration
-import org.apache.flink.table.planner.plan.utils.{AggregateUtil, FlinkRelOptUtil, RelExplainUtil, WindowUtil}
+import org.apache.flink.table.planner.plan.utils.{AggregateUtil, RelExplainUtil, WindowUtil}
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -126,7 +127,7 @@ class StreamPhysicalGlobalWindowAggregate(
}

override def translateToExecNode(): ExecNode[_] = {
-checkEmitConfiguration(FlinkRelOptUtil.getTableConfigFromContext(this))
+checkEmitConfiguration(unwrapTableConfig(this))
new StreamExecGlobalWindowAggregate(
grouping,
aggCalls.toArray,
@@ -24,7 +24,8 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWind
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.rules.physical.stream.TwoStageOptimizedWindowAggregateRule
import org.apache.flink.table.planner.plan.utils.WindowUtil.checkEmitConfiguration
-import org.apache.flink.table.planner.plan.utils.{AggregateUtil, FlinkRelOptUtil, RelExplainUtil, WindowUtil}
+import org.apache.flink.table.planner.plan.utils.{AggregateUtil, RelExplainUtil, WindowUtil}
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.flink.table.runtime.groupwindow.{NamedWindowProperty, SliceEnd, WindowReference}

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -123,7 +124,7 @@ class StreamPhysicalLocalWindowAggregate(
}

override def translateToExecNode(): ExecNode[_] = {
-checkEmitConfiguration(FlinkRelOptUtil.getTableConfigFromContext(this))
+checkEmitConfiguration(unwrapTableConfig(this))
new StreamExecLocalWindowAggregate(
grouping,
aggCalls.toArray,