Skip to content

Commit

Permalink
[FLINK-19282][table-planner] Support watermark push down in planner
Browse files Browse the repository at this point in the history
  • Loading branch information
fsk119 authored Nov 7, 2020
1 parent 3367c23 commit 2c1e24b
Show file tree
Hide file tree
Showing 26 changed files with 2,211 additions and 159 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.rules.logical;

import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.calcite.FlinkRelFactories;
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.utils.NestedColumn;
import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil;
import org.apache.flink.table.planner.plan.utils.NestedSchema;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
* Transpose between the {@link LogicalWatermarkAssigner} and {@link LogicalProject}. The transposed {@link LogicalProject}
* works like a pruner to prune the unused fields from source. The top level {@link LogicalProject} still has to do the
* calculation, filter and prune the rowtime column if the query doesn't need.
*
* <p>NOTES: Currently the rule doesn't support nested projection push down.
*/
public class ProjectWatermarkAssignerTransposeRule extends RelOptRule {

public static final ProjectWatermarkAssignerTransposeRule INSTANCE = new ProjectWatermarkAssignerTransposeRule();

public ProjectWatermarkAssignerTransposeRule() {
super(operand(LogicalProject.class,
operand(LogicalWatermarkAssigner.class, any())),
FlinkRelFactories.FLINK_REL_BUILDER(),
"FlinkProjectWatermarkAssignerTransposeRule");
}

@Override
public boolean matches(RelOptRuleCall call) {
LogicalProject project = call.rel(0);
LogicalWatermarkAssigner watermarkAssigner = call.rel(1);

NestedSchema usedFieldInProjectIncludingRowTimeFields = getUsedFieldsInTopLevelProjectAndWatermarkAssigner(project, watermarkAssigner);

// check the field number of the input in watermark assigner is equal to the used fields in watermark assigner and project

return usedFieldInProjectIncludingRowTimeFields.columns().size() != watermarkAssigner.getRowType().getFieldCount();
}

@Override
public void onMatch(RelOptRuleCall call) {
LogicalProject project = call.rel(0);
LogicalWatermarkAssigner watermarkAssigner = call.rel(1);

// NOTES: DON'T use the nestedSchema datatype to build the transposed project.
NestedSchema nestedSchema = getUsedFieldsInTopLevelProjectAndWatermarkAssigner(project, watermarkAssigner);
FlinkRelBuilder builder = (FlinkRelBuilder) call.builder().push(watermarkAssigner.getInput());
List<RexInputRef> transposedProjects = new LinkedList<>();
List<String> usedNames = new LinkedList<>();

// TODO: support nested projection push down in transpose
// add the used column RexInputRef and names into list
for (NestedColumn column: nestedSchema.columns().values()) {
// mark by hand
column.setIndexOfLeafInNewSchema(transposedProjects.size());
column.markLeaf();

usedNames.add(column.name());
transposedProjects.add(builder.field(column.indexInOriginSchema()));
}

// get the rowtime field index in the transposed project
String rowTimeName = watermarkAssigner.getRowType().getFieldNames().get(watermarkAssigner.rowtimeFieldIndex());
int indexOfRowTimeInTransposedProject;
if (nestedSchema.columns().get(rowTimeName) == null) {
// push the RexInputRef of the rowtime into the list
int rowTimeIndexInInput = watermarkAssigner.rowtimeFieldIndex();
indexOfRowTimeInTransposedProject = transposedProjects.size();
transposedProjects.add(builder.field(rowTimeIndexInInput));
usedNames.add(rowTimeName);
} else {
// find rowtime ref in the list and mark the location
indexOfRowTimeInTransposedProject = nestedSchema.columns().get(rowTimeName).indexOfLeafInNewSchema();
}

// the rowtime column has no rowtime indicator
builder.project(transposedProjects, usedNames);

// rewrite the top level field reference
RexNode newWatermarkExpr = watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
@Override
public RexNode visitInputRef(RexInputRef inputRef) {
String fieldName = watermarkAssigner.getRowType().getFieldNames().get(inputRef.getIndex());
return builder.field(nestedSchema.columns().get(fieldName).indexOfLeafInNewSchema());
}
});

builder.watermark(indexOfRowTimeInTransposedProject, newWatermarkExpr);

List<RexNode> newProjects = NestedProjectionUtil.rewrite(project.getProjects(), nestedSchema, call.builder().getRexBuilder());
RelNode newProject = builder.project(newProjects, project.getRowType().getFieldNames()).build();
call.transformTo(newProject);
}

