Skip to content

Commit

Permalink
[streaming] [api-breaking] Minor DataStream cleanups
Browse files Browse the repository at this point in the history
- Removed unused constructor parameter.
- Updated outdated and wrong connection javadocs.

Closes apache#825
  • Loading branch information
mbalassi committed Jun 12, 2015
1 parent 9d6290e commit 32ddc9e
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ protected <OUT> TwoInputStreamOperator<IN1, IN2, OUT> getReduceOperator(

@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
environment, functionName, outTypeInfo, operator);
environment, outTypeInfo, operator);

dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getType1(),
getType2(), outTypeInfo, functionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,10 @@ public class DataStream<OUT> {
*
* @param environment
* StreamExecutionEnvironment
* @param operatorType
* The type of the operator in the component
* @param typeInfo
* Type of the datastream
*/
public DataStream(StreamExecutionEnvironment environment, String operatorType, TypeInformation<OUT> typeInfo) {
public DataStream(StreamExecutionEnvironment environment, TypeInformation<OUT> typeInfo) {
if (environment == null) {
throw new NullPointerException("context is null");
}
Expand Down Expand Up @@ -174,7 +172,7 @@ public DataStream(DataStream<OUT> dataStream) {
}

/**
* Returns the ID of the {@link DataStream}.
* Returns the ID of the {@link DataStream} in the current {@link StreamExecutionEnvironment}.
*
* @return ID of the DataStream
*/
Expand Down Expand Up @@ -420,9 +418,11 @@ private DataStream<OUT> partitionByHash(Keys<OUT> keys) {

/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are broadcasted to every parallel instance of the next component. This
* setting only effects the how the outputs will be distributed between the
* parallel instances of the next processing operator.
* are broadcasted to every parallel instance of the next component.
*
* <p>
* This setting only effects the how the outputs will be distributed between
* the parallel instances of the next processing operator.
*
* @return The DataStream with broadcast partitioning set.
*/
Expand All @@ -432,9 +432,11 @@ public DataStream<OUT> broadcast() {

/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are shuffled to the next component. This setting only effects the how the
* outputs will be distributed between the parallel instances of the next
* processing operator.
* are shuffled uniformly randomly to the next component.
*
* <p>
* This setting only effects the how the outputs will be distributed between
* the parallel instances of the next processing operator.
*
* @return The DataStream with shuffle partitioning set.
*/
Expand All @@ -445,23 +447,28 @@ public DataStream<OUT> shuffle() {
/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are forwarded to the local subtask of the next component (whenever
* possible). This is the default partitioner setting. This setting only
* effects the how the outputs will be distributed between the parallel
* instances of the next processing operator.
* possible).
*
* <p>
* This setting only effects the how the outputs will be distributed between
* the parallel instances of the next processing operator.
*
* @return The DataStream with shuffle partitioning set.
* @return The DataStream with forward partitioning set.
*/
public DataStream<OUT> forward() {
return setConnectionType(new RebalancePartitioner<OUT>(true));
}

/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are distributed evenly to the next component.This setting only effects
* the how the outputs will be distributed between the parallel instances of
* the next processing operator.
* are distributed evenly to instances of the next component in a Round-robin
* fashion.
*
* <p>
* This setting only effects the how the outputs will be distributed between
* the parallel instances of the next processing operator.
*
* @return The DataStream with shuffle partitioning set.
* @return The DataStream with rebalance partitioning set.
*/
public DataStream<OUT> rebalance() {
return setConnectionType(new RebalancePartitioner<OUT>(false));
Expand Down Expand Up @@ -1237,7 +1244,7 @@ public DataStreamSink<OUT> write(OutputFormat<OUT> format, long millis) {

/**
* Method for passing user defined operators along with the type
* informations that will transform the DataStream.
* information that will transform the DataStream.
*
* @param operatorName
* name of the operator, for logging purposes
Expand All @@ -1254,7 +1261,7 @@ public DataStreamSink<OUT> write(OutputFormat<OUT> format, long millis) {
DataStream<OUT> inputStream = this.copy();
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
operatorName, outTypeInfo, operator);
outTypeInfo, operator);

streamGraph.addOperator(returnStream.getId(), operator, getType(), outTypeInfo,
operatorName);
Expand Down Expand Up @@ -1337,7 +1344,7 @@ public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) {
}

/**
* Gets the class of the field at the given position
* Gets the class of the field at the given position.
*
* @param pos
* Position of the field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStrea

protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType,
TypeInformation<IN> outTypeInfo, OneInputStreamOperator<IN, ?> operator) {
super(environment, operatorType, outTypeInfo, operator);
super(environment, outTypeInfo, operator);
}

protected DataStreamSink(DataStream<IN> dataStream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataS
public DataStreamSource(StreamExecutionEnvironment environment, String operatorType,
TypeInformation<OUT> outTypeInfo, StreamOperator<OUT> operator,
boolean isParallel, String sourceName) {
super(environment, operatorType, outTypeInfo, operator);
super(environment, outTypeInfo, operator);

environment.getStreamGraph().addSource(getId(), operator, null, outTypeInfo,
sourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public DataStream<OUT> name(String name){
}

protected SingleOutputStreamOperator(StreamExecutionEnvironment environment,
String operatorType, TypeInformation<OUT> outTypeInfo, StreamOperator<?> operator) {
super(environment, operatorType, outTypeInfo);
TypeInformation<OUT> outTypeInfo, StreamOperator<?> operator) {
super(environment, outTypeInfo);
this.isSplit = false;
this.operator = operator;
}
Expand Down

0 comments on commit 32ddc9e

Please sign in to comment.