Skip to content

Commit

Permalink
[FLINK-1328] [api-breaking][java-api][scala-api][optimizer] Reworked …
Browse files Browse the repository at this point in the history
…semantic annotations for functions.

- Renamed constantField annotations to forwardedFields annotations
- Forwarded fields can be defined for (nested) tuples, Pojos, case classes
- Added semantic function information to example programs

This closes apache#311
  • Loading branch information
fhueske committed Jan 28, 2015
1 parent 78f41e9 commit de8e066
Show file tree
Hide file tree
Showing 92 changed files with 6,689 additions and 3,817 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public DataSet<Tuple2<VertexKey, VertexValue>> createResult() {
}

// let the operator know that we preserve the key field
updates.withConstantSetFirst("0").withConstantSetSecond("0");
updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");

return iteration.closeWith(updates, updates);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ public void testTranslationPlainEdges() {

// validate that the semantic properties are set as they should
TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField1(0).contains(0));
assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField2(0).contains(0));
assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));

TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();

Expand Down Expand Up @@ -179,8 +179,8 @@ public void testTranslationPlainEdgesWithForkedBroadcastVariable() {

// validate that the semantic properties are set as they should
TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField1(0).contains(0));
assertTrue(solutionSetJoin.getSemanticProperties().getForwardedField2(0).contains(0));
assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));

TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
Expand Down Expand Up @@ -189,9 +188,7 @@ public PojoComparableAggregator(String field, TypeInformation<?> typeInfo,
@SuppressWarnings("unchecked")
CompositeType<T> cType = (CompositeType<T>) typeInfo;

List<FlatFieldDescriptor> fieldDescriptors = new ArrayList<FlatFieldDescriptor>();
cType.getKey(field, 0, fieldDescriptors);

List<FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field);
int logicalKeyPosition = fieldDescriptors.get(0).getPosition();