private NestedSchema getUsedFieldsInTopLevelProjectAndWatermarkAssigner(LogicalProject project, LogicalWatermarkAssigner watermarkAssigner) {
List<RexNode> usedFields = new ArrayList<>(project.getProjects());
usedFields.add(watermarkAssigner.watermarkExpr());
return NestedProjectionUtil.build(usedFields, watermarkAssigner.getRowType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ private static DataType applyPhysicalAndMetadataPushDown(
int newIndex = projectedPhysicalFields.length;
List<String> usedMetadataNames = new LinkedList<>();
for (NestedColumn metadata: usedMetaDataFields) {
metadata.setIndex(newIndex++);
metadata.setIndexOfLeafInNewSchema(newIndex++);
nestedSchema.columns().put(metadata.name(), metadata);
usedMetadataNames.add(metadataKeys.get(metadata.indexInOriginSchema() - physicalCount));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.rules.logical;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
import org.apache.calcite.rex.RexShuttle;

import java.util.List;
import java.util.stream.Collectors;

/**
* Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}.
* The rule will first look for the computed column in the {@link FlinkLogicalCalc} and then translate the watermark expression
* and the computed column into a {@link WatermarkStrategy}. With the new scan the rule will build a new {@link FlinkLogicalCalc}.
*/
public class PushWatermarkIntoTableSourceScanAcrossCalcRule extends PushWatermarkIntoTableSourceScanRuleBase {
public static final PushWatermarkIntoTableSourceScanAcrossCalcRule INSTANCE = new PushWatermarkIntoTableSourceScanAcrossCalcRule();

public PushWatermarkIntoTableSourceScanAcrossCalcRule() {
super(operand(FlinkLogicalWatermarkAssigner.class,
operand(FlinkLogicalCalc.class,
operand(FlinkLogicalTableSourceScan.class, none()))),
"PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule");
}

@Override
public boolean matches(RelOptRuleCall call) {
FlinkLogicalTableSourceScan scan = call.rel(2);
return supportsWatermarkPushDown(scan);
}

@Override
public void onMatch(RelOptRuleCall call) {
FlinkLogicalWatermarkAssigner watermarkAssigner = call.rel(0);
FlinkLogicalCalc calc = call.rel(1);

RexProgram originProgram = calc.getProgram();
List<RexNode> projectList = originProgram.getProjectList().stream()
.map(originProgram::expandLocalRef)
.collect(Collectors.toList());

//get watermark expression
RexNode rowTimeColumn = projectList.get(watermarkAssigner.rowtimeFieldIndex());
RexNode newWatermarkExpr = watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
@Override
public RexNode visitInputRef(RexInputRef inputRef) {
return projectList.get(inputRef.getIndex());
}
});

// push watermark assigner into the scan
FlinkLogicalTableSourceScan newScan =
getNewScan(
watermarkAssigner,
newWatermarkExpr,
call.rel(2),
((FlinkContext) call.getPlanner().getContext()).getTableConfig(),
false);

FlinkTypeFactory typeFactory = (FlinkTypeFactory) watermarkAssigner.getCluster().getTypeFactory();
RexBuilder builder = call.builder().getRexBuilder();
// cast timestamp type to rowtime type.
RexNode newRowTimeColumn = builder.makeReinterpretCast(
typeFactory.createRowtimeIndicatorType(rowTimeColumn.getType().isNullable()),
rowTimeColumn,
null);

// build new calc program
RexProgramBuilder programBuilder = new RexProgramBuilder(newScan.getRowType(), builder);

List<String> outputFieldNames = originProgram.getOutputRowType().getFieldNames();
for (int i = 0; i < projectList.size(); i++) {
if (i == watermarkAssigner.rowtimeFieldIndex()) {
// replace the origin computed column to keep type consistent
programBuilder.addProject(newRowTimeColumn, outputFieldNames.get(i));
} else {
programBuilder.addProject(projectList.get(i), outputFieldNames.get(i));
}
}
if (originProgram.getCondition() != null) {
programBuilder.addCondition(originProgram.expandLocalRef(originProgram.getCondition()));
}

FlinkLogicalCalc newCalc = FlinkLogicalCalc.create(newScan, programBuilder.getProgram());
call.transformTo(newCalc);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.rules.logical;

import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;

import org.apache.calcite.plan.RelOptRuleCall;


/**
* Rule to push the {@link FlinkLogicalWatermarkAssigner} into the {@link FlinkLogicalTableSourceScan}.
*/
public class PushWatermarkIntoTableSourceScanRule extends PushWatermarkIntoTableSourceScanRuleBase {
public static final PushWatermarkIntoTableSourceScanRule INSTANCE = new PushWatermarkIntoTableSourceScanRule();

public PushWatermarkIntoTableSourceScanRule() {
super(operand(FlinkLogicalWatermarkAssigner.class,
operand(FlinkLogicalTableSourceScan.class, none())),
"PushWatermarkIntoTableSourceScanRule");
}

@Override
public boolean matches(RelOptRuleCall call) {
FlinkLogicalTableSourceScan scan = call.rel(1);
return supportsWatermarkPushDown(scan);
}

@Override
public void onMatch(RelOptRuleCall call) {
FlinkLogicalWatermarkAssigner watermarkAssigner = call.rel(0);
FlinkLogicalTableSourceScan scan = call.rel(1);
FlinkContext context = (FlinkContext) call.getPlanner().getContext();

FlinkLogicalTableSourceScan newScan =
getNewScan(watermarkAssigner, watermarkAssigner.watermarkExpr(), scan, context.getTableConfig(), true);

call.transformTo(newScan);
}
}
Loading

0 comments on commit 2c1e24b

Please sign in to comment.