diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index 0176277b6101d..2626d9c702543 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -482,7 +482,7 @@ protected TwoInputStreamOperator getReduceOperator( @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator( - environment, functionName, outTypeInfo, operator); + environment, outTypeInfo, operator); dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getType1(), getType2(), outTypeInfo, functionName); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 09f8155964b3b..6964a07e03ce6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -125,12 +125,10 @@ public class DataStream { * * @param environment * StreamExecutionEnvironment - * @param operatorType - * The type of the operator in the component * @param typeInfo * Type of the datastream */ - public DataStream(StreamExecutionEnvironment environment, String operatorType, TypeInformation typeInfo) { + public DataStream(StreamExecutionEnvironment environment, TypeInformation typeInfo) { if (environment == null) { throw new NullPointerException("context is null"); } @@ -174,7 +172,7 @@ public DataStream(DataStream dataStream) { } /** - * Returns the ID of the {@link DataStream}. + * Returns the ID of the {@link DataStream} in the current {@link StreamExecutionEnvironment}. * * @return ID of the DataStream */ @@ -420,9 +418,11 @@ private DataStream partitionByHash(Keys keys) { /** * Sets the partitioning of the {@link DataStream} so that the output tuples - * are broadcasted to every parallel instance of the next component. This - * setting only effects the how the outputs will be distributed between the - * parallel instances of the next processing operator. + * are broadcasted to every parallel instance of the next component. + * + *

+ * This setting only effects the how the outputs will be distributed between + * the parallel instances of the next processing operator. * * @return The DataStream with broadcast partitioning set. */ @@ -432,9 +432,11 @@ public DataStream broadcast() { /** * Sets the partitioning of the {@link DataStream} so that the output tuples - * are shuffled to the next component. This setting only effects the how the - * outputs will be distributed between the parallel instances of the next - * processing operator. + * are shuffled uniformly randomly to the next component. + * + *

+ * This setting only effects the how the outputs will be distributed between + * the parallel instances of the next processing operator. * * @return The DataStream with shuffle partitioning set. */ @@ -445,11 +447,13 @@ public DataStream shuffle() { /** * Sets the partitioning of the {@link DataStream} so that the output tuples * are forwarded to the local subtask of the next component (whenever - * possible). This is the default partitioner setting. This setting only - * effects the how the outputs will be distributed between the parallel - * instances of the next processing operator. + * possible). + * + *

+ * This setting only effects the how the outputs will be distributed between + * the parallel instances of the next processing operator. * - * @return The DataStream with shuffle partitioning set. + * @return The DataStream with forward partitioning set. */ public DataStream forward() { return setConnectionType(new RebalancePartitioner(true)); @@ -457,11 +461,14 @@ public DataStream forward() { /** * Sets the partitioning of the {@link DataStream} so that the output tuples - * are distributed evenly to the next component.This setting only effects - * the how the outputs will be distributed between the parallel instances of - * the next processing operator. + * are distributed evenly to instances of the next component in a Round-robin + * fashion. + * + *

+ * This setting only effects the how the outputs will be distributed between + * the parallel instances of the next processing operator. * - * @return The DataStream with shuffle partitioning set. + * @return The DataStream with rebalance partitioning set. */ public DataStream rebalance() { return setConnectionType(new RebalancePartitioner(false)); @@ -1237,7 +1244,7 @@ public DataStreamSink write(OutputFormat format, long millis) { /** * Method for passing user defined operators along with the type - * informations that will transform the DataStream. + * information that will transform the DataStream. * * @param operatorName * name of the operator, for logging purposes @@ -1254,7 +1261,7 @@ public DataStreamSink write(OutputFormat format, long millis) { DataStream inputStream = this.copy(); @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator(environment, - operatorName, outTypeInfo, operator); + outTypeInfo, operator); streamGraph.addOperator(returnStream.getId(), operator, getType(), outTypeInfo, operatorName); @@ -1337,7 +1344,7 @@ public DataStreamSink addSink(SinkFunction sinkFunction) { } /** - * Gets the class of the field at the given position + * Gets the class of the field at the given position. * * @param pos * Position of the field diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index dc457e0d30d3c..60dc367756953 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -31,7 +31,7 @@ public class DataStreamSink extends SingleOutputStreamOperator outTypeInfo, OneInputStreamOperator operator) { - super(environment, operatorType, outTypeInfo, operator); + super(environment, outTypeInfo, operator); } protected DataStreamSink(DataStream dataStream) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java index 2bb70df5f59c9..0dd77016c9491 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java @@ -34,7 +34,7 @@ public class DataStreamSource extends SingleOutputStreamOperator outTypeInfo, StreamOperator operator, boolean isParallel, String sourceName) { - super(environment, operatorType, outTypeInfo, operator); + super(environment, outTypeInfo, operator); environment.getStreamGraph().addSource(getId(), operator, null, outTypeInfo, sourceName); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 1e5b5cff46597..b4a99c82c5ffb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -64,8 +64,8 @@ public DataStream name(String name){ } protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, - String operatorType, TypeInformation outTypeInfo, StreamOperator operator) { - super(environment, operatorType, outTypeInfo); + TypeInformation outTypeInfo, StreamOperator operator) { + super(environment, outTypeInfo); this.isSplit = false; this.operator = operator; }