Skip to content

Commit

Permalink
Perform TypeExtraction outside of Java Operators
Browse files Browse the repository at this point in the history
Before, FlatMapOperator, GroupReduceOperator, MapOperator, and MapPartitionOperator
performed the Type extraction themselves while the other Operators had TypeInformation
parameters. Now the are all unified, which makes it possible to use them from the
Scala API.

Also Key extraction for selector functions is moved outside of Keys.java
  • Loading branch information
aljoscha committed Sep 22, 2014
1 parent 57b8e66 commit 6bbe2a0
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 33 deletions.
23 changes: 16 additions & 7 deletions flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.types.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;



/**
Expand Down Expand Up @@ -145,7 +147,10 @@ public <R> MapOperator<T, R> map(MapFunction<T, R> mapper) {
if (FunctionUtils.isLambdaFunction(mapper)) {
throw new UnsupportedLambdaExpressionException();
}
return new MapOperator<T, R>(this, mapper);

TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, this.getType());

return new MapOperator<T, R>(this, resultType, mapper);
}


Expand All @@ -172,7 +177,8 @@ public <R> MapPartitionOperator<T, R> mapPartition(MapPartitionFunction<T, R> ma
if (mapPartition == null) {
throw new NullPointerException("MapPartition function must not be null.");
}
return new MapPartitionOperator<T, R>(this, mapPartition);
TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, this.getType());
return new MapPartitionOperator<T, R>(this, resultType, mapPartition);
}

/**
Expand All @@ -194,7 +200,8 @@ public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
if (FunctionUtils.isLambdaFunction(flatMapper)) {
throw new UnsupportedLambdaExpressionException();
}
return new FlatMapOperator<T, R>(this, flatMapper);
TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, this.getType());
return new FlatMapOperator<T, R>(this, resultType, flatMapper);
}

/**
Expand Down Expand Up @@ -340,7 +347,8 @@ public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reduc
if (FunctionUtils.isLambdaFunction(reducer)) {
throw new UnsupportedLambdaExpressionException();
}
return new GroupReduceOperator<T, R>(this, reducer);
TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getType());
return new GroupReduceOperator<T, R>(this, resultType, reducer);
}

/**
Expand Down Expand Up @@ -400,7 +408,8 @@ public ReduceOperator<T> maxBy(int... fields) {
* @return A DistinctOperator that represents the distinct DataSet.
*/
public <K> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType(), keyType));
}

/**
Expand Down Expand Up @@ -456,9 +465,9 @@ public DistinctOperator<T> distinct() {
* @see org.apache.flink.api.java.operators.GroupReduceOperator
* @see DataSet
*/

public <K> UnsortedGrouping<T> groupBy(KeySelector<T, K> keyExtractor) {
return new UnsortedGrouping<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
return new UnsortedGrouping<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType(), keyType));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ public CoGroupOperatorSetsPredicate where(int... fields) {
* @see DataSet
*/
public <K> CoGroupOperatorSetsPredicate where(KeySelector<I1, K> keyExtractor) {
return new CoGroupOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keyExtractor, input1.getType()));
TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input1.getType());
return new CoGroupOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keyExtractor, input1.getType(), keyType));
}

// ----------------------------------------------------------------------------------------
Expand Down Expand Up @@ -431,7 +432,8 @@ public CoGroupOperatorWithoutFunction equalTo(int... fields) {
* Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.common.functions.CoGroupFunction)} to finalize the CoGroup transformation.
*/
public <K> CoGroupOperatorWithoutFunction equalTo(KeySelector<I2, K> keyExtractor) {
return createCoGroupOperator(new Keys.SelectorFunctionKeys<I2, K>(keyExtractor, input2.getType()));
TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input2.getType());
return createCoGroupOperator(new Keys.SelectorFunctionKeys<I2, K>(keyExtractor, input2.getType(), keyType));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.TypeInformation;

/**
* This operator represents the application of a "flatMap" function on a data set, and the
Expand All @@ -38,8 +38,8 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
protected final FlatMapFunction<IN, OUT> function;


public FlatMapOperator(DataSet<IN> input, FlatMapFunction<IN, OUT> function) {
super(input, TypeExtractor.getFlatMapReturnTypes(function, input.getType()));
public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function) {
super(input, resultType);

this.function = function;
extractSemanticAnnotationsFromUdf(function.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.types.TypeInformation;

import org.apache.flink.api.java.DataSet;
Expand All @@ -58,8 +57,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
* @param input The input data set to the groupReduce function.
* @param function The user-defined GroupReduce function.
*/
public GroupReduceOperator(DataSet<IN> input, GroupReduceFunction<IN, OUT> function) {
super(input, TypeExtractor.getGroupReduceReturnTypes(function, input.getType()));
public GroupReduceOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function) {
super(input, resultType);

this.function = function;
this.grouper = null;
Expand All @@ -73,8 +72,8 @@ public GroupReduceOperator(DataSet<IN> input, GroupReduceFunction<IN, OUT> funct
* @param input The grouped input to be processed group-wise by the groupReduce function.
* @param function The user-defined GroupReduce function.
*/
public GroupReduceOperator(Grouping<IN> input, GroupReduceFunction<IN, OUT> function) {
super(input != null ? input.getDataSet() : null, TypeExtractor.getGroupReduceReturnTypes(function, input.getDataSet().getType()));
public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function) {
super(input != null ? input.getDataSet() : null, resultType);

this.function = function;
this.grouper = input;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,8 @@ public JoinOperatorSetsPredicate where(int... fields) {
* @see DataSet
*/
public <K> JoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector) {
return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keySelector, input1.getType()));
TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keySelector, input1.getType(), keyType));
}

// ----------------------------------------------------------------------------------------
Expand Down Expand Up @@ -829,7 +830,8 @@ public DefaultJoin<I1, I2> equalTo(int... fields) {
* @return A DefaultJoin that represents the joined DataSet.
*/
public <K> DefaultJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
return createJoinOperator(new Keys.SelectorFunctionKeys<I2, K>(keySelector, input2.getType()));
TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
return createJoinOperator(new Keys.SelectorFunctionKeys<I2, K>(keySelector, input2.getType(), keyType));
}

protected DefaultJoin<I1, I2> createJoinOperator(Keys<I2> keys2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,18 @@ public static class SelectorFunctionKeys<T, K> extends Keys<T> {

private final KeySelector<T, K> keyExtractor;
private final TypeInformation<K> keyType;
public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> type) {

public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> inputType, TypeInformation<K> keyType) {
if (keyExtractor == null) {
throw new NullPointerException("Key extractor must not be null.");
}

this.keyExtractor = keyExtractor;
this.keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
this.keyType = keyType;

if (!this.keyType.isKeyType()) {
throw new IllegalArgumentException("Invalid type of KeySelector keys");
}

}

public TypeInformation<K> getKeyType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.TypeInformation;

/**
* This operator represents the application of a "map" function on a data set, and the
Expand All @@ -40,9 +40,9 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
protected final MapFunction<IN, OUT> function;


public MapOperator(DataSet<IN> input, MapFunction<IN, OUT> function) {
public MapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function) {

super(input, TypeExtractor.getMapReturnTypes(function, input.getType()));
super(input, resultType);

this.function = function;
extractSemanticAnnotationsFromUdf(function.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.TypeInformation;

/**
* This operator represents the application of a "mapPartition" function on a data set, and the
Expand All @@ -39,8 +39,8 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
protected final MapPartitionFunction<IN, OUT> function;


public MapPartitionOperator(DataSet<IN> input, MapPartitionFunction<IN, OUT> function) {
super(input, TypeExtractor.getMapPartitionReturnTypes(function, input.getType()));
public MapPartitionOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function) {
super(input, resultType);

this.function = function;
extractSemanticAnnotationsFromUdf(function.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
import org.apache.flink.types.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;


/**
* SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.<br/>
Expand Down Expand Up @@ -85,7 +88,8 @@ public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reduc
if (FunctionUtils.isLambdaFunction(reducer)) {
throw new UnsupportedLambdaExpressionException();
}
return new GroupReduceOperator<T, R>(this, reducer);
TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
return new GroupReduceOperator<T, R>(this, resultType, reducer);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.flink.api.java.functions.SelectByMinFunction;
import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.types.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class UnsortedGrouping<T> extends Grouping<T> {

Expand Down Expand Up @@ -133,7 +135,9 @@ public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reduc
if (FunctionUtils.isLambdaFunction(reducer)) {
throw new UnsupportedLambdaExpressionException();
}
return new GroupReduceOperator<T, R>(this, reducer);
TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());

return new GroupReduceOperator<T, R>(this, resultType, reducer);
}

/**
Expand Down

0 comments on commit 6bbe2a0

Please sign in to comment.