Skip to content

Commit

Permalink
[FLINK-2335] [streaming] Lazy iteration construction in StreamGraph
Browse files Browse the repository at this point in the history
Closes apache#900
  • Loading branch information
gyfora committed Jul 11, 2015
1 parent ea4f339 commit 3b69b24
Show file tree
Hide file tree
Showing 14 changed files with 804 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ public class DataStream<OUT> {
protected final StreamExecutionEnvironment environment;
protected final Integer id;
protected int parallelism;
protected List<String> userDefinedNames;
protected List<String> selectedNames;
protected StreamPartitioner<OUT> partitioner;
@SuppressWarnings("rawtypes")
protected TypeInformation typeInfo;
protected List<DataStream<OUT>> unionizedStreams;
protected List<DataStream<OUT>> unionedStreams;

protected Integer iterationID = null;
protected Long iterationWaitTime = null;
Expand All @@ -126,11 +126,11 @@ public DataStream(StreamExecutionEnvironment environment, TypeInformation<OUT> t
this.environment = environment;
this.parallelism = environment.getParallelism();
this.streamGraph = environment.getStreamGraph();
this.userDefinedNames = new ArrayList<String>();
this.selectedNames = new ArrayList<String>();
this.partitioner = new RebalancePartitioner<OUT>(true);
this.typeInfo = typeInfo;
this.unionizedStreams = new ArrayList<DataStream<OUT>>();
this.unionizedStreams.add(this);
this.unionedStreams = new ArrayList<DataStream<OUT>>();
this.unionedStreams.add(this);
}

/**
Expand All @@ -143,17 +143,17 @@ public DataStream(DataStream<OUT> dataStream) {
this.environment = dataStream.environment;
this.id = dataStream.id;
this.parallelism = dataStream.parallelism;
this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
this.selectedNames = new ArrayList<String>(dataStream.selectedNames);
this.partitioner = dataStream.partitioner.copy();
this.streamGraph = dataStream.streamGraph;
this.typeInfo = dataStream.typeInfo;
this.iterationID = dataStream.iterationID;
this.iterationWaitTime = dataStream.iterationWaitTime;
this.unionizedStreams = new ArrayList<DataStream<OUT>>();
this.unionizedStreams.add(this);
if (dataStream.unionizedStreams.size() > 1) {
for (int i = 1; i < dataStream.unionizedStreams.size(); i++) {
this.unionizedStreams.add(new DataStream<OUT>(dataStream.unionizedStreams.get(i)));
this.unionedStreams = new ArrayList<DataStream<OUT>>();
this.unionedStreams.add(this);
if (dataStream.unionedStreams.size() > 1) {
for (int i = 1; i < dataStream.unionedStreams.size(); i++) {
this.unionedStreams.add(new DataStream<OUT>(dataStream.unionedStreams.get(i)));
}
}

Expand All @@ -176,6 +176,14 @@ public Integer getId() {
public int getParallelism() {
return this.parallelism;
}

public StreamPartitioner<OUT> getPartitioner() {
return this.partitioner;
}

public List<String> getSelectedNames(){
return selectedNames;
}

/**
* Gets the type of the stream.
Expand Down Expand Up @@ -248,9 +256,9 @@ public DataStream<OUT> union(DataStream<OUT>... streams) {
DataStream<OUT> returnStream = this.copy();

for (DataStream<OUT> stream : streams) {
for (DataStream<OUT> ds : stream.unionizedStreams) {
for (DataStream<OUT> ds : stream.unionedStreams) {
validateUnion(ds.getId());
returnStream.unionizedStreams.add(ds.copy());
returnStream.unionedStreams.add(ds.copy());
}
}
return returnStream;
Expand All @@ -268,7 +276,7 @@ public DataStream<OUT> union(DataStream<OUT>... streams) {
* @return The {@link SplitDataStream}
*/
public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
for (DataStream<OUT> ds : this.unionizedStreams) {
for (DataStream<OUT> ds : this.unionedStreams) {
streamGraph.addOutputSelector(ds.getId(), clean(outputSelector));
}

Expand Down Expand Up @@ -1103,9 +1111,7 @@ public DataStreamSink<OUT> write(OutputFormat<OUT> format, long millis) {
}

protected <X> void addIterationSource(DataStream<X> dataStream, TypeInformation<?> feedbackType) {
Integer id = ++counter;
streamGraph.addIterationHead(id, dataStream.getId(), iterationID, iterationWaitTime, feedbackType);
streamGraph.setParallelism(id, dataStream.getParallelism());
streamGraph.addIterationHead(dataStream.getId(), iterationID, iterationWaitTime, feedbackType);
}

/**
Expand All @@ -1118,7 +1124,7 @@ protected <X> void addIterationSource(DataStream<X> dataStream, TypeInformation<
protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
DataStream<OUT> returnStream = this.copy();

for (DataStream<OUT> stream : returnStream.unionizedStreams) {
for (DataStream<OUT> stream : returnStream.unionedStreams) {
stream.partitioner = partitioner;
}

Expand All @@ -1139,9 +1145,9 @@ protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner)
* Number of the type (used at co-functions)
*/
protected <X> void connectGraph(DataStream<X> inputStream, Integer outputID, int typeNumber) {
for (DataStream<X> stream : inputStream.unionizedStreams) {
for (DataStream<X> stream : inputStream.unionedStreams) {
streamGraph.addEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
inputStream.userDefinedNames);
inputStream.selectedNames);
}

}
Expand Down Expand Up @@ -1170,7 +1176,7 @@ public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) {
}

