Skip to content

Commit

Permalink
[FLINK-7191] Activate checkstyle flink-java/operators/translation
Browse files Browse the repository at this point in the history
This closes apache#4334.
  • Loading branch information
dawidwys authored and zentol committed Jul 31, 2017
1 parent 0c9c9fb commit 8e97536
Show file tree
Hide file tree
Showing 29 changed files with 375 additions and 327 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.operators.translation;

import org.apache.flink.annotation.Internal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,37 @@
package org.apache.flink.api.java.operators.translation;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

/**
* Mapper that extracts keys.
* @param <T> type of value
* @param <K> type of key
*/
@Internal
@ForwardedFields("*->1")
public final class KeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> {

private static final long serialVersionUID = 1L;

private final KeySelector<T, K> keySelector;

private final Tuple2<K, T> tuple = new Tuple2<K, T>();



public KeyExtractingMapper(KeySelector<T, K> keySelector) {
this.keySelector = keySelector;
}



@Override
public Tuple2<K, T> map(T value) throws Exception {

K key = keySelector.getKey(value);
tuple.f0 = key;
tuple.f1 = value;

return tuple;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;

/**
* Mapper that removes keys.
* @param <T> type of values
* @param <K> type of keys
*/
@Internal
@ForwardedFields("1->*")
public final class KeyRemovingMapper<T, K> extends RichMapFunction<Tuple2<K, T>, T> {

private static final long serialVersionUID = 1L;

@Override
public T map(Tuple2<K, T> value) {
return value.f1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
* A co group operator that applies the operation only on the unwrapped values.
*/
@Internal
public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
{
extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>> {

public PlanBothUnwrappingCoGroupOperator(
CoGroupFunction<I1, I2, OUT> udf,
Expand All @@ -52,23 +54,21 @@ public PlanBothUnwrappingCoGroupOperator(
name);
}

public static final class TupleBothUnwrappingCoGrouper<I1, I2, OUT, K>
private static final class TupleBothUnwrappingCoGrouper<I1, I2, OUT, K>
extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
{
implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT> {
private static final long serialVersionUID = 1L;

private final TupleUnwrappingIterator<I1, K> iter1;
private final TupleUnwrappingIterator<I2, K> iter2;

private TupleBothUnwrappingCoGrouper(CoGroupFunction<I1, I2, OUT> wrapped) {
super(wrapped);

this.iter1 = new TupleUnwrappingIterator<I1, K>();
this.iter2 = new TupleUnwrappingIterator<I2, K>();
}


@Override
public void coGroup(
Iterable<Tuple2<K, I1>> records1,
Expand All @@ -79,6 +79,6 @@ public void coGroup(
iter2.set(records2.iterator());
this.wrappedFunction.coGroup(iter1, iter2, out);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,33 @@
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.util.Collector;

/**
* @see FilterOperatorBase
* @param <T>
*/
@Internal
@ForwardedFields("*")
public class PlanFilterOperator<T> extends FilterOperatorBase<T, FlatMapFunction<T, T>> {

public PlanFilterOperator(FilterFunction<T> udf, String name, TypeInformation<T> type) {
super(new FlatMapFilter<T>(udf), new UnaryOperatorInformation<T, T>(type, type), name);
}

/**
* @see FlatMapFunction
* @param <T>
*/
public static final class FlatMapFilter<T> extends WrappingFunction<FilterFunction<T>>
implements FlatMapFunction<T, T>
{
implements FlatMapFunction<T, T> {

private static final long serialVersionUID = 1L;

private FlatMapFilter(FilterFunction<T> wrapped) {
super(wrapped);
}

@Override
public final void flatMap(T value, Collector<T> out) throws Exception {
public void flatMap(T value, Collector<T> out) throws Exception {
if (this.wrappedFunction.filter(value)) {
out.collect(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
* A co group operator that applies the operation only on the unwrapped values on the left.
*/
@Internal
public class PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K>
extends CoGroupOperatorBase<Tuple2<K, I1>, I2, OUT, CoGroupFunction<Tuple2<K, I1>, I2, OUT>>
{
extends CoGroupOperatorBase<Tuple2<K, I1>, I2, OUT, CoGroupFunction<Tuple2<K, I1>, I2, OUT>> {

public PlanLeftUnwrappingCoGroupOperator(
CoGroupFunction<I1, I2, OUT> udf,
Expand All @@ -52,21 +54,20 @@ public PlanLeftUnwrappingCoGroupOperator(
name);
}

public static final class TupleLeftUnwrappingCoGrouper<I1, I2, OUT, K>
private static final class TupleLeftUnwrappingCoGrouper<I1, I2, OUT, K>
extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
implements CoGroupFunction<Tuple2<K, I1>, I2, OUT> {

private static final long serialVersionUID = 1L;

private final TupleUnwrappingIterator<I1, K> iter1;

private TupleLeftUnwrappingCoGrouper(CoGroupFunction<I1, I2, OUT> wrapped) {
super(wrapped);

this.iter1 = new TupleUnwrappingIterator<I1, K>();
}


@Override
public void coGroup(
Iterable<Tuple2<K, I1>> records1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,33 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;

/**
* A map operator that retains a subset of fields from incoming tuples.
*
* @param <T> Input tuple type
* @param <R> Output tuple type
*/
@Internal
public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {

public PlanProjectOperator(int[] fields, String name,
TypeInformation<T> inType, TypeInformation<R> outType,
ExecutionConfig executionConfig)
{
ExecutionConfig executionConfig) {
super(PlanProjectOperator.<T, R, Tuple>createTypedProjector(fields), new UnaryOperatorInformation<T, R>(inType, outType), name);
}

@SuppressWarnings("unchecked")
private static <T, R extends Tuple, X extends Tuple> MapFunction<T, R> createTypedProjector(int[] fields) {
return (MapFunction<T, R>) new MapProjector<X, R>(fields);
}


public static final class MapProjector<T extends Tuple, R extends Tuple>
extends AbstractRichFunction implements MapFunction<T, R>
{

private static final class MapProjector<T extends Tuple, R extends Tuple>
extends AbstractRichFunction implements MapFunction<T, R> {
private static final long serialVersionUID = 1L;

private final int[] fields;
private final Tuple outTuple;

private MapProjector(int[] fields) {
this.fields = fields;
try {
Expand All @@ -69,7 +72,7 @@ public R map(Tuple inTuple) throws Exception {
for (int i = 0; i < fields.length; i++) {
outTuple.setField(inTuple.getField(fields[i]), i);
}

return (R) outTuple;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
* A co group operator that applies the operation only on the unwrapped values on the right.
*/
@Internal
public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
extends CoGroupOperatorBase<I1, Tuple2<K, I2>, OUT, CoGroupFunction<I1, Tuple2<K, I2>, OUT>>
{
extends CoGroupOperatorBase<I1, Tuple2<K, I2>, OUT, CoGroupFunction<I1, Tuple2<K, I2>, OUT>> {

public PlanRightUnwrappingCoGroupOperator(
CoGroupFunction<I1, I2, OUT> udf,
Expand All @@ -52,7 +54,7 @@ public PlanRightUnwrappingCoGroupOperator(
name);
}

public static final class TupleRightUnwrappingCoGrouper<I1, I2, OUT, K>
private static final class TupleRightUnwrappingCoGrouper<I1, I2, OUT, K>
extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
implements CoGroupFunction<I1, Tuple2<K, I2>, OUT> {

Expand All @@ -66,7 +68,6 @@ private TupleRightUnwrappingCoGrouper(CoGroupFunction<I1, I2, OUT> wrapped) {
this.iter2 = new TupleUnwrappingIterator<I2, K>();
}


@Override
public void coGroup(
Iterable<I1> records1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

Expand All @@ -35,35 +35,32 @@
public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, GroupCombineFunction<Tuple2<K, IN>, OUT>> {

public PlanUnwrappingGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey)
{
TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey) {
super(new TupleUnwrappingGroupCombiner<IN, OUT, K>(udf),
new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);

}

// --------------------------------------------------------------------------------------------

public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
implements GroupCombineFunction<Tuple2<K, IN>, OUT>
{


private static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
implements GroupCombineFunction<Tuple2<K, IN>, OUT> {

private static final long serialVersionUID = 1L;
private final TupleUnwrappingIterator<IN, K> iter;

private final TupleUnwrappingIterator<IN, K> iter;

private TupleUnwrappingGroupCombiner(GroupCombineFunction<IN, OUT> wrapped) {
super(wrapped);
this.iter = new TupleUnwrappingIterator<IN, K>();
}



@Override
public void combine(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
this.wrappedFunction.combine(iter, out);
}

@Override
public String toString() {
return this.wrappedFunction.toString();
Expand Down
Loading

0 comments on commit 8e97536

Please sign in to comment.