[FLINK-2054] Add object-reuse switch for streaming
The switch was already there: enableObjectReuse() in ExecutionConfig.
This was simply not considered by the streaming runtime. This change now
creates a copy before forwarding an element to a chained operator when
object reuse is disabled.
aljoscha committed Jun 8, 2015
1 parent 7805db8 commit 26304c2
Showing 12 changed files with 87 additions and 9 deletions.
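In essence, the commit makes the chained forwarding path behave like the following simplified sketch (condensed from the collector diff at the end of this commit; variable names abbreviated):

```
// Condensed sketch of the new chaining behavior, not the verbatim patch:
// with object reuse disabled (the default), each element is defensively
// copied via its TypeSerializer before it is handed to the next chained
// operator; operators that are known to be safe can opt out.
if (objectReuseEnabled || operator.isInputCopyingDisabled()) {
    operator.processElement(record);                  // forward as-is
} else {
    operator.processElement(serializer.copy(record)); // forward a copy
}
```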
20 changes: 20 additions & 0 deletions docs/apis/streaming_guide.md
@@ -270,6 +270,26 @@ The `DataStream` is the basic data abstraction provided by the Flink Streaming.

The transformations may return different data stream types allowing more elaborate transformations, for example the `groupBy(…)` method returns a `GroupedDataStream` which can be used for grouped transformations such as aggregating by key. We will discover more elaborate data stream types in the upcoming sections.

### Object Reuse Behavior

Apache Flink tries to reduce the number of object allocations for better performance.

By default, user-defined functions (like `map()` or `reduce()`) receive new objects on each call
(or through an iterator), so it is safe to keep references to those objects inside the function
(for example in a `List`).

There is a switch on the `ExecutionConfig` that allows users to enable the object-reuse mode:

```
env.getExecutionConfig().enableObjectReuse()
```

When object reuse is enabled, Flink will reuse object instances for mutable types. In practice,
that means a `map()` function will always receive the same object instance (with its fields set
to new values). The object-reuse mode leads to better performance because fewer objects are
created, but users must take care with object references: an object that is kept across calls
will have its fields overwritten by the next element.
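
For illustration, here is a minimal sketch (the `Event` type and the function are made up for
this example) of a pattern that is safe by default but becomes incorrect once object reuse is
enabled:

```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical element type, for illustration only.
class Event {
    public long id;
}

// Keeps references to its inputs. This is safe by default: every call gets a
// fresh Event. With enableObjectReuse(), 'value' may be the same instance on
// every call, so 'seen' would hold many references to one object whose fields
// reflect only the last processed element. Under object reuse, copy the
// element before storing it.
class CollectingMap implements MapFunction<Event, Event> {
    private final List<Event> seen = new ArrayList<Event>();

    @Override
    public Event map(Event value) {
        seen.add(value);
        return value;
    }
}
```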

### Partitioning

Partitioning controls how individual data points of a stream are distributed among the parallel instances of the transformation operators. This also controls the ordering of the records in the `DataStream`. There is partial ordering guarantee for the outputs with respect to the partitioning scheme (outputs produced from each partition are guaranteed to arrive in the order they were produced).
DiscretizedStream.java
@@ -198,16 +198,18 @@ private <R> DiscretizedStream<R> transform(WindowTransformation transformation,
}

private DiscretizedStream<OUT> filterEmpty(DiscretizedStream<OUT> input) {
return wrap(input.discretizedStream.transform("Filter", input.discretizedStream.getType(),
new StreamFilter<StreamWindow<OUT>>(new EmptyWindowFilter<OUT>())), input.isPartitioned);
StreamFilter<StreamWindow<OUT>> emptyFilter = new StreamFilter<StreamWindow<OUT>>(new EmptyWindowFilter<OUT>());
emptyFilter.disableInputCopy();
return wrap(input.discretizedStream.transform("Filter", input.discretizedStream.getType(), emptyFilter), input.isPartitioned);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private DataStream<Tuple2<Integer, Integer>> extractPartsByID(DiscretizedStream<OUT> input) {
StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>> partExtractor = new StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
new WindowPartExtractor<OUT>());
partExtractor.disableInputCopy();
return input.discretizedStream.transform("ExtractParts", new TupleTypeInfo(Tuple2.class,
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
new StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
new WindowPartExtractor<OUT>()));
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), partExtractor);
}

private DiscretizedStream<OUT> partition(WindowTransformation transformation) {
AbstractStreamOperator.java
@@ -37,6 +37,8 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>

public transient Output<OUT> output;

protected boolean inputCopyDisabled = false;

// A sane default for most operators
protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;

@@ -64,4 +66,17 @@ public final void setChainingStrategy(ChainingStrategy strategy) {
public final ChainingStrategy getChainingStrategy() {
return chainingStrategy;
}

@Override
public boolean isInputCopyingDisabled() {
return inputCopyDisabled;
}

/**
* Enable object-reuse for this operator instance. This overrides the setting in
* the {@link org.apache.flink.api.common.ExecutionConfig}.
*/
public void disableInputCopy() {
this.inputCopyDisabled = true;
}
}
StreamOperator.java
@@ -49,6 +49,12 @@ public interface StreamOperator<OUT> extends Serializable {
*/
public void close() throws Exception;

/**
* An operator can return true here to disable copying of its input elements. This overrides
* the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}.
*/
public boolean isInputCopyingDisabled();

public void setChainingStrategy(ChainingStrategy strategy);

public ChainingStrategy getChainingStrategy();
StreamWindowBuffer.java
@@ -37,6 +37,7 @@ public class StreamWindowBuffer<T>
public StreamWindowBuffer(WindowBuffer<T> buffer) {
this.buffer = buffer;
setChainingStrategy(ChainingStrategy.FORCE_ALWAYS);
disableInputCopy();
}

@Override
WindowFlattener.java
@@ -32,6 +32,7 @@ public class WindowFlattener<T> extends AbstractStreamOperator<T>

public WindowFlattener() {
chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
disableInputCopy();
}

@Override
WindowFolder.java
@@ -39,6 +39,7 @@ public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {
super(new WindowFoldFunction<IN, OUT>(folder, initialValue));
this.folder = folder;
disableInputCopy();
}

private static class WindowFoldFunction<IN, OUT> extends AbstractRichFunction implements
WindowMapper.java
@@ -39,6 +39,7 @@ public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
public WindowMapper(WindowMapFunction<IN, OUT> mapper) {
super(new WindowMap<IN, OUT>(mapper));
this.mapper = mapper;
disableInputCopy();
}

private static class WindowMap<T, R> extends AbstractRichFunction
WindowMerger.java
@@ -38,8 +38,8 @@ public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>

public WindowMerger() {
this.windows = new HashMap<Integer, StreamWindow<T>>();

chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
disableInputCopy();
}

@Override
WindowPartitioner.java
@@ -38,6 +38,7 @@ public WindowPartitioner(KeySelector<T, ?> keySelector) {
this.keySelector = keySelector;

chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
disableInputCopy();
}

public WindowPartitioner(int numberOfSplits) {
WindowReducer.java
@@ -39,6 +39,7 @@ public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow<
public WindowReducer(ReduceFunction<IN> reducer) {
super(new WindowReduceFunction<IN>(reducer));
this.reducer = reducer;
disableInputCopy();
}

private static class WindowReduceFunction<T> extends AbstractRichFunction implements
OutputHandler.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -157,7 +158,13 @@ private <X> Output<X> createChainedCollector(StreamConfig chainedTaskConfig) {
chainableOperator.setup(wrapper, chainedContext);

chainedOperators.add(chainableOperator);
return new OperatorCollector<X>(chainableOperator);
if (vertex.getExecutionConfig().isObjectReuseEnabled() || chainableOperator.isInputCopyingDisabled()) {
return new OperatorCollector<X>(chainableOperator);
} else {
return new CopyingOperatorCollector<X>(
chainableOperator,
(TypeSerializer<X>) chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()).getObjectSerializer());
}
}

}
@@ -219,7 +226,7 @@ public void clearWriters() {
}

private static class OperatorCollector<T> implements Output<T> {
private OneInputStreamOperator operator;
protected OneInputStreamOperator operator;

public OperatorCollector(OneInputStreamOperator<?, T> operator) {
this.operator = operator;
@@ -239,7 +246,7 @@ public void collect(T record) {
}

@Override
public void close() {
public final void close() {
try {
operator.close();
} catch (Exception e) {
@@ -249,4 +256,26 @@ public void close() {
}
}
}

private static class CopyingOperatorCollector<T> extends OperatorCollector<T> {
private final TypeSerializer<T> serializer;

public CopyingOperatorCollector(OneInputStreamOperator<?, T> operator, TypeSerializer<T> serializer) {
super(operator);
this.serializer = serializer;
}

@Override
@SuppressWarnings("unchecked")
public void collect(T record) {
try {
operator.processElement(serializer.copy(record));
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Could not forward element to operator.", e);
}
throw new RuntimeException(e);
}
}
}
}