if (cType instanceof PojoTypeInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.ReduceFunction;
Expand Down Expand Up @@ -138,8 +137,7 @@ public PojoSumAggregator(String field, TypeInformation<?> type) {
@SuppressWarnings("unchecked")
CompositeType<T> cType = (CompositeType<T>) type;

List<FlatFieldDescriptor> fieldDescriptors = new ArrayList<FlatFieldDescriptor>();
cType.getKey(field, 0, fieldDescriptors);
List<FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field);

int logicalKeyPosition = fieldDescriptors.get(0).getPosition();
Class<?> keyClass = fieldDescriptors.get(0).getType().getTypeClass();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.costs.CostEstimator;
import org.apache.flink.compiler.plan.PlanNode;
Expand Down Expand Up @@ -88,7 +89,7 @@ public List<PlanNode> getAlternativePlans(CostEstimator estimator) {

@Override
public SemanticProperties getSemanticProperties() {
return null;
return new EmptySemanticProperties();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import java.util.List;
import java.util.Set;

import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.Union;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.costs.CostEstimator;
Expand Down Expand Up @@ -255,9 +255,7 @@ protected void readStubAnnotations() {}

@Override
public SemanticProperties getSemanticProperties() {
DualInputSemanticProperties sprops = new DualInputSemanticProperties();
sprops.setAllFieldsConstant(true);
return sprops;
return new UnionSemanticProperties();
}

@Override
Expand All @@ -270,4 +268,35 @@ public void computeOutputEstimates(DataStatistics statistics) {
this.estimatedOutputSize = in1.estimatedOutputSize > 0 && in2.estimatedOutputSize > 0 ?
in1.estimatedOutputSize + in2.estimatedOutputSize : -1;
}

/**
 * Semantic properties of the binary union node.
 *
 * <p>Every query is answered the same way for both inputs (index 0 and 1): a source field
 * maps to the target field with the same index, and no fields are reported as read.
 * Any other input index is rejected with an {@link IndexOutOfBoundsException}.
 */
public static class UnionSemanticProperties implements SemanticProperties {

	/** Message used for every rejected input index, so all accessors fail identically. */
	private static final String INVALID_INPUT_MESSAGE = "Invalid input index for binary union node.";

	/**
	 * Returns the target fields a source field is forwarded to.
	 *
	 * @param input the input index, must be 0 or 1
	 * @param sourceField the field index in the given input
	 * @return a field set containing exactly {@code sourceField}
	 * @throws IndexOutOfBoundsException if {@code input} is neither 0 nor 1
	 */
	@Override
	public FieldSet getForwardingTargetFields(int input, int sourceField) {
		checkInputIndex(input);
		return new FieldSet(sourceField);
	}

	/**
	 * Returns the source field a target field was forwarded from.
	 *
	 * @param input the input index, must be 0 or 1
	 * @param targetField the field index in the output
	 * @return {@code targetField} itself, since positions are not changed
	 * @throws IndexOutOfBoundsException if {@code input} is neither 0 nor 1
	 */
	@Override
	public int getForwardingSourceField(int input, int targetField) {
		checkInputIndex(input);
		return targetField;
	}

	/**
	 * Returns the fields read from the given input.
	 *
	 * @param input the input index, must be 0 or 1
	 * @return {@link FieldSet#EMPTY_SET}
	 * @throws IndexOutOfBoundsException if {@code input} is neither 0 nor 1
	 */
	@Override
	public FieldSet getReadFields(int input) {
		checkInputIndex(input);
		return FieldSet.EMPTY_SET;
	}

	/** Validates that the input index addresses one of the two inputs. */
	private static void checkInputIndex(int input) {
		if (input != 0 && input != 1) {
			throw new IndexOutOfBoundsException(INVALID_INPUT_MESSAGE);
		}
	}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;

import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
import org.apache.flink.api.common.operators.base.BulkIterationBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.compiler.CompilerException;
Expand Down Expand Up @@ -186,7 +187,7 @@ public String getName() {

@Override
public SemanticProperties getSemanticProperties() {
return null;
return new EmptySemanticProperties();
}

protected void readStubAnnotations() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.costs.CostEstimator;
Expand Down Expand Up @@ -234,7 +235,7 @@ public List<PlanNode> getAlternativePlans(CostEstimator estimator) {

@Override
public SemanticProperties getSemanticProperties() {
return null;
return new EmptySemanticProperties();
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.costs.CostEstimator;
Expand Down Expand Up @@ -195,7 +196,7 @@ public List<PlanNode> getAlternativePlans(CostEstimator estimator) {

@Override
public SemanticProperties getSemanticProperties() {
return null;
return new EmptySemanticProperties();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,7 @@ public String getName() {

@Override
public SemanticProperties getSemanticProperties() {

SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
sprops.setAllFieldsConstant(true);

return sprops;
return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,35 +666,7 @@ protected void readUniqueFieldsAnnotation() {
// ------------------------------------------------------------------------
// Access of stub annotations
// ------------------------------------------------------------------------

/**
 * Looks up the key columns of the given input and returns them only if each one
 * is forwarded to the same field position according to the semantic properties.
 *
 * @param input the index of the input whose key columns are checked
 * @return the key columns of the input, or {@code null} if the contract is not an
 *         {@code AbstractUdfOperator}, if it has no (or zero) key columns, or if any
 *         key column is not forwarded to its own position
 */
protected int[] getConstantKeySet(int input) {
	Operator<?> contract = getPactContract();
	if (!(contract instanceof AbstractUdfOperator<?, ?>)) {
		return null;
	}

	AbstractUdfOperator<?, ?> udfOperator = (AbstractUdfOperator<?, ?>) contract;
	int[] keys = udfOperator.getKeyColumns(input);
	if (keys == null || keys.length == 0) {
		return null;
	}

	for (int i = 0; i < keys.length; i++) {
		int key = keys[i];

		// the properties are re-queried for every key, exactly once for the null check
		// and once more for the lookup
		FieldSet forwarded = null;
		if (getSemanticProperties() != null) {
			forwarded = getSemanticProperties().getForwardFields(input, key);
		}

		// a key counts as preserved only if it is forwarded to the very same position
		if (forwarded == null || !forwarded.contains(key)) {
			return null;
		}
	}
	return keys;
}

/**
* An optional method where nodes can describe which fields will be unique in their output.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.List;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
import org.apache.flink.api.common.operators.util.FieldSet;
Expand Down Expand Up @@ -74,9 +76,8 @@ protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics
}

@Override
public boolean isFieldConstant(int input, int fieldNumber) {
// Partition does not change any data
return true;
public SemanticProperties getSemanticProperties() {
return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ public String getName() {

@Override
public SemanticProperties getSemanticProperties() {
SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
sprops.setAllFieldsConstant(true);
return sprops;
return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;

import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
Expand Down Expand Up @@ -222,7 +223,7 @@ public String getName() {

@Override
public SemanticProperties getSemanticProperties() {
return null;
return new EmptySemanticProperties();
}

protected void readStubAnnotations() {}
Expand Down
Loading

0 comments on commit de8e066

Please sign in to comment.