[FLINK-1656] Filter ForwardedField properties for group-at-a-time operators in Optimizer.

This closes apache#525
fhueske committed Apr 3, 2015
1 parent b8bb762 commit dda8565
Showing 12 changed files with 448 additions and 14 deletions.
8 changes: 5 additions & 3 deletions docs/programming_guide.md
@@ -2303,7 +2303,7 @@ env.execute()
Semantic Annotations
-----------

Semantic annotations can be used to give Flink hints about the behavior of a function.
They tell the system which fields of a function's input the function reads and evaluates and
which fields it forwards unmodified from its input to its output.
Semantic annotations are a powerful means to speed up execution, because they
@@ -2325,11 +2325,12 @@ The following semantic annotations are currently supported.
Forwarded fields information declares input fields that a function forwards unmodified to the same position or to another position in the output.
This information is used by the optimizer to infer whether a data property such as sorting or
partitioning is preserved by a function.
For functions that operate on groups of input elements such as `GroupReduce`, `GroupCombine`, `CoGroup`, and `MapPartition`, all fields that are defined as forwarded fields must always be jointly forwarded from the same input element. The forwarded fields of each element that is emitted by a group-wise function may originate from a different element of the function's input group.

Field forward information is specified using [field expressions](#define-keys-using-field-expressions).
Fields that are forwarded to the same position in the output can be specified by their position.
The specified position must be valid for the input and output data type and have the same type.
For example, the String `"f2"` declares that the third field of a Java input tuple is always equal to the third field in the output tuple.

Fields that are forwarded unmodified to another position in the output are declared by specifying the
source field in the input and the target field in the output as field expressions.
@@ -2389,7 +2390,8 @@ class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{

Non-forwarded fields information declares all fields which are not preserved on the same position in a function's output.
The values of all other fields are considered to be preserved at the same position in the output.
Hence, non-forwarded fields information is the inverse of forwarded fields information.
Non-forwarded field information for group-wise operators such as `GroupReduce`, `GroupCombine`, `CoGroup`, and `MapPartition` must fulfill the same requirements as for forwarded field information.

**IMPORTANT**: The specification of non-forwarded fields information is optional. However if used,
**ALL!** non-forwarded fields must be specified, because all other fields are considered to be forwarded in place. It is safe to declare a forwarded field as non-forwarded.
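
The group-wise rule in the documentation change above can be illustrated with a small plain-Java sketch that does not depend on Flink. The class `GroupForwardingDemo` and its methods are hypothetical names introduced only for this illustration. The function copies every input record unmodified, so declaring both fields as forwarded would be valid, yet it emits each group in reverse order. In this example the order on the key field survives while the order on the non-key field does not, which is presumably why the optimizer change restricts local properties of group-wise operators to key fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class GroupForwardingDemo {

    /**
     * Hypothetical group-wise function: field 0 is the grouping key.
     * Each output record is an unmodified copy of one input record,
     * but the records of every group are emitted in reverse order.
     */
    public static List<int[]> reverseWithinGroups(List<int[]> sortedInput) {
        List<int[]> out = new ArrayList<>();
        int start = 0;
        while (start < sortedInput.size()) {
            int end = start;
            // Advance to the end of the current key group.
            while (end < sortedInput.size()
                    && sortedInput.get(end)[0] == sortedInput.get(start)[0]) {
                end++;
            }
            List<int[]> group = new ArrayList<>(sortedInput.subList(start, end));
            Collections.reverse(group);
            out.addAll(group);
            start = end;
        }
        return out;
    }

    /** True if field 0 (the key) is non-decreasing over the list. */
    public static boolean isSortedOnKey(List<int[]> records) {
        for (int i = 1; i < records.size(); i++) {
            if (records.get(i - 1)[0] > records.get(i)[0]) {
                return false;
            }
        }
        return true;
    }

    /** True if the list is ordered on (field 0, field 1) lexicographically. */
    public static boolean isSortedOnKeyAndValue(List<int[]> records) {
        for (int i = 1; i < records.size(); i++) {
            int[] a = records.get(i - 1);
            int[] b = records.get(i);
            if (a[0] > b[0] || (a[0] == b[0] && a[1] > b[1])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<int[]> input = Arrays.asList(
                new int[]{1, 10}, new int[]{1, 20}, new int[]{2, 5}, new int[]{2, 7});
        List<int[]> output = reverseWithinGroups(input);
        System.out.println(isSortedOnKey(output));         // prints true
        System.out.println(isSortedOnKeyAndValue(output)); // prints false
    }
}
```
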
@@ -119,7 +119,7 @@ public GroupReduceOperator<IN, OUT> setCombinable(boolean combinable) {

return this;
}

// --------------------------------------------------------------------------------------------
// Translation
// --------------------------------------------------------------------------------------------
@@ -22,13 +22,17 @@
import java.util.List;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.operators.CoGroupDescriptor;
import org.apache.flink.optimizer.operators.CoGroupWithSolutionSetFirstDescriptor;
import org.apache.flink.optimizer.operators.CoGroupWithSolutionSetSecondDescriptor;
import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
import org.apache.flink.api.common.operators.util.FieldSet;

/**
* The Optimizer representation of a <i>CoGroup</i> operator.
@@ -76,6 +80,41 @@ public void makeCoGroupWithSolutionSet(int solutionsetInputIndex) {
this.dataProperties = Collections.<OperatorDescriptorDual>singletonList(op);
}

@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {

    // Local properties for CoGroup may only be preserved on key fields.
    DualInputSemanticProperties origProps = ((DualInputOperator<?, ?, ?, ?>) getOperator()).getSemanticProperties();

    DualInputSemanticProperties filteredProps = new DualInputSemanticProperties();
    FieldSet readSet1 = origProps.getReadFields(0);
    FieldSet readSet2 = origProps.getReadFields(1);
    if(readSet1 != null) {
        filteredProps.addReadFields(0, readSet1);
    }
    if(readSet2 != null) {
        filteredProps.addReadFields(1, readSet2);
    }

    // preserve only key fields (first input)
    for(int f : this.keys1) {
        FieldSet targets = origProps.getForwardingTargetFields(0, f);
        for(int t : targets) {
            filteredProps.addForwardedField(0, f, t);
        }
    }

    // preserve only key fields (second input)
    for(int f : this.keys2) {
        FieldSet targets = origProps.getForwardingTargetFields(1, f);
        for(int t : targets) {
            filteredProps.addForwardedField(1, f, t);
        }
    }

    return filteredProps;
}

@Override
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
// for CoGroup, we currently make no reasonable default estimates
@@ -19,7 +19,11 @@
package org.apache.flink.optimizer.dag;

import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.operators.AllGroupCombineProperties;
import org.apache.flink.optimizer.operators.GroupCombineProperties;
@@ -88,6 +92,30 @@ protected List<OperatorDescriptorSingle> getPossibleProperties() {
return this.possibleProperties;
}

@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {

    // Local properties for GroupCombine may only be preserved on key fields.
    SingleInputSemanticProperties origProps =
            ((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
    SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
    FieldSet readSet = origProps.getReadFields(0);
    if(readSet != null) {
        filteredProps.addReadFields(readSet);
    }

    // only add forward field information for key fields
    if(this.keys != null) {
        for (int f : this.keys) {
            FieldSet targets = origProps.getForwardingTargetFields(0, f);
            for (int t : targets) {
                filteredProps.addForwardedField(f, t);
            }
        }
    }
    return filteredProps;
}

// --------------------------------------------------------------------------------------------
// Estimates
// --------------------------------------------------------------------------------------------
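
The key-only filtering performed by `getSemanticPropertiesForLocalPropertyFiltering()` above can be modeled without Flink classes. The following is a simplified sketch, not Flink's implementation: forwarded fields are represented as a plain map from source field index to target field indexes, and `KeyOnlyForwardingFilter` and `filterToKeys` are hypothetical names. It assumes, as the diff does, that missing keys mean no forwarded-field entry is kept.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class KeyOnlyForwardingFilter {

    /**
     * Keeps only the forwarding entries whose source field is a key field.
     * The map models "source field index -> target field indexes".
     */
    public static Map<Integer, Set<Integer>> filterToKeys(
            Map<Integer, Set<Integer>> forwarded, int[] keys) {
        Map<Integer, Set<Integer>> filtered = new LinkedHashMap<>();
        if (keys == null) {
            // No grouping keys: no forwarded-field entry is retained.
            return filtered;
        }
        for (int key : keys) {
            Set<Integer> targets = forwarded.get(key);
            if (targets != null) {
                filtered.put(key, new TreeSet<>(targets));
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Map<Integer, Set<Integer>> forwarded = new LinkedHashMap<>();
        forwarded.put(0, new TreeSet<>(Arrays.asList(0))); // f0 -> f0
        forwarded.put(1, new TreeSet<>(Arrays.asList(2))); // f1 -> f2

        // Only field 0 is a grouping key, so only its entry survives.
        System.out.println(filterToKeys(forwarded, new int[]{0})); // prints {0=[0]}
    }
}
```
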
@@ -23,6 +23,9 @@

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
@@ -32,6 +35,7 @@
import org.apache.flink.optimizer.operators.GroupReduceProperties;
import org.apache.flink.optimizer.operators.GroupReduceWithCombineProperties;
import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.configuration.Configuration;

/**
@@ -135,6 +139,30 @@ public String getName() {
protected List<OperatorDescriptorSingle> getPossibleProperties() {
return this.possibleProperties;
}

@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {

    // Local properties for GroupReduce may only be preserved on key fields.
    SingleInputSemanticProperties origProps =
            ((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
    SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
    FieldSet readSet = origProps.getReadFields(0);
    if(readSet != null) {
        filteredProps.addReadFields(readSet);
    }

    // only add forward field information for key fields
    if(this.keys != null) {
        for (int f : this.keys) {
            FieldSet targets = origProps.getForwardingTargetFields(0, f);
            for (int t : targets) {
                filteredProps.addForwardedField(f, t);
            }
        }
    }
    return filteredProps;
}

// --------------------------------------------------------------------------------------------
// Estimates
@@ -22,10 +22,13 @@
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.operators.MapPartitionDescriptor;
import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.util.FieldSet;

/**
* The optimizer's internal representation of a <i>MapPartition</i> operator node.
@@ -55,6 +58,21 @@ protected List<OperatorDescriptorSingle> getPossibleProperties() {
return this.possibleProperties;
}

@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {

    // Local properties for MapPartition may not be preserved.
    SingleInputSemanticProperties origProps =
            ((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
    SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
    FieldSet readSet = origProps.getReadFields(0);
    if(readSet != null) {
        filteredProps.addReadFields(readSet);
    }

    return filteredProps;
}

/**
* Computes the estimates for the MapPartition operator.
* We assume that by default, Map takes one value and transforms it into another value.
@@ -148,7 +148,14 @@ public List<DagConnection> getIncomingConnections() {
public SemanticProperties getSemanticProperties() {
    return getOperator().getSemanticProperties();
}

protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
    return this.getSemanticProperties();
}

protected SemanticProperties getSemanticPropertiesForGlobalPropertyFiltering() {
    return this.getSemanticProperties();
}

@Override
public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode)
@@ -467,18 +474,17 @@ protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, Li
gProps = dps.computeGlobalProperties(gProps);
lProps = dps.computeLocalProperties(lProps);

// filter by the user code field copies
gProps = gProps.filterBySemanticProperties(getSemanticPropertiesForGlobalPropertyFiltering(), 0);
lProps = lProps.filterBySemanticProperties(getSemanticPropertiesForLocalPropertyFiltering(), 0);

// apply
node.initProperties(gProps, lProps);
node.updatePropertiesWithUniqueSets(getUniqueFields());
target.add(node);
}
}

// --------------------------------------------------------------------------------------------
// Branch Handling
// --------------------------------------------------------------------------------------------
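
The two new hooks follow a simple override pattern: the base node returns the full semantic properties for both global and local filtering, and a group-wise node overrides only the local hook. A minimal stand-alone sketch of that pattern is shown here. The classes `Node` and `MapPartitionLikeNode` are hypothetical stand-ins, and a list of field names replaces Flink's `SemanticProperties`; none of this is Flink's actual API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PropertyFilteringHooks {

    /** Hypothetical stand-in for an optimizer node with forwarded-field annotations. */
    public static class Node {
        private final List<String> forwardedFields;

        public Node(List<String> forwardedFields) {
            this.forwardedFields = forwardedFields;
        }

        public List<String> getSemanticProperties() {
            return forwardedFields;
        }

        // Default: both kinds of properties are filtered with the full annotations.
        public List<String> getSemanticPropertiesForLocalPropertyFiltering() {
            return getSemanticProperties();
        }

        public List<String> getSemanticPropertiesForGlobalPropertyFiltering() {
            return getSemanticProperties();
        }
    }

    /** Models a MapPartition-like node: local properties are never preserved. */
    public static class MapPartitionLikeNode extends Node {
        public MapPartitionLikeNode(List<String> forwardedFields) {
            super(forwardedFields);
        }

        @Override
        public List<String> getSemanticPropertiesForLocalPropertyFiltering() {
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        Node node = new MapPartitionLikeNode(Arrays.asList("f0", "f1"));
        System.out.println(node.getSemanticPropertiesForGlobalPropertyFiltering()); // prints [f0, f1]
        System.out.println(node.getSemanticPropertiesForLocalPropertyFiltering());  // prints []
    }
}
```

Keeping the global hook unchanged reflects the diff, where only the local-property variant is overridden in the group-wise nodes.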
@@ -607,13 +607,18 @@ protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel
DualInputPlanNode node = operator.instantiate(in1, in2, this);
node.setBroadcastInputs(broadcastChannelsCombination);

SemanticProperties semPropsGlobalPropFiltering = getSemanticPropertiesForGlobalPropertyFiltering();
GlobalProperties gp1 = in1.getGlobalProperties().clone()
    .filterBySemanticProperties(semPropsGlobalPropFiltering, 0);
GlobalProperties gp2 = in2.getGlobalProperties().clone()
    .filterBySemanticProperties(semPropsGlobalPropFiltering, 1);
GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2);

SemanticProperties semPropsLocalPropFiltering = getSemanticPropertiesForLocalPropertyFiltering();
LocalProperties lp1 = in1.getLocalProperties().clone()
    .filterBySemanticProperties(semPropsLocalPropFiltering, 0);
LocalProperties lp2 = in2.getLocalProperties().clone()
    .filterBySemanticProperties(semPropsLocalPropFiltering, 1);
LocalProperties locals = operator.computeLocalProperties(lp1, lp2);

node.initProperties(combined, locals);
@@ -722,6 +727,14 @@ public void computeUnclosedBranchStack() {
public SemanticProperties getSemanticProperties() {
    return getOperator().getSemanticProperties();
}

protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
    return this.getSemanticProperties();
}

protected SemanticProperties getSemanticPropertiesForGlobalPropertyFiltering() {
    return this.getSemanticProperties();
}

// --------------------------------------------------------------------------------------------
// Miscellaneous