Skip to content

Commit

Permalink
[streaming] Removed unused StreamReduce
Browse files Browse the repository at this point in the history
Refactored corresponding tests, some minor cleanups.
  • Loading branch information
mbalassi committed Oct 6, 2015
1 parent 4938ff0 commit 906bd6d
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 325 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;

/**
* Base interface for all streaming operator states. It can represent both
Expand All @@ -30,9 +32,9 @@
* State can be accessed and manipulated using the {@link #value()} and
* {@link #update(T)} methods. These calls are only safe in the
* transformation call the operator represents, for instance inside
* {@link MapFunction#map()} and can lead tp unexpected behavior in the
* {@link #open(org.apache.flink.configuration.Configuration)} or
* {@link #close()} methods.
* {@link MapFunction#map(Object)} and can lead tp unexpected behavior in the
* {@link AbstractRichFunction#open(Configuration)} or
* {@link AbstractRichFunction#close()} methods.
*
* @param <T>
* Type of the operator state
Expand All @@ -59,7 +61,7 @@ public interface OperatorState<T> {
* partitioned state is updated with null, the state for the current key
* will be removed and the default value is returned on the next access.
*
* @param state
* @param value
* The new value for the state.
*
* @throws IOException Thrown if the system cannot access the state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
implements OneInputStreamOperator<IN, IN>{

private static final long serialVersionUID = 1L;

Expand All @@ -41,7 +43,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
Object key = keySelector.getKey(element.getValue());

if (values == null) {
values = new HashMap<Object, IN>();
values = new HashMap<>();
}

IN currentValue = values.get(key);
Expand All @@ -56,4 +58,9 @@ public void processElement(StreamRecord<IN> element) throws Exception {
}
}

@Override
public void processWatermark(Watermark mark) throws Exception {
output.emitWatermark(mark);
}

}

This file was deleted.

Loading

0 comments on commit 906bd6d

Please sign in to comment.