[streaming] Operator invokable refactor
gyfora authored and StephanEwen committed Aug 18, 2014
1 parent 1fccb10 commit be459ae
Showing 9 changed files with 276 additions and 181 deletions.
@@ -36,6 +36,7 @@
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.partitioner.DistributePartitioner;
@@ -292,7 +293,7 @@ public DataStream<T> shuffle() {
public DataStream<T> forward() {
return setConnectionType(new ForwardPartitioner<T>());
}

/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are distributed evenly to the next component.
@@ -331,6 +332,27 @@ public <R extends Tuple> StreamOperator<T, R> map(MapFunction<T, R> mapper) {

}

/**
* Applies a CoMap transformation on two separate {@link DataStream}s. The
* transformation calls a {@link CoMapFunction#map1(Tuple)} for each element
* of the first DataStream (on which .coMapWith was called) and
* {@link CoMapFunction#map2(Tuple)} for each element of the second
* DataStream. Each CoMapFunction call returns exactly one element.
*
* @param coMapper
* The CoMapFunction used to jointly transform the two input
* DataStreams
* @param otherStream
* The DataStream that will be transformed with
* {@link CoMapFunction#map2(Tuple)}
* @return The transformed DataStream
*/
public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(
CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
return environment.addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(
otherStream), coMapper, new CoMapInvokable<T, T2, R>(coMapper));
}
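
The javadoc above documents the relocated coMapWith; a minimal usage sketch follows. It is illustrative only: the stream variables, the tuple layouts, and the assumption that map1/map2 are the only abstract methods of CoMapFunction are not part of this commit, and imports are omitted because package locations are not shown in this diff.

// Hypothetical sketch: tag elements from two streams into a common output type.
public DataStream<Tuple2<String, Integer>> tagStreams(
		DataStream<Tuple1<String>> words, DataStream<Tuple1<Integer>> counts) {
	return words.coMapWith(
			new CoMapFunction<Tuple1<String>, Tuple1<Integer>, Tuple2<String, Integer>>() {
				@Override
				public Tuple2<String, Integer> map1(Tuple1<String> value) {
					// invoked for each element of the first stream (words)
					return new Tuple2<String, Integer>(value.f0, 1);
				}

				@Override
				public Tuple2<String, Integer> map2(Tuple1<Integer> value) {
					// invoked for each element of the second stream (counts)
					return new Tuple2<String, Integer>("count", value.f0);
				}
			}, counts);
}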

/**
* Applies a FlatMap transformation on a {@link DataStream}. The
* transformation calls a FlatMapFunction for each element of the
@@ -387,11 +409,6 @@ public <R extends Tuple> StreamOperator<T, R> batchReduce(GroupReduceFunction<T,
new BatchReduceInvokable<T, R>(reducer, batchSize));
}

public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
return environment.addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream), coMapper, new CoMapInvokable<T, T2, R>(coMapper));
}


/**
* Applies a reduce transformation on preset "time" chunks of the
* DataStream. The transformation calls a {@link GroupReduceFunction} on
@@ -411,7 +428,7 @@ public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(CoMapFunction
public <R extends Tuple> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
long windowSize) {
return environment.addFunction("batchReduce", new DataStream<T>(this), reducer,
new BatchReduceInvokable<T, R>(reducer, windowSize));
new WindowReduceInvokable<T, R>(reducer, windowSize));
}
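
For the windowReduce change above (BatchReduceInvokable swapped for WindowReduceInvokable), a hedged usage sketch: the tuple layout and the 5000 ms window are assumptions, the GroupReduceFunction signature follows the reducer.reduce(Iterator, Collector) call visible in the BatchReduceInvokable hunks below, and imports are again omitted.

// Hypothetical sketch: sum the first field of each tuple over roughly 5-second chunks.
// The window size is interpreted in milliseconds, matching the System.currentTimeMillis()
// bookkeeping in the reduce invokables further down in this commit.
public StreamOperator<Tuple1<Integer>, Tuple1<Integer>> sumPerWindow(DataStream<Tuple1<Integer>> stream) {
	return stream.windowReduce(new GroupReduceFunction<Tuple1<Integer>, Tuple1<Integer>>() {
		@Override
		public void reduce(Iterator<Tuple1<Integer>> values, Collector<Tuple1<Integer>> out) {
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().f0;
			}
			out.collect(new Tuple1<Integer>(sum));
		}
	}, 5000L);
}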

/**
@@ -32,16 +32,17 @@ public SinkInvokable(SinkFunction<IN> sinkFunction) {
}

@Override
public void invoke() throws Exception {
if (this.isMutable) {
while (recordIterator.next(reuse) != null) {
sinkFunction.invoke((IN) reuse.getTuple());
}
} else {
while (recordIterator.next(reuse) != null) {
sinkFunction.invoke((IN) reuse.getTuple());
resetReuse();
}
protected void immutableInvoke() throws Exception {
while (recordIterator.next(reuse) != null) {
sinkFunction.invoke((IN) reuse.getTuple());
resetReuse();
}
}

@Override
protected void mutableInvoke() throws Exception {
while (recordIterator.next(reuse) != null) {
sinkFunction.invoke((IN) reuse.getTuple());
}
}
}
@@ -19,6 +19,8 @@

package org.apache.flink.streaming.api.invokable;

import java.io.IOException;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -49,6 +51,25 @@ public void initialize(Collector<OUT> collector,
protected void resetReuse() {
this.reuse = serializer.createInstance();
}

protected StreamRecord<IN> loadNextRecord() {
try {
reuse = recordIterator.next(reuse);
} catch (IOException e) {
e.printStackTrace();
}
return reuse;
}

protected abstract void immutableInvoke() throws Exception;

public abstract void invoke() throws Exception;
protected abstract void mutableInvoke() throws Exception;

public void invoke() throws Exception {
if (this.isMutable) {
mutableInvoke();
} else {
immutableInvoke();
}
}
}
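
This hunk is the core of the refactor: invoke() now lives in the base invokable and dispatches on isMutable, while each operator supplies only the two loop bodies. As a rough illustration of the new contract, a hypothetical pass-through operator could look like the sketch below; the class name, the printing, and the assumption that UserTaskInvokable (or the base class shown above) exposes recordIterator, reuse, resetReuse() and collector to subclasses are mine, not part of the commit.

// Hypothetical subclass sketch -- not part of this commit.
public class PrintInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN> {
	private static final long serialVersionUID = 1L;

	@Override
	protected void immutableInvoke() throws Exception {
		// immutable mode: emit, then create a fresh reuse record so emitted tuples are not overwritten
		while (recordIterator.next(reuse) != null) {
			System.out.println(reuse.getTuple());
			collector.collect(reuse.getTuple());
			resetReuse();
		}
	}

	@Override
	protected void mutableInvoke() throws Exception {
		// mutable mode: the same reuse record is overwritten on every next() call
		while (recordIterator.next(reuse) != null) {
			System.out.println(reuse.getTuple());
			collector.collect(reuse.getTuple());
		}
	}
}
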
156 changes: 26 additions & 130 deletions ...src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
100644 → 100755
@@ -19,109 +19,53 @@

package org.apache.flink.streaming.api.invokable.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;

public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
UserTaskInvokable<IN, OUT> {
StreamReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private int batchSize;
private long windowSize;
volatile boolean isRunning;
boolean window;

private GroupReduceFunction<IN, OUT> reducer;

public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
this.reducer = reduceFunction;
this.batchSize = batchSize;
}

public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
this.reducer = reduceFunction;
this.windowSize = windowSize;
this.window = true;
}

private StreamRecord<IN> loadNextRecord() {
try {
reuse = recordIterator.next(reuse);
} catch (IOException e) {
e.printStackTrace();
}
return reuse;
}

@Override
public void invoke() throws Exception {
if (this.isMutable) {
mutableInvoke();
} else {
immutableInvoke();
}
}

private void immutableInvoke() throws Exception {
protected void immutableInvoke() throws Exception {
List<IN> tupleBatch = new ArrayList<IN>();
boolean batchStart;

if (window) {
long startTime = System.currentTimeMillis();
while (loadNextRecord() != null) {
batchStart = true;
do {
if (batchStart) {
batchStart = false;
} else {
reuse = loadNextRecord();
if (reuse == null) {
break;
}
int counter = 0;

while (loadNextRecord() != null) {
batchStart = true;
do {
if (batchStart) {
batchStart = false;
} else {
reuse = loadNextRecord();
if (reuse == null) {
break;
}
tupleBatch.add(reuse.getTuple());
resetReuse();
} while (System.currentTimeMillis() - startTime < windowSize);
reducer.reduce(tupleBatch.iterator(), collector);
tupleBatch.clear();
startTime = System.currentTimeMillis();
}
} else {
int counter = 0;
while (loadNextRecord() != null) {
batchStart = true;
do {
if (batchStart) {
batchStart = false;
} else {
reuse = loadNextRecord();
if (reuse == null) {
break;
}
}
counter++;
tupleBatch.add(reuse.getTuple());
resetReuse();
} while (counter < batchSize);
reducer.reduce(tupleBatch.iterator(), collector);
tupleBatch.clear();
counter = 0;
}
}
counter++;
tupleBatch.add(reuse.getTuple());
resetReuse();
} while (counter < batchSize);
reducer.reduce(tupleBatch.iterator(), collector);
tupleBatch.clear();
counter = 0;
}

}

private void mutableInvoke() throws Exception {
BatchIterator<IN> userIterator;
if (window) {
userIterator = new WindowIterator();
} else {
userIterator = new CounterIterator();
}
@Override
protected void mutableInvoke() throws Exception {
BatchIterator<IN> userIterator = new CounterIterator();

do {
if (userIterator.hasNext()) {
@@ -131,7 +75,7 @@ private void mutableInvoke() throws Exception {
} while (reuse != null);
}

public class CounterIterator implements BatchIterator<IN> {
private class CounterIterator implements BatchIterator<IN> {
private int counter;
private boolean loadedNext;

@@ -179,52 +123,4 @@ public void remove() {

}

public class WindowIterator implements BatchIterator<IN> {

volatile boolean iterate;
private boolean loadedNext;
private long startTime;

public WindowIterator() {
startTime = System.currentTimeMillis();
}

@Override
public boolean hasNext() {
if (System.currentTimeMillis() - startTime > windowSize) {
return false;
} else if (!loadedNext) {
loadNextRecord();
loadedNext = true;
}
return (reuse != null);
}

@Override
public IN next() {
if (hasNext()) {
loadedNext = false;
return reuse.getTuple();
} else {
loadedNext = false;
return reuse.getTuple();
}
}

public void reset() {
while (System.currentTimeMillis() - startTime < windowSize) {
loadNextRecord();
}
loadNextRecord();
loadedNext = true;
startTime = System.currentTimeMillis();
}

@Override
public void remove() {

}

}

}
@@ -34,21 +34,21 @@ public FilterInvokable(FilterFunction<IN> filterFunction) {
}

@Override
public void invoke() throws Exception {
if (this.isMutable) {
while (recordIterator.next(reuse) != null) {
if (filterFunction.filter(reuse.getTuple())) {
collector.collect(reuse.getTuple());
}
}
} else {
while (recordIterator.next(reuse) != null) {
if (filterFunction.filter(reuse.getTuple())) {
collector.collect(reuse.getTuple());
}
resetReuse();
protected void immutableInvoke() throws Exception {
while (recordIterator.next(reuse) != null) {
if (filterFunction.filter(reuse.getTuple())) {
collector.collect(reuse.getTuple());
}
resetReuse();
}
}

@Override
protected void mutableInvoke() throws Exception {
while (recordIterator.next(reuse) != null) {
if (filterFunction.filter(reuse.getTuple())) {
collector.collect(reuse.getTuple());
}
}
}
}
@@ -33,16 +33,18 @@ public FlatMapInvokable(FlatMapFunction<IN, OUT> flatMapper) {
this.flatMapper = flatMapper;
}

public void invoke() throws Exception {
if (this.isMutable) {
while (recordIterator.next(reuse) != null) {
flatMapper.flatMap(reuse.getTuple(), collector);
}
} else {
while (recordIterator.next(reuse) != null) {
flatMapper.flatMap(reuse.getTuple(), collector);
resetReuse();
}
@Override
protected void immutableInvoke() throws Exception {
while (recordIterator.next(reuse) != null) {
flatMapper.flatMap(reuse.getTuple(), collector);
resetReuse();
}
}

@Override
protected void mutableInvoke() throws Exception {
while (recordIterator.next(reuse) != null) {
flatMapper.flatMap(reuse.getTuple(), collector);
}
}
}