Skip to content

Commit

Permalink
[streaming] Updated API to use RichFunctions
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora authored and StephanEwen committed Aug 18, 2014
1 parent fed03a2 commit 776bd3f
Show file tree
Hide file tree
Showing 40 changed files with 299 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public abstract class RMQSink<IN> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;

private static final Log LOG = LogFactory.getLog(RMQSource.class);
private static final Log LOG = LogFactory.getLog(RMQSink.class);

private boolean sendAndClose = false;
private boolean closeWithoutSend = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@

import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.AbstractFunction;
import org.apache.flink.api.java.functions.FilterFunction;
import org.apache.flink.api.java.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.GroupReduceFunction;
import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.java.functions.RichFilterFunction;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
Expand Down Expand Up @@ -124,6 +124,14 @@ protected DataStream(DataStream<T> dataStream) {
this.iterationID = dataStream.iterationID;
this.jobGraphBuilder = dataStream.jobGraphBuilder;
}


/**
* Partitioning strategy on the stream.
*/
public static enum ConnectionType {
SHUFFLE, BROADCAST, FIELD, FORWARD, DISTRIBUTE
}

/**
* Initialize the connection and partitioning among the connected
Expand All @@ -133,7 +141,7 @@ private void initConnections() {
connectIDs = new ArrayList<String>();
connectIDs.add(getId());
partitioners = new ArrayList<StreamPartitioner<T>>();
partitioners.add(new ShufflePartitioner<T>());
partitioners.add(new ForwardPartitioner<T>());
}

/**
Expand Down Expand Up @@ -203,7 +211,7 @@ public int getParallelism() {

/**
* Gives the data transformation(vertex) a user defined name in order to use
* at directed outputs. The {@link OutputSelector} of the input vertex
* with directed outputs. The {@link OutputSelector} of the input vertex
* should use this name for directed emits.
*
* @param name
Expand Down Expand Up @@ -312,7 +320,8 @@ public DataStream<T> shuffle() {

/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are forwarded to the local subtask of the next component.
* are forwarded to the local subtask of the next component. This is the
* default partitioner setting.
*
* @return The DataStream with shuffle partitioning set.
*/
Expand Down Expand Up @@ -342,19 +351,19 @@ private DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {

/**
* Applies a Map transformation on a {@link DataStream}. The transformation
* calls a {@link MapFunction} for each element of the DataStream. Each
* calls a {@link RichMapFunction} for each element of the DataStream. Each
* MapFunction call returns exactly one element.
*
* @param mapper
* The MapFunction that is called for each element of the
* The RichMapFunction that is called for each element of the
* DataStream.
* @param <R>
* output type
* @return The transformed DataStream.
*/
public <R> StreamOperator<T, R> map(MapFunction<T, R> mapper) {
public <R> StreamOperator<T, R> map(RichMapFunction<T, R> mapper) {
return addFunction("map", mapper, new FunctionTypeWrapper<T, Tuple, R>(mapper,
MapFunction.class, 0, -1, 1), new MapInvokable<T, R>(mapper));
RichMapFunction.class, 0, -1, 1), new MapInvokable<T, R>(mapper));
}

/**
Expand All @@ -372,8 +381,8 @@ public <R> StreamOperator<T, R> map(MapFunction<T, R> mapper) {
* {@link CoMapFunction#map2(Tuple)}
* @return The transformed DataStream
*/
public <T2, R> DataStream<R> coMapWith(
CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
public <T2, R> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper,
DataStream<T2> otherStream) {
return addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream),
coMapper,
new FunctionTypeWrapper<T, T2, R>(coMapper, CoMapFunction.class, 0, 1, 2),
Expand All @@ -382,81 +391,82 @@ public <T2, R> DataStream<R> coMapWith(

/**
* Applies a FlatMap transformation on a {@link DataStream}. The
* transformation calls a FlatMapFunction for each element of the
* DataStream. Each FlatMapFunction call can return any number of elements
* including none.
* transformation calls a {@link RichFlatMapFunction} for each element of
* the DataStream. Each RichFlatMapFunction call can return any number of
* elements including none.
*
* @param flatMapper
* The FlatMapFunction that is called for each element of the
* The RichFlatMapFunction that is called for each element of the
* DataStream
*
* @param <R>
* output type
* @return The transformed DataStream.
*/
public <R> StreamOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
public <R> StreamOperator<T, R> flatMap(RichFlatMapFunction<T, R> flatMapper) {
return addFunction("flatMap", flatMapper, new FunctionTypeWrapper<T, Tuple, R>(flatMapper,
FlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<T, R>(flatMapper));
RichFlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<T, R>(flatMapper));
}

/**
* Applies a Filter transformation on a {@link DataStream}. The
* transformation calls a {@link FilterFunction} for each element of the
* transformation calls a {@link RichFilterFunction} for each element of the
* DataStream and retains only those element for which the function returns
* true. Elements for which the function returns false are filtered.
*
* @param filter
* The FilterFunction that is called for each element of the
* The RichFilterFunction that is called for each element of the
* DataSet.
* @return The filtered DataStream.
*/
public StreamOperator<T, T> filter(FilterFunction<T> filter) {
public StreamOperator<T, T> filter(RichFilterFunction<T> filter) {
return addFunction("filter", filter, new FunctionTypeWrapper<T, Tuple, T>(filter,
FilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
RichFilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
}

/**
* Applies a reduce transformation on preset chunks of the DataStream. The
* transformation calls a {@link GroupReduceFunction} for each tuple batch
* of the predefined size. Each GroupReduceFunction call can return any
* number of elements including none.
* transformation calls a {@link RichGroupReduceFunction} for each tuple
* batch of the predefined size. Each RichGroupReduceFunction call can
* return any number of elements including none.
*
*
* @param reducer
* The GroupReduceFunction that is called for each tuple batch.
* The RichGroupReduceFunction that is called for each tuple
* batch.
* @param batchSize
* The number of tuples grouped together in the batch.
* @param <R>
* output type
* @return The modified DataStream.
*/
public <R> StreamOperator<T, R> batchReduce(GroupReduceFunction<T, R> reducer,
int batchSize) {
public <R> StreamOperator<T, R> batchReduce(RichGroupReduceFunction<T, R> reducer, int batchSize) {
return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<T, R>(reducer,
RichGroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<T, R>(reducer,
batchSize));
}

/**
* Applies a reduce transformation on preset "time" chunks of the
* DataStream. The transformation calls a {@link GroupReduceFunction} on
* DataStream. The transformation calls a {@link RichGroupReduceFunction} on
* records received during the predefined time window. The window shifted
* after each reduce call. Each GroupReduceFunction call can return any
* after each reduce call. Each RichGroupReduceFunction call can return any
* number of elements including none.
*
*
* @param reducer
* The GroupReduceFunction that is called for each time window.
* The RichGroupReduceFunction that is called for each time
* window.
* @param windowSize
* The time window to run the reducer on, in milliseconds.
* @param <R>
* output type
* @return The modified DataStream.
*/
public <R> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
public <R> StreamOperator<T, R> windowReduce(RichGroupReduceFunction<T, R> reducer,
long windowSize) {
return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<T, R>(reducer,
RichGroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<T, R>(reducer,
windowSize));
}

Expand All @@ -477,7 +487,7 @@ public <R> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
* @return the data stream constructed
*/
private <R> StreamOperator<T, R> addFunction(String functionName,
final AbstractFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
final AbstractRichFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
UserTaskInvokable<T, R> functionInvokable) {

DataStream<T> inputStream = new DataStream<T>(this);
Expand All @@ -500,9 +510,9 @@ private <R> StreamOperator<T, R> addFunction(String functionName,
return returnStream;
}

protected <T1, T2, R> DataStream<R> addCoFunction(
String functionName, DataStream<T1> inputStream1, DataStream<T2> inputStream2,
final AbstractFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
protected <T1, T2, R> DataStream<R> addCoFunction(String functionName,
DataStream<T1> inputStream1, DataStream<T2> inputStream2,
final AbstractRichFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
CoInvokable<T1, T2, R> functionInvokable) {

DataStream<R> returnStream = new DataStream<R>(environment, functionName);
Expand Down Expand Up @@ -535,7 +545,7 @@ protected <T1, T2, R> DataStream<R> addCoFunction(
* @param typeNumber
* Number of the type (used at co-functions)
*/
<X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
private <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
String inputID = inputStream.connectIDs.get(i);
StreamPartitioner<X> partitioner = inputStream.partitioners.get(i);
Expand All @@ -545,13 +555,13 @@ <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber
}

/**
* Adds the given sink to this environment. Only streams with sinks added
* Adds the given sink to this DataStream. Only streams with sinks added
* will be executed once the {@link StreamExecutionEnvironment#execute()}
* method is called.
*
* @param sinkFunction
* The object containing the sink's invoke function.
* @return The modified DataStream.
* @return The closed DataStream.
*/
public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
return addSink(new DataStream<T>(this), sinkFunction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.AbstractFunction;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
Expand Down Expand Up @@ -257,13 +257,13 @@ public int getInputType(int inputNumber) {
return config.getInteger(INPUT_TYPE + inputNumber, 0);
}

public void setFunctionClass(Class<? extends AbstractFunction> functionClass) {
public void setFunctionClass(Class<? extends AbstractRichFunction> functionClass) {
config.setClass("functionClass", functionClass);
}

@SuppressWarnings("unchecked")
public Class<? extends AbstractFunction> getFunctionClass() {
return (Class<? extends AbstractFunction>) config.getClass("functionClass", null);
public Class<? extends AbstractRichFunction> getFunctionClass() {
return (Class<? extends AbstractRichFunction>) config.getClass("functionClass", null);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,6 @@ public abstract class StreamExecutionEnvironment {

protected JobGraphBuilder jobGraphBuilder;

/**
* Partitioning strategy on the stream.
*/
public static enum ConnectionType {
SHUFFLE, BROADCAST, FIELD, FORWARD, DISTRIBUTE
}

// --------------------------------------------------------------------------------------------
// Constructor and Properties
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -113,15 +106,6 @@ protected void setDegreeOfParallelism(int degreeOfParallelism) {
this.degreeOfParallelism = degreeOfParallelism;
}

// protected void setMutability(DataStream<?> stream, boolean isMutable) {
// jobGraphBuilder.setMutability(stream.getId(), isMutable);
// }
//
// protected void setBufferTimeout(DataStream<?> stream, long bufferTimeout)
// {
// jobGraphBuilder.setBufferTimeout(stream.getId(), bufferTimeout);
// }

/**
* Sets the number of hardware contexts (CPU cores / threads) used when
* executed in {@link LocalStreamEnvironment}.
Expand Down Expand Up @@ -186,17 +170,17 @@ public DataStream<String> readTextStream(String filePath, int parallelism) {
*
* @param data
* The collection of elements to create the DataStream from.
* @param <X>
* @param <OUT>
* type of the returned stream
* @return The DataStream representing the elements.
*/
public <X extends Serializable> DataStream<X> fromElements(X... data) {
DataStream<X> returnStream = new DataStream<X>(this, "elements");
public <OUT extends Serializable> DataStream<OUT> fromElements(OUT... data) {
DataStream<OUT> returnStream = new DataStream<OUT>(this, "elements");

try {
SourceFunction<X> function = new FromElementsFunction<X>(data);
jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<X>(function),
new ObjectTypeWrapper<X, Tuple, X>(data[0], null, data[0]), "source",
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
new ObjectTypeWrapper<OUT, Tuple, OUT>(data[0], null, data[0]), "source",
SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize elements");
Expand All @@ -212,25 +196,25 @@ public <X extends Serializable> DataStream<X> fromElements(X... data) {
*
* @param data
* The collection of elements to create the DataStream from.
* @param <X>
* @param <OUT>
* type of the returned stream
* @return The DataStream representing the elements.
*/
@SuppressWarnings("unchecked")
public <X extends Serializable> DataStream<X> fromCollection(Collection<X> data) {
DataStream<X> returnStream = new DataStream<X>(this, "elements");
public <OUT extends Serializable> DataStream<OUT> fromCollection(Collection<OUT> data) {
DataStream<OUT> returnStream = new DataStream<OUT>(this, "elements");

if (data.isEmpty()) {
throw new RuntimeException("Collection must not be empty");
}

try {
SourceFunction<X> function = new FromElementsFunction<X>(data);
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);

jobGraphBuilder.addSource(
returnStream.getId(),
new SourceInvokable<X>(new FromElementsFunction<X>(data)),
new ObjectTypeWrapper<X, Tuple, X>((X) data.toArray()[0], null, (X) data
new SourceInvokable<OUT>(new FromElementsFunction<OUT>(data)),
new ObjectTypeWrapper<OUT, Tuple, OUT>((OUT) data.toArray()[0], null, (OUT) data
.toArray()[0]), "source", SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize collection");
Expand Down Expand Up @@ -259,16 +243,16 @@ public DataStream<Long> generateSequence(long from, long to) {
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @param <T>
* @param <OUT>
* type of the returned stream
* @return the data stream constructed
*/
public <T> DataStream<T> addSource(SourceFunction<T> function, int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this, "source");
public <OUT> DataStream<OUT> addSource(SourceFunction<OUT> function, int parallelism) {
DataStream<OUT> returnStream = new DataStream<OUT>(this, "source");

try {
jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<T>(function),
new FunctionTypeWrapper<T, Tuple, T>(function, SourceFunction.class, 0, -1, 0),
jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
new FunctionTypeWrapper<OUT, Tuple, OUT>(function, SourceFunction.class, 0, -1, 0),
"source", SerializationUtils.serialize(function), parallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SourceFunction");
Expand All @@ -277,7 +261,7 @@ public <T> DataStream<T> addSource(SourceFunction<T> function, int parallelism)
return returnStream;
}

public <T> DataStream<T> addSource(SourceFunction<T> sourceFunction) {
public <OUT> DataStream<OUT> addSource(SourceFunction<OUT> sourceFunction) {
return addSource(sourceFunction, 1);
}

Expand Down
Loading

0 comments on commit 776bd3f

Please sign in to comment.