Skip to content

Commit

Permalink
[FLINK-2103] [streaming] [api-extending] Expose partitionBy to user
Browse files Browse the repository at this point in the history
Conflicts:
	flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java

Closes apache#743
  • Loading branch information
aljoscha authored and mbalassi committed Jun 3, 2015
1 parent bf9cc81 commit a43e0d5
Showing 1 changed file with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,19 +371,57 @@ private GroupedDataStream<OUT> groupBy(Keys<OUT> keys) {
getType(), getExecutionConfig())));
}

/**
* Sets the partitioning of the {@link DataStream} so that the output is
* partitioned hashing on the given fields. This setting only
* effects the how the outputs will be distributed between the parallel
* instances of the next processing operator.
*
* @param fields The tuple fields that should be used for partitioning
* @return The partitioned DataStream
* Specifies how elements will be distributed to parallel instances of downstream operations.
*
*/
public DataStream<OUT> partitionBy(int... fields) {
return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
}

/**
* Sets the partitioning of the {@link DataStream} so that the output is
* partitioned hashing on the given fields. This setting only
* effects the how the outputs will be distributed between the parallel
* instances of the next processing operator.
*
* @param fields The tuple fields that should be used for partitioning
* @return The partitioned DataStream
* Specifies how elements will be distributed to parallel instances of downstream operations.
*
*/
public DataStream<OUT> partitionBy(String... fields) {
return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
}

/**
* Sets the partitioning of the {@link DataStream} so that the output is
* partitioned using the given {@link KeySelector}. This setting only
* effects the how the outputs will be distributed between the parallel
* instances of the next processing operator.
*
*
* @param keySelector
* @return The partitioned DataStream
* Specifies how elements will be distributed to parallel instances of downstream operations.
*/
protected DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector)));
}

//private helper method for partitioning
private DataStream<OUT> partitionBy(Keys<OUT> keys) {
return setConnectionType(
new FieldsPartitioner<OUT>(
clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig()))));
}

/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are broadcasted to every parallel instance of the next component. This
Expand Down

0 comments on commit a43e0d5

Please sign in to comment.