From 8e975362312c727fd602429778bc1c3628b95619 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Fri, 14 Jul 2017 10:42:57 +0200 Subject: [PATCH] [FLINK-7191] Activate checkstyle flink-java/operators/translation This closes #4334. --- .../CombineToGroupCombineWrapper.java | 1 + .../translation/KeyExtractingMapper.java | 23 +-- .../translation/KeyRemovingMapper.java | 9 +- .../PlanBothUnwrappingCoGroupOperator.java | 22 +-- .../translation/PlanFilterOperator.java | 17 +- .../PlanLeftUnwrappingCoGroupOperator.java | 15 +- .../translation/PlanProjectOperator.java | 25 +-- .../PlanRightUnwrappingCoGroupOperator.java | 11 +- .../PlanUnwrappingGroupCombineOperator.java | 29 ++-- .../PlanUnwrappingReduceGroupOperator.java | 47 +++-- .../PlanUnwrappingReduceOperator.java | 12 +- ...nUnwrappingSortedGroupCombineOperator.java | 15 +- ...anUnwrappingSortedReduceGroupOperator.java | 23 +-- .../RichCombineToGroupCombineWrapper.java | 3 +- .../translation/Tuple3UnwrappingIterator.java | 4 +- .../translation/Tuple3WrappingCollector.java | 3 +- .../TupleLeftUnwrappingJoiner.java | 8 + .../TupleRightUnwrappingJoiner.java | 8 + .../translation/TupleUnwrappingIterator.java | 14 +- .../translation/TupleUnwrappingJoiner.java | 8 + .../translation/TupleWrappingCollector.java | 13 +- .../translation/TwoKeyExtractingMapper.java | 10 +- .../translation/WrappingFunction.java | 13 +- .../translation/AggregateTranslationTest.java | 35 ++-- .../CoGroupSortTranslationTest.java | 53 +++--- .../DeltaIterationTranslationTest.java | 160 +++++++++--------- .../translation/DistinctTranslationTest.java | 15 +- .../translation/ReduceTranslationTests.java | 98 ++++++----- tools/maven/suppressions-java.xml | 8 - 29 files changed, 375 insertions(+), 327 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java index 408d4b3098a08..f574218c2e697 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.operators.translation; import org.apache.flink.annotation.Internal; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java index f35b9508222c6..1f0cc1d84c5a6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java @@ -19,34 +19,37 @@ package org.apache.flink.api.java.operators.translation; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; +/** + * Mapper that extracts keys. + * @param type of value + * @param type of key + */ @Internal @ForwardedFields("*->1") public final class KeyExtractingMapper extends RichMapFunction> { - + private static final long serialVersionUID = 1L; - + private final KeySelector keySelector; - + private final Tuple2 tuple = new Tuple2(); - - + public KeyExtractingMapper(KeySelector keySelector) { this.keySelector = keySelector; } - - + @Override public Tuple2 map(T value) throws Exception { - + K key = keySelector.getKey(value); tuple.f0 = key; tuple.f1 = value; - + return tuple; } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java index 5f0de32e2dec0..920f893a85c2f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java @@ -23,12 +23,17 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple2; +/** + * Mapper that removes keys. + * @param type of values + * @param type of keys + */ @Internal @ForwardedFields("1->*") public final class KeyRemovingMapper extends RichMapFunction, T> { - + private static final long serialVersionUID = 1L; - + @Override public T map(Tuple2 value) { return value.f1; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java index 1814329d06979..6ccf0f3655029 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java @@ -21,16 +21,18 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; +/** + * A co group operator that applies the operation only on the unwrapped values. + */ @Internal public class PlanBothUnwrappingCoGroupOperator - extends CoGroupOperatorBase, Tuple2, OUT, CoGroupFunction, Tuple2, OUT>> -{ + extends CoGroupOperatorBase, Tuple2, OUT, CoGroupFunction, Tuple2, OUT>> { public PlanBothUnwrappingCoGroupOperator( CoGroupFunction udf, @@ -52,23 +54,21 @@ public PlanBothUnwrappingCoGroupOperator( name); } - public static final class TupleBothUnwrappingCoGrouper + private static final class TupleBothUnwrappingCoGrouper extends WrappingFunction> - implements CoGroupFunction, Tuple2, OUT> - { + implements CoGroupFunction, Tuple2, OUT> { private static final long serialVersionUID = 1L; - + private final TupleUnwrappingIterator iter1; private final TupleUnwrappingIterator iter2; - + private TupleBothUnwrappingCoGrouper(CoGroupFunction wrapped) { super(wrapped); - + this.iter1 = new TupleUnwrappingIterator(); this.iter2 = new TupleUnwrappingIterator(); } - @Override public void coGroup( Iterable> records1, @@ -79,6 +79,6 @@ public void coGroup( iter2.set(records2.iterator()); this.wrappedFunction.coGroup(iter1, iter2, out); } - + } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java index c93191f3c87da..ecf1aac90205c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java @@ -27,26 +27,33 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.util.Collector; +/** + * @see FilterOperatorBase + * @param + */ @Internal @ForwardedFields("*") public class PlanFilterOperator extends FilterOperatorBase> { - + public PlanFilterOperator(FilterFunction udf, String name, TypeInformation type) { super(new FlatMapFilter(udf), new UnaryOperatorInformation(type, type), name); } + /** + * @see FlatMapFunction + * @param + */ public static final class FlatMapFilter extends WrappingFunction> - implements FlatMapFunction - { + implements FlatMapFunction { private static final long serialVersionUID = 1L; - + private FlatMapFilter(FilterFunction wrapped) { super(wrapped); } @Override - public final void flatMap(T value, Collector out) throws Exception { + public void flatMap(T value, Collector out) throws Exception { if (this.wrappedFunction.filter(value)) { out.collect(value); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java index 78840cec0b13a..b2a6937d9e900 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java @@ -21,16 +21,18 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; +/** + * A co group operator that applies the operation only on the unwrapped values on the left. + */ @Internal public class PlanLeftUnwrappingCoGroupOperator - extends CoGroupOperatorBase, I2, OUT, CoGroupFunction, I2, OUT>> -{ + extends CoGroupOperatorBase, I2, OUT, CoGroupFunction, I2, OUT>> { public PlanLeftUnwrappingCoGroupOperator( CoGroupFunction udf, @@ -52,21 +54,20 @@ public PlanLeftUnwrappingCoGroupOperator( name); } - public static final class TupleLeftUnwrappingCoGrouper + private static final class TupleLeftUnwrappingCoGrouper extends WrappingFunction> implements CoGroupFunction, I2, OUT> { private static final long serialVersionUID = 1L; - + private final TupleUnwrappingIterator iter1; private TupleLeftUnwrappingCoGrouper(CoGroupFunction wrapped) { super(wrapped); - + this.iter1 = new TupleUnwrappingIterator(); } - @Override public void coGroup( Iterable> records1, diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java index fe981a5fb3e2b..3960807adc13f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java @@ -27,30 +27,33 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; +/** + * A map operator that retains a subset of fields from incoming tuples. + * + * @param Input tuple type + * @param Output tuple type + */ @Internal public class PlanProjectOperator extends MapOperatorBase> { public PlanProjectOperator(int[] fields, String name, TypeInformation inType, TypeInformation outType, - ExecutionConfig executionConfig) - { + ExecutionConfig executionConfig) { super(PlanProjectOperator.createTypedProjector(fields), new UnaryOperatorInformation(inType, outType), name); } - + @SuppressWarnings("unchecked") private static MapFunction createTypedProjector(int[] fields) { return (MapFunction) new MapProjector(fields); } - - - public static final class MapProjector - extends AbstractRichFunction implements MapFunction - { + + private static final class MapProjector + extends AbstractRichFunction implements MapFunction { private static final long serialVersionUID = 1L; - + private final int[] fields; private final Tuple outTuple; - + private MapProjector(int[] fields) { this.fields = fields; try { @@ -69,7 +72,7 @@ public R map(Tuple inTuple) throws Exception { for (int i = 0; i < fields.length; i++) { outTuple.setField(inTuple.getField(fields[i]), i); } - + return (R) outTuple; } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java index faeca4e3dda07..f86deb77700f2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java @@ -21,16 +21,18 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; +/** + * A co group operator that applies the operation only on the unwrapped values on the right. + */ @Internal public class PlanRightUnwrappingCoGroupOperator - extends CoGroupOperatorBase, OUT, CoGroupFunction, OUT>> -{ + extends CoGroupOperatorBase, OUT, CoGroupFunction, OUT>> { public PlanRightUnwrappingCoGroupOperator( CoGroupFunction udf, @@ -52,7 +54,7 @@ public PlanRightUnwrappingCoGroupOperator( name); } - public static final class TupleRightUnwrappingCoGrouper + private static final class TupleRightUnwrappingCoGrouper extends WrappingFunction> implements CoGroupFunction, OUT> { @@ -66,7 +68,6 @@ private TupleRightUnwrappingCoGrouper(CoGroupFunction wrapped) { this.iter2 = new TupleUnwrappingIterator(); } - @Override public void coGroup( Iterable records1, diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java index e9feb61503f66..6e6f226a5e82d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java @@ -20,10 +20,10 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; @@ -35,35 +35,32 @@ public class PlanUnwrappingGroupCombineOperator extends GroupCombineOperatorBase, OUT, GroupCombineFunction, OUT>> { public PlanUnwrappingGroupCombineOperator(GroupCombineFunction udf, Keys.SelectorFunctionKeys key, String name, - TypeInformation outType, TypeInformation> typeInfoWithKey) - { + TypeInformation outType, TypeInformation> typeInfoWithKey) { super(new TupleUnwrappingGroupCombiner(udf), new UnaryOperatorInformation, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name); - + } - + // -------------------------------------------------------------------------------------------- - - public static final class TupleUnwrappingGroupCombiner extends WrappingFunction> - implements GroupCombineFunction, OUT> - { - + + private static final class TupleUnwrappingGroupCombiner extends WrappingFunction> + implements GroupCombineFunction, OUT> { + private static final long serialVersionUID = 1L; - - private final TupleUnwrappingIterator iter; - + + private final TupleUnwrappingIterator iter; + private TupleUnwrappingGroupCombiner(GroupCombineFunction wrapped) { super(wrapped); this.iter = new TupleUnwrappingIterator(); } - - + @Override public void combine(Iterable> values, Collector out) throws Exception { iter.set(values.iterator()); this.wrappedFunction.combine(iter, out); } - + @Override public String toString() { return this.wrappedFunction.toString(); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java index 85686599315ef..33c527d0e2eb9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java @@ -21,10 +21,10 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; @@ -33,7 +33,7 @@ * on the unwrapped values. */ @Internal -public class PlanUnwrappingReduceGroupOperator extends GroupReduceOperatorBase, OUT, GroupReduceFunction,OUT>> { +public class PlanUnwrappingReduceGroupOperator extends GroupReduceOperatorBase, OUT, GroupReduceFunction, OUT>> { public PlanUnwrappingReduceGroupOperator( GroupReduceFunction udf, @@ -41,32 +41,30 @@ public PlanUnwrappingReduceGroupOperator( String name, TypeInformation outType, TypeInformation> typeInfoWithKey, - boolean combinable) - { + boolean combinable) { super( combinable ? new TupleUnwrappingGroupCombinableGroupReducer(udf) : new TupleUnwrappingNonCombinableGroupReducer(udf), new UnaryOperatorInformation<>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name); - + super.setCombinable(combinable); } - + // -------------------------------------------------------------------------------------------- - - public static final class TupleUnwrappingGroupCombinableGroupReducer extends WrappingFunction> - implements GroupReduceFunction, OUT>, GroupCombineFunction, Tuple2> - { + + private static final class TupleUnwrappingGroupCombinableGroupReducer extends WrappingFunction> + implements GroupReduceFunction, OUT>, GroupCombineFunction, Tuple2> { private static final long serialVersionUID = 1L; - + private TupleUnwrappingIterator iter; private TupleWrappingCollector coll; private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction wrapped) { super(wrapped); - if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) { + if (!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) { throw new IllegalArgumentException("Wrapped reduce function does not implement the GroupCombineFunction interface."); } @@ -74,7 +72,6 @@ private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction this.coll = new TupleWrappingCollector<>(this.iter); } - @Override public void reduce(Iterable> values, Collector out) throws Exception { iter.set(values.iterator()); @@ -87,35 +84,33 @@ public void combine(Iterable> values, Collector> out iter.set(values.iterator()); coll.set(out); - ((GroupCombineFunction)this.wrappedFunction).combine(iter, coll); + ((GroupCombineFunction) this.wrappedFunction).combine(iter, coll); } - + @Override public String toString() { return this.wrappedFunction.toString(); } } - - public static final class TupleUnwrappingNonCombinableGroupReducer extends WrappingFunction> - implements GroupReduceFunction, OUT> - { - + + private static final class TupleUnwrappingNonCombinableGroupReducer extends WrappingFunction> + implements GroupReduceFunction, OUT> { + private static final long serialVersionUID = 1L; - - private final TupleUnwrappingIterator iter; - + + private final TupleUnwrappingIterator iter; + private TupleUnwrappingNonCombinableGroupReducer(GroupReduceFunction wrapped) { super(wrapped); this.iter = new TupleUnwrappingIterator<>(); } - - + @Override public void reduce(Iterable> values, Collector out) throws Exception { iter.set(values.iterator()); this.wrappedFunction.reduce(iter, out); } - + @Override public String toString() { return this.wrappedFunction.toString(); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java index 72dc41a8de409..b2e614e30cd73 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java @@ -20,13 +20,12 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.ReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; - /** * A reduce operator that takes 2-tuples (key-value pairs), and applies the reduce operation only * on the unwrapped values. @@ -35,16 +34,13 @@ public class PlanUnwrappingReduceOperator extends ReduceOperatorBase, ReduceFunction>> { public PlanUnwrappingReduceOperator(ReduceFunction udf, Keys.SelectorFunctionKeys key, String name, - TypeInformation type, TypeInformation> typeInfoWithKey) - { + TypeInformation type, TypeInformation> typeInfoWithKey) { super(new ReduceWrapper(udf), new UnaryOperatorInformation, Tuple2>(typeInfoWithKey, typeInfoWithKey), key.computeLogicalKeyPositions(), name); } - public static final class ReduceWrapper extends WrappingFunction> - implements ReduceFunction> - { + private static final class ReduceWrapper extends WrappingFunction> + implements ReduceFunction> { private static final long serialVersionUID = 1L; - private ReduceWrapper(ReduceFunction wrapped) { super(wrapped); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java index f65f16997bc4b..a2a9010f569e5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java @@ -20,10 +20,10 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.Collector; @@ -32,21 +32,19 @@ * operation only on the unwrapped values. */ @Internal -public class PlanUnwrappingSortedGroupCombineOperator extends GroupCombineOperatorBase, OUT, GroupCombineFunction,OUT>> { +public class PlanUnwrappingSortedGroupCombineOperator extends GroupCombineOperatorBase, OUT, GroupCombineFunction, OUT>> { public PlanUnwrappingSortedGroupCombineOperator(GroupCombineFunction udf, Keys.SelectorFunctionKeys groupingKey, Keys.SelectorFunctionKeys sortingKey, String name, - TypeInformation outType, TypeInformation> typeInfoWithKey) - { + TypeInformation outType, TypeInformation> typeInfoWithKey) { super(new TupleUnwrappingGroupReducer(udf), new UnaryOperatorInformation, OUT>(typeInfoWithKey, outType), - groupingKey.computeLogicalKeyPositions(), + groupingKey.computeLogicalKeyPositions(), name); } - public static final class TupleUnwrappingGroupReducer extends WrappingFunction> - implements GroupCombineFunction, OUT> - { + private static final class TupleUnwrappingGroupReducer extends WrappingFunction> + implements GroupCombineFunction, OUT> { private static final long serialVersionUID = 1L; @@ -57,7 +55,6 @@ private TupleUnwrappingGroupReducer(GroupCombineFunction wrapped) { this.iter = new Tuple3UnwrappingIterator(); } - @Override public void combine(Iterable> values, Collector out) throws Exception { iter.set(values.iterator()); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java index 8080477704c52..7f81fee89e1d4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java @@ -21,10 +21,10 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.Collector; @@ -33,7 +33,7 @@ * operation only on the unwrapped values. */ @Internal -public class PlanUnwrappingSortedReduceGroupOperator extends GroupReduceOperatorBase, OUT, GroupReduceFunction,OUT>> { +public class PlanUnwrappingSortedReduceGroupOperator extends GroupReduceOperatorBase, OUT, GroupReduceFunction, OUT>> { public PlanUnwrappingSortedReduceGroupOperator( GroupReduceFunction udf, @@ -42,8 +42,7 @@ public PlanUnwrappingSortedReduceGroupOperator( String name, TypeInformation outType, TypeInformation> - typeInfoWithKey, boolean combinable) - { + typeInfoWithKey, boolean combinable) { super( combinable ? new TupleUnwrappingGroupCombinableGroupReducer(udf) : @@ -55,9 +54,8 @@ public PlanUnwrappingSortedReduceGroupOperator( // -------------------------------------------------------------------------------------------- - public static final class TupleUnwrappingGroupCombinableGroupReducer extends WrappingFunction> - implements GroupReduceFunction, OUT>, GroupCombineFunction, Tuple3> - { + private static final class TupleUnwrappingGroupCombinableGroupReducer extends WrappingFunction> + implements GroupReduceFunction, OUT>, GroupCombineFunction, Tuple3> { private static final long serialVersionUID = 1L; @@ -67,7 +65,7 @@ public static final class TupleUnwrappingGroupCombinableGroupReducer wrapped) { super(wrapped); - if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) { + if (!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) { throw new IllegalArgumentException("Wrapped reduce function does not implement the GroupCombineFunction interface."); } @@ -75,7 +73,6 @@ private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction this.coll = new Tuple3WrappingCollector<>(this.iter); } - @Override public void reduce(Iterable> values, Collector out) throws Exception { iter.set(values.iterator()); @@ -87,7 +84,7 @@ public void reduce(Iterable> values, Collector out) thro public void combine(Iterable> values, Collector> out) throws Exception { iter.set(values.iterator()); coll.set(out); - ((GroupCombineFunction)this.wrappedFunction).combine(iter, coll); + ((GroupCombineFunction) this.wrappedFunction).combine(iter, coll); } @Override @@ -96,9 +93,8 @@ public String toString() { } } - public static final class TupleUnwrappingNonCombinableGroupReducer extends WrappingFunction> - implements GroupReduceFunction, OUT> - { + private static final class TupleUnwrappingNonCombinableGroupReducer extends WrappingFunction> + implements GroupReduceFunction, OUT> { private static final long serialVersionUID = 1L; @@ -109,7 +105,6 @@ private TupleUnwrappingNonCombinableGroupReducer(GroupReduceFunction wr this.iter = new Tuple3UnwrappingIterator<>(); } - @Override public void reduce(Iterable> values, Collector out) throws Exception { iter.set(values.iterator()); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java index d8c54d6147c2a..3f6463a07aed5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.operators.translation; import org.apache.flink.api.common.functions.CombineFunction; @@ -31,7 +32,7 @@ * and makes it look like a function that implements {@link GroupCombineFunction} and {@link GroupReduceFunction} to the runtime. */ public class RichCombineToGroupCombineWrapper & CombineFunction> - extends RichGroupCombineFunction implements GroupReduceFunction { + extends RichGroupCombineFunction implements GroupReduceFunction { private final F wrappedFunction; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java index fd3b4f697f4dc..b697ac93ea3a2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java @@ -18,12 +18,12 @@ package org.apache.flink.api.java.operators.translation; -import java.util.Iterator; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.TraversableOnceException; +import java.util.Iterator; + /** * An iterator that reads 3-tuples (groupKey, sortKey, value) and returns only the values (third field). * The iterator also tracks the groupKeys, as the triples flow though it. diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java index 189dcdbd84d45..57b6bc74ec2f5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java @@ -23,7 +23,7 @@ import org.apache.flink.util.Collector; /** - * Needed to wrap tuples to {@code Tuple3} for combine method of group reduce with key selector sorting + * Needed to wrap tuples to {@code Tuple3} for combine method of group reduce with key selector sorting. */ @Internal public class Tuple3WrappingCollector implements Collector, java.io.Serializable { @@ -35,7 +35,6 @@ public class Tuple3WrappingCollector implements Collector, java. private Collector> wrappedCollector; - public Tuple3WrappingCollector(Tuple3UnwrappingIterator tui) { this.tui = tui; this.outTuple = new Tuple3(); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java index 2ff73efb1741c..e39ec471d4b77 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java @@ -23,6 +23,14 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; +/** + * Joiner that unwraps values from the left set before applying the join operation. + * + * @param type of values in the left set + * @param type of values in the right set + * @param type of resulting values + * @param type of key + */ @Internal public final class TupleLeftUnwrappingJoiner extends WrappingFunction> diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java index c9b9c27590311..847acd72897e2 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java @@ -23,6 +23,14 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; +/** + * Joiner that unwraps values from the right set before applying the join operation. + * + * @param type of values in the left set + * @param type of values in the right set + * @param type of resulting values + * @param type of key + */ @Internal public final class TupleRightUnwrappingJoiner extends WrappingFunction> diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java index 5dbe266ba1430..16ebef84bbc27 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java @@ -18,12 +18,12 @@ package org.apache.flink.api.java.operators.translation; -import java.util.Iterator; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.TraversableOnceException; +import java.util.Iterator; + /** * An iterator that reads 2-tuples (key value pairs) and returns only the values (second field). * The iterator also tracks the keys, as the pairs flow though it. @@ -32,16 +32,16 @@ public class TupleUnwrappingIterator implements Iterator, Iterable, java.io.Serializable { private static final long serialVersionUID = 1L; - - private K lastKey; + + private K lastKey; private Iterator> iterator; private boolean iteratorAvailable; - + public void set(Iterator> iterator) { this.iterator = iterator; this.iteratorAvailable = true; } - + public K getLastKey() { return lastKey; } @@ -53,7 +53,7 @@ public boolean hasNext() { @Override public T next() { - Tuple2 t = iterator.next(); + Tuple2 t = iterator.next(); this.lastKey = t.f0; return t.f1; } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java index e0ee3b3f2af2c..1e56dac05495b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java @@ -23,6 +23,14 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; +/** + * Joiner that unwraps both values before applying the join operation. + * + * @param type of values in the left set + * @param type of values in the right set + * @param type of resulting values + * @param type of key + */ @Internal public final class TupleUnwrappingJoiner extends WrappingFunction> diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java index 4581bf2302c85..73698047bc360 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java @@ -23,28 +23,27 @@ import org.apache.flink.util.Collector; /** - * Needed to wrap tuples to {@code Tuple2} pairs for combine method of group reduce with key selector function + * Needed to wrap tuples to {@code Tuple2} pairs for combine method of group reduce with key selector function. */ @Internal public class TupleWrappingCollector implements Collector, java.io.Serializable { - + private static final long serialVersionUID = 1L; private final TupleUnwrappingIterator tui; private final Tuple2 outTuple; - + private Collector> wrappedCollector; - - + public TupleWrappingCollector(TupleUnwrappingIterator tui) { this.tui = tui; this.outTuple = new Tuple2(); } - + public void set(Collector> wrappedCollector) { this.wrappedCollector = wrappedCollector; } - + @Override public void close() { this.wrappedCollector.close(); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java index 7d5e39b6396b2..9449237fd8552 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java @@ -19,11 +19,17 @@ package org.apache.flink.api.java.operators.translation; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple3; +/** + * Mapper that extracts two keys of a value. + * @param type of the values + * @param type of the first key + * @param type of the second key + */ @Internal @ForwardedFields("*->2") public final class TwoKeyExtractingMapper extends RichMapFunction> { @@ -36,13 +42,11 @@ public final class TwoKeyExtractingMapper extends RichMapFunction tuple = new Tuple3(); - public TwoKeyExtractingMapper(KeySelector keySelector1, KeySelector keySelector2) { this.keySelector1 = keySelector1; this.keySelector2 = keySelector2; } - @Override public Tuple3 map(T value) throws Exception { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java index 9851c429c6db3..6c52aceb83475 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java @@ -25,9 +25,13 @@ import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.configuration.Configuration; +/** + * Wrapper around {@link Function}. + * @param + */ @Internal public abstract class WrappingFunction extends AbstractRichFunction { - + private static final long serialVersionUID = 1L; protected T wrappedFunction; @@ -36,21 +40,20 @@ protected WrappingFunction(T wrappedFunction) { this.wrappedFunction = wrappedFunction; } - @Override public void open(Configuration parameters) throws Exception { FunctionUtils.openFunction(this.wrappedFunction, parameters); } - + @Override public void close() throws Exception { FunctionUtils.closeFunction(this.wrappedFunction); } - + @Override public void setRuntimeContext(RuntimeContext t) { super.setRuntimeContext(t); - + FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java index 0ce79e3e353b3..2f74288e9919b 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java @@ -16,25 +16,28 @@ * limitations under the License. */ - package org.apache.flink.api.java.operators.translation; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.GenericDataSinkBase; import org.apache.flink.api.common.operators.GenericDataSourceBase; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.types.StringValue; + import org.junit.Test; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for translation of aggregations. + */ public class AggregateTranslationTest { @Test @@ -42,26 +45,26 @@ public void translateAggregate() { try { final int parallelism = 8; ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism); - + @SuppressWarnings("unchecked") - DataSet> initialData = + DataSet> initialData = env.fromElements(new Tuple3(3.141592, new StringValue("foobar"), Long.valueOf(77))); - + initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).output(new DiscardingOutputFormat>()); - + Plan p = env.createProgramPlan(); - + GenericDataSinkBase sink = p.getDataSinks().iterator().next(); - + GroupReduceOperatorBase reducer = (GroupReduceOperatorBase) sink.getInput(); - + // check keys assertEquals(1, reducer.getKeyColumns(0).length); assertEquals(0, reducer.getKeyColumns(0)[0]); - + assertEquals(-1, reducer.getParallelism()); assertTrue(reducer.isCombinable()); - + assertTrue(reducer.getInput() instanceof GenericDataSourceBase); } catch (Exception e) { diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java index 887173d8e6696..9c67e60b61c44 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java @@ -18,8 +18,6 @@ package org.apache.flink.api.java.operators.translation; -import static org.junit.Assert.*; - import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.GenericDataSinkBase; @@ -31,8 +29,16 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.Collector; + import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +/** + * Tests for translation of co-group sort. + */ @SuppressWarnings({"serial", "unchecked"}) public class CoGroupSortTranslationTest implements java.io.Serializable { @@ -40,35 +46,35 @@ public class CoGroupSortTranslationTest implements java.io.Serializable { public void testGroupSortTuples() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + DataSet> input1 = env.fromElements(new Tuple2(0L, 0L)); DataSet> input2 = env.fromElements(new Tuple3(0L, 0L, 0L)); - + input1.coGroup(input2) .where(1).equalTo(2) .sortFirstGroup(0, Order.DESCENDING) .sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING) - + .with(new CoGroupFunction, Tuple3, Long>() { @Override public void coGroup(Iterable> first, Iterable> second, Collector out) {} }) - + .output(new DiscardingOutputFormat()); - + Plan p = env.createProgramPlan(); - + GenericDataSinkBase sink = p.getDataSinks().iterator().next(); CoGroupOperatorBase coGroup = (CoGroupOperatorBase) sink.getInput(); - + assertNotNull(coGroup.getGroupOrderForInputOne()); assertNotNull(coGroup.getGroupOrderForInputTwo()); - + assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields()); assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue()); assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0)); - + assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields()); assertEquals(1, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue()); assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue()); @@ -80,39 +86,39 @@ public void coGroup(Iterable> first, Iterable> input1 = env.fromElements(new Tuple2(0L, 0L)); DataSet input2 = env.fromElements(new TestPoJo()); - + input1.coGroup(input2) .where(1).equalTo("b") .sortFirstGroup(0, Order.DESCENDING) .sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING) - + .with(new CoGroupFunction, TestPoJo, Long>() { @Override public void coGroup(Iterable> first, Iterable second, Collector out) {} }) - + .output(new DiscardingOutputFormat()); - + Plan p = env.createProgramPlan(); - + GenericDataSinkBase sink = p.getDataSinks().iterator().next(); CoGroupOperatorBase coGroup = (CoGroupOperatorBase) sink.getInput(); - + assertNotNull(coGroup.getGroupOrderForInputOne()); assertNotNull(coGroup.getGroupOrderForInputTwo()); - + assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields()); assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue()); assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0)); - + assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields()); assertEquals(2, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue()); assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue()); @@ -124,7 +130,10 @@ public void coGroup(Iterable> first, Iterable secon fail(e.getMessage()); } } - + + /** + * Sample test pojo. + */ public static class TestPoJo { public long a; public long b; diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java index fd60bc6a5ee58..e4cb8c4a27c47 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java @@ -16,93 +16,95 @@ * limitations under the License. */ - package org.apache.flink.api.java.operators.translation; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.util.Iterator; - import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.aggregators.LongSumAggregator; +import org.apache.flink.api.common.functions.RichCoGroupFunction; +import org.apache.flink.api.common.functions.RichJoinFunction; +import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.operators.GenericDataSinkBase; import org.apache.flink.api.common.operators.base.DeltaIterationBase; import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DeltaIteration; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.common.functions.RichCoGroupFunction; -import org.apache.flink.api.common.functions.RichJoinFunction; -import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.Collector; + import org.junit.Test; +import java.util.Iterator; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Tests for translation of delta iterations. + */ @SuppressWarnings("serial") public class DeltaIterationTranslationTest implements java.io.Serializable { @Test public void testCorrectTranslation() { try { - final String JOB_NAME = "Test JobName"; - final String ITERATION_NAME = "Test Name"; - - final String BEFORE_NEXT_WORKSET_MAP = "Some Mapper"; - - final String AGGREGATOR_NAME = "AggregatorName"; - - final int[] ITERATION_KEYS = new int[] {2}; - final int NUM_ITERATIONS = 13; - - final int DEFAULT_parallelism= 133; - final int ITERATION_parallelism = 77; - + final String jobName = "Test JobName"; + final String iterationName = "Test Name"; + + final String beforeNextWorksetMap = "Some Mapper"; + + final String aggregatorName = "AggregatorName"; + + final int[] iterationKeys = new int[] {2}; + final int numIterations = 13; + + final int defaultParallelism = 133; + final int iterationParallelism = 77; + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + // ------------ construct the test program ------------------ { - env.setParallelism(DEFAULT_parallelism); - + env.setParallelism(defaultParallelism); + @SuppressWarnings("unchecked") DataSet> initialSolutionSet = env.fromElements(new Tuple3(3.44, 5L, "abc")); - + @SuppressWarnings("unchecked") DataSet> initialWorkSet = env.fromElements(new Tuple2(1.23, "abc")); - - DeltaIteration, Tuple2> iteration = initialSolutionSet.iterateDelta(initialWorkSet, NUM_ITERATIONS, ITERATION_KEYS); - iteration.name(ITERATION_NAME).parallelism(ITERATION_parallelism); - - iteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator()); - + + DeltaIteration, Tuple2> iteration = initialSolutionSet.iterateDelta(initialWorkSet, numIterations, iterationKeys); + iteration.name(iterationName).parallelism(iterationParallelism); + + iteration.registerAggregator(aggregatorName, new LongSumAggregator()); + // test that multiple workset consumers are supported - DataSet> worksetSelfJoin = + DataSet> worksetSelfJoin = iteration.getWorkset() - .map(new IdentityMapper>()) + .map(new IdentityMapper>()) .join(iteration.getWorkset()).where(1).equalTo(1).projectFirst(0, 1); - + DataSet> joined = worksetSelfJoin.join(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetJoin()); DataSet> result = iteration.closeWith( joined, - joined.map(new NextWorksetMapper()).name(BEFORE_NEXT_WORKSET_MAP)); - + joined.map(new NextWorksetMapper()).name(beforeNextWorksetMap)); + result.output(new DiscardingOutputFormat>()); result.writeAsText("/dev/null"); } - - - Plan p = env.createProgramPlan(JOB_NAME); - + + Plan p = env.createProgramPlan(jobName); + // ------------- validate the plan ---------------- - assertEquals(JOB_NAME, p.getJobName()); - assertEquals(DEFAULT_parallelism, p.getDefaultParallelism()); - + assertEquals(jobName, p.getJobName()); + assertEquals(defaultParallelism, p.getDefaultParallelism()); + // validate the iteration GenericDataSinkBase sink1, sink2; { @@ -110,23 +112,23 @@ public void testCorrectTranslation() { sink1 = sinks.next(); sink2 = sinks.next(); } - + DeltaIterationBase iteration = (DeltaIterationBase) sink1.getInput(); - + // check that multi consumer translation works for iterations assertEquals(iteration, sink2.getInput()); - + // check the basic iteration properties - assertEquals(NUM_ITERATIONS, iteration.getMaximumNumberOfIterations()); - assertArrayEquals(ITERATION_KEYS, iteration.getSolutionSetKeyFields()); - assertEquals(ITERATION_parallelism, iteration.getParallelism()); - assertEquals(ITERATION_NAME, iteration.getName()); - + assertEquals(numIterations, iteration.getMaximumNumberOfIterations()); + assertArrayEquals(iterationKeys, iteration.getSolutionSetKeyFields()); + assertEquals(iterationParallelism, iteration.getParallelism()); + assertEquals(iterationName, iteration.getName()); + MapOperatorBase nextWorksetMapper = (MapOperatorBase) iteration.getNextWorkset(); InnerJoinOperatorBase solutionSetJoin = (InnerJoinOperatorBase) iteration.getSolutionSetDelta(); InnerJoinOperatorBase worksetSelfJoin = (InnerJoinOperatorBase) solutionSetJoin.getFirstInput(); MapOperatorBase worksetMapper = (MapOperatorBase) worksetSelfJoin.getFirstInput(); - + assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass()); assertEquals(NextWorksetMapper.class, nextWorksetMapper.getUserCodeWrapper().getUserCodeClass()); if (solutionSetJoin.getUserCodeWrapper().getUserCodeObject() instanceof WrappingFunction) { @@ -137,9 +139,9 @@ public void testCorrectTranslation() { assertEquals(SolutionWorksetJoin.class, solutionSetJoin.getUserCodeWrapper().getUserCodeClass()); } - assertEquals(BEFORE_NEXT_WORKSET_MAP, nextWorksetMapper.getName()); - - assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName()); + assertEquals(beforeNextWorksetMap, nextWorksetMapper.getName()); + + assertEquals(aggregatorName, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName()); } catch (Exception e) { System.err.println(e.getMessage()); @@ -147,20 +149,20 @@ public void testCorrectTranslation() { fail(e.getMessage()); } } - + @Test public void testRejectWhenSolutionSetKeysDontMatchJoin() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + @SuppressWarnings("unchecked") DataSet> initialSolutionSet = env.fromElements(new Tuple3(3.44, 5L, "abc")); @SuppressWarnings("unchecked") DataSet> initialWorkSet = env.fromElements(new Tuple2(1.23, "abc")); - + DeltaIteration, Tuple2> iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1); - + try { iteration.getWorkset().join(iteration.getSolutionSet()).where(1).equalTo(2); fail("Accepted invalid program."); @@ -168,7 +170,7 @@ public void testRejectWhenSolutionSetKeysDontMatchJoin() { catch (InvalidProgramException e) { // all good! } - + try { iteration.getSolutionSet().join(iteration.getWorkset()).where(2).equalTo(1); fail("Accepted invalid program."); @@ -183,20 +185,20 @@ public void testRejectWhenSolutionSetKeysDontMatchJoin() { fail(e.getMessage()); } } - + @Test public void testRejectWhenSolutionSetKeysDontMatchCoGroup() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + @SuppressWarnings("unchecked") DataSet> initialSolutionSet = env.fromElements(new Tuple3(3.44, 5L, "abc")); @SuppressWarnings("unchecked") DataSet> initialWorkSet = env.fromElements(new Tuple2(1.23, "abc")); - + DeltaIteration, Tuple2> iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1); - + try { iteration.getWorkset().coGroup(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetCoGroup1()); fail("Accepted invalid program."); @@ -204,7 +206,7 @@ public void testRejectWhenSolutionSetKeysDontMatchCoGroup() { catch (InvalidProgramException e) { // all good! } - + try { iteration.getSolutionSet().coGroup(iteration.getWorkset()).where(2).equalTo(1).with(new SolutionWorksetCoGroup2()); fail("Accepted invalid program."); @@ -219,40 +221,40 @@ public void testRejectWhenSolutionSetKeysDontMatchCoGroup() { fail(e.getMessage()); } } - + // -------------------------------------------------------------------------------------------- - - public static class SolutionWorksetJoin extends RichJoinFunction, Tuple3, Tuple3> { + + private static class SolutionWorksetJoin extends RichJoinFunction, Tuple3, Tuple3> { @Override public Tuple3 join(Tuple2 first, Tuple3 second){ return null; } } - - public static class NextWorksetMapper extends RichMapFunction, Tuple2> { + + private static class NextWorksetMapper extends RichMapFunction, Tuple2> { @Override public Tuple2 map(Tuple3 value) { return null; } } - - public static class IdentityMapper extends RichMapFunction { + + private static class IdentityMapper extends RichMapFunction { @Override public T map(T value) throws Exception { return value; } } - - public static class SolutionWorksetCoGroup1 extends RichCoGroupFunction, Tuple3, Tuple3> { + + private static class SolutionWorksetCoGroup1 extends RichCoGroupFunction, Tuple3, Tuple3> { @Override public void coGroup(Iterable> first, Iterable> second, Collector> out) { } } - - public static class SolutionWorksetCoGroup2 extends RichCoGroupFunction, Tuple2, Tuple3> { + + private static class SolutionWorksetCoGroup2 extends RichCoGroupFunction, Tuple2, Tuple3> { @Override public void coGroup(Iterable> second, Iterable> first, diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java index 27c7b2fb62b74..6a98a8da31da6 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java @@ -34,6 +34,7 @@ import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.types.LongValue; import org.apache.flink.types.StringValue; + import org.junit.Test; import java.io.Serializable; @@ -45,6 +46,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Tests for translation of distinct operation. + */ @SuppressWarnings("serial") public class DistinctTranslationTest { @@ -164,7 +168,7 @@ public void translateDistinctKeySelector() { DataSet> initialData = getSourceDataSet(env); - initialData.distinct(new KeySelector, StringValue>() { + initialData.distinct(new KeySelector, StringValue>() { public StringValue getKey(Tuple3 value) { return value.f1; } @@ -183,7 +187,7 @@ public StringValue getKey(Tuple3 value) { assertEquals(4, reducer.getParallelism()); // check types - TypeInformation keyValueInfo = new TupleTypeInfo>>( + TypeInformation keyValueInfo = new TupleTypeInfo>>( new ValueTypeInfo(StringValue.class), initialData.getType()); @@ -245,7 +249,7 @@ public void translateDistinctExpressionKey() { } @SuppressWarnings("unchecked") - private static final DataSet> getSourceDataSet(ExecutionEnvironment env) { + private static DataSet> getSourceDataSet(ExecutionEnvironment env) { return env.fromElements(new Tuple3(3.141592, new StringValue("foobar"), new LongValue(77))) .setParallelism(1); } @@ -256,6 +260,9 @@ private static DataSet getSourcePojoDataSet(ExecutionEnvironment env return env.fromCollection(data); } + /** + * Custom data type, for testing purposes. + */ public static class CustomType implements Serializable { private static final long serialVersionUID = 1L; @@ -269,7 +276,7 @@ public CustomType(int i) { @Override public String toString() { - return ""+myInt; + return "" + myInt; } } } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java index 3adbbb8033dc9..486cad4bc791e 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java @@ -16,20 +16,19 @@ * limitations under the License. */ - package org.apache.flink.api.java.operators.translation; -import static org.junit.Assert.*; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.operators.GenericDataSinkBase; import org.apache.flink.api.common.operators.GenericDataSourceBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.operators.base.ReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -37,10 +36,17 @@ import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.types.LongValue; import org.apache.flink.types.StringValue; + import org.junit.Test; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for translation of reduce operation. + */ @SuppressWarnings("serial") public class ReduceTranslationTests implements java.io.Serializable { @@ -49,31 +55,31 @@ public void translateNonGroupedReduce() { try { final int parallelism = 8; ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism); - + DataSet> initialData = getSourceDataSet(env); - - initialData.reduce(new RichReduceFunction>() { + + initialData.reduce(new RichReduceFunction>() { public Tuple3 reduce(Tuple3 value1, Tuple3 value2) { return value1; } }).output(new DiscardingOutputFormat>()); - + Plan p = env.createProgramPlan(); - + GenericDataSinkBase sink = p.getDataSinks().iterator().next(); - + ReduceOperatorBase reducer = (ReduceOperatorBase) sink.getInput(); - + // check types assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType()); assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); - + // check keys assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0); - + // parallelism was not configured on the operator assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT); - + assertTrue(reducer.getInput() instanceof GenericDataSourceBase); } catch (Exception e) { @@ -82,40 +88,40 @@ public Tuple3 reduce(Tuple3> initialData = getSourceDataSet(env); - + initialData .groupBy(2) - .reduce(new RichReduceFunction>() { + .reduce(new RichReduceFunction>() { public Tuple3 reduce(Tuple3 value1, Tuple3 value2) { return value1; } }) .output(new DiscardingOutputFormat>()); - + Plan p = env.createProgramPlan(); - + GenericDataSinkBase sink = p.getDataSinks().iterator().next(); - + ReduceOperatorBase reducer = (ReduceOperatorBase) sink.getInput(); - + // check types assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType()); assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType()); - + // parallelism was not configured on the operator assertTrue(reducer.getParallelism() == parallelism || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT); - + // check keys assertArrayEquals(new int[] {2}, reducer.getKeyColumns(0)); - + assertTrue(reducer.getInput() instanceof GenericDataSourceBase); } catch (Exception e) { @@ -124,60 +130,58 @@ public Tuple3 reduce(Tuple3> initialData = getSourceDataSet(env); - + initialData - .groupBy(new KeySelector, StringValue>() { + .groupBy(new KeySelector, StringValue>() { public StringValue getKey(Tuple3 value) { return value.f1; } }) - .reduce(new RichReduceFunction>() { + .reduce(new RichReduceFunction>() { public Tuple3 reduce(Tuple3 value1, Tuple3 value2) { return value1; } }).setParallelism(4) .output(new DiscardingOutputFormat>()); - + Plan p = env.createProgramPlan(); - + GenericDataSinkBase sink = p.getDataSinks().iterator().next(); - - + MapOperatorBase keyProjector = (MapOperatorBase) sink.getInput(); PlanUnwrappingReduceOperator reducer = (PlanUnwrappingReduceOperator) keyProjector.getInput(); MapOperatorBase keyExtractor = (MapOperatorBase) reducer.getInput(); - + // check the parallelisms assertEquals(1, keyExtractor.getParallelism()); assertEquals(4, reducer.getParallelism()); assertEquals(4, keyProjector.getParallelism()); - + // check types - TypeInformation keyValueInfo = new TupleTypeInfo>>( + TypeInformation keyValueInfo = new TupleTypeInfo>>( new ValueTypeInfo(StringValue.class), initialData.getType()); - + assertEquals(initialData.getType(), keyExtractor.getOperatorInfo().getInputType()); assertEquals(keyValueInfo, keyExtractor.getOperatorInfo().getOutputType()); - + assertEquals(keyValueInfo, reducer.getOperatorInfo().getInputType()); assertEquals(keyValueInfo, reducer.getOperatorInfo().getOutputType()); - + assertEquals(keyValueInfo, keyProjector.getOperatorInfo().getInputType()); assertEquals(initialData.getType(), keyProjector.getOperatorInfo().getOutputType()); - + // check keys assertEquals(KeyExtractingMapper.class, keyExtractor.getUserCodeWrapper().getUserCodeClass()); - + assertTrue(keyExtractor.getInput() instanceof GenericDataSourceBase); } catch (Exception e) { @@ -186,9 +190,9 @@ public Tuple3 reduce(Tuple3> getSourceDataSet(ExecutionEnvironment env) { + private static DataSet> getSourceDataSet(ExecutionEnvironment env) { return env.fromElements(new Tuple3(3.141592, new StringValue("foobar"), new LongValue(77))) .setParallelism(1); } diff --git a/tools/maven/suppressions-java.xml b/tools/maven/suppressions-java.xml index d7e42e5f1da9d..3bb855639214f 100644 --- a/tools/maven/suppressions-java.xml +++ b/tools/maven/suppressions-java.xml @@ -32,14 +32,6 @@ under the License. files="(.*)api[/\\]java[/\\]operators[/\\]join[/\\](.*)" checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/> - - - -