private void validateUnion(Integer id) {
for (DataStream<OUT> ds : this.unionizedStreams) {
for (DataStream<OUT> ds : this.unionedStreams) {
if (ds.getId().equals(id)) {
throw new RuntimeException("A DataStream cannot be merged with itself");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.streaming.api.datastream;

import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
Expand All @@ -32,6 +34,8 @@
*/
public class IterativeDataStream<IN> extends
SingleOutputStreamOperator<IN, IterativeDataStream<IN>> {

protected boolean closed = false;

static Integer iterationCount = 0;

Expand Down Expand Up @@ -60,20 +64,18 @@ protected IterativeDataStream(DataStream<IN> dataStream, long maxWaitTime) {
* @return The feedback stream.
*
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public DataStream<IN> closeWith(DataStream<IN> iterationTail, boolean keepPartitioning) {
DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "Iteration Sink", null,
null);

// We add an iteration sink to the tail which will send tuples to the
// iteration head
streamGraph.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID,
iterationWaitTime);

if (keepPartitioning) {
connectGraph(iterationTail, iterationSink.getId(), 0);
} else {
connectGraph(iterationTail.forward(), iterationSink.getId(), 0);

if (closed) {
throw new IllegalStateException(
"An iterative data stream can only be closed once. Use union to close with multiple stream.");
}
closed = true;

streamGraph.addIterationTail((List) iterationTail.unionedStreams, iterationID,
keepPartitioning);

return iterationTail;
}

Expand Down Expand Up @@ -138,7 +140,8 @@ public <F> ConnectedIterativeDataStream<IN, F> withFeedbackType(Class<F> feedbac
* @return A {@link ConnectedIterativeDataStream}.
*/
public <F> ConnectedIterativeDataStream<IN, F> withFeedbackType(TypeInformation<F> feedbackType) {
return new ConnectedIterativeDataStream<IN, F>(this, feedbackType);
return new ConnectedIterativeDataStream<IN, F>(new IterativeDataStream<IN>(this,
iterationWaitTime), feedbackType);
}

/**
Expand Down Expand Up @@ -201,14 +204,16 @@ public TypeInformation<F> getType2() {
* @return The feedback stream.
*
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public DataStream<F> closeWith(DataStream<F> feedbackStream) {
DataStream<F> iterationSink = new DataStreamSink<F>(input.environment, "Iteration Sink",
null, null);
if (input.closed) {
throw new IllegalStateException(
"An iterative data stream can only be closed once. Use union to close with multiple stream.");
}
input.closed = true;

input.streamGraph.addIterationTail(iterationSink.getId(), feedbackStream.getId(), input.iterationID,
input.iterationWaitTime);

input.connectGraph(feedbackStream, iterationSink.getId(), 0);
input.streamGraph.addIterationTail((List) feedbackStream.unionedStreams,
input.iterationID, true);
return feedbackStream;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ private DataStream<OUT> selectOutput(String[] outputNames) {

DataStream<OUT> returnStream = copy();

for (DataStream<OUT> ds : returnStream.unionizedStreams) {
ds.userDefinedNames = Arrays.asList(outputNames);
for (DataStream<OUT> ds : returnStream.unionedStreams) {
ds.selectedNames = Arrays.asList(outputNames);
}
return returnStream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,12 @@ public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
}
}

public void setIterationId(Integer iterationId) {
config.setInteger(ITERATION_ID, iterationId);
public void setIterationId(String iterationId) {
config.setString(ITERATION_ID, iterationId);
}

public Integer getIterationId() {
return config.getInteger(ITERATION_ID, 0);
public String getIterationId() {
return config.getString(ITERATION_ID, "");
}

public void setIterationWaitTime(long time) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class StreamEdge implements Serializable {
* output selection).
*/
final private List<String> selectedNames;
final private StreamPartitioner<?> outputPartitioner;
private StreamPartitioner<?> outputPartitioner;

public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
Expand Down Expand Up @@ -87,6 +87,10 @@ public List<String> getSelectedNames() {
public StreamPartitioner<?> getPartitioner() {
return outputPartitioner;
}

public void setPartitioner(StreamPartitioner<?> partitioner) {
this.outputPartitioner = partitioner;
}

@Override
public int hashCode() {
Expand Down
Loading

0 comments on commit 3b69b24

Please sign in to comment.