Skip to content

Commit

Permalink
[FLINK-7] Add range partitioning with automatic sampling of key distr…
Browse files Browse the repository at this point in the history
…ibution

This closes apache#1255
  • Loading branch information
chengxiang li authored and fhueske committed Dec 21, 2015
1 parent aec6ded commit f5957ce
Show file tree
Hide file tree
Showing 25 changed files with 1,358 additions and 56 deletions.
37 changes: 36 additions & 1 deletion docs/apis/dataset_transformations.md
Original file line number Diff line number Diff line change
Expand Up @@ -1950,7 +1950,7 @@ Not supported.
### Hash-Partition

Hash-partitions a DataSet on a given key.
Keys can be specified as key expressions or field position keys (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).
Keys can be specified as position keys, expression keys, and key selector functions (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
Expand Down Expand Up @@ -1981,6 +1981,41 @@ Not supported.
</div>
</div>

### Range-Partition

Range-partitions a DataSet on a given key.
Keys can be specified as position keys, expression keys, and key selector functions (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">

~~~java
DataSet<Tuple2<String, Integer>> in = // [...]
// range-partition DataSet by String value and apply a MapPartition transformation.
DataSet<Tuple2<String, String>> out = in.partitionByRange(0)
.mapPartition(new PartitionMapper());
~~~

</div>
<div data-lang="scala" markdown="1">

~~~scala
val in: DataSet[(String, Int)] = // [...]
// range-partition DataSet by String value and apply a MapPartition transformation.
val out = in.partitionByRange(0).mapPartition { ... }
~~~

</div>
<div data-lang="python" markdown="1">

~~~python
Not supported.
~~~

</div>
</div>


### Sort Partition

Locally sorts all partitions of a DataSet on a specified field in a specified order.
Expand Down
26 changes: 23 additions & 3 deletions docs/apis/programming_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -733,11 +733,22 @@ DataSet<String> result = in.rebalance()
<tr>
<td><strong>Hash-Partition</strong></td>
<td>
<p>Hash-partitions a data set on a given key. Keys can be specified as key-selector functions or field position keys.</p>
<p>Hash-partitions a data set on a given key. Keys can be specified as position keys, expression keys, and key selector functions.</p>
{% highlight java %}
DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionByHash(0)
.mapPartition(new PartitionMapper());
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Range-Partition</strong></td>
<td>
<p>Range-partitions a data set on a given key. Keys can be specified as position keys, expression keys, and key selector functions.</p>
{% highlight java %}
DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionByRange(0)
.mapPartition(new PartitionMapper());
{% endhighlight %}
</td>
</tr>
Expand Down Expand Up @@ -1012,11 +1023,20 @@ val result: DataSet[(Int, String)] = data1.rebalance().map(...)
<tr>
<td><strong>Hash-Partition</strong></td>
<td>
<p>Hash-partitions a data set on a given key. Keys can be specified as key-selector functions, tuple positions
or case class fields.</p>
<p>Hash-partitions a data set on a given key. Keys can be specified as position keys, expression keys, and key selector functions.</p>
{% highlight scala %}
val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByHash(0).mapPartition { ... }
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Range-Partition</strong></td>
<td>
<p>Range-partitions a data set on a given key. Keys can be specified as position keys, expression keys, and key selector functions.</p>
{% highlight scala %}
val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByRange(0).mapPartition { ... }
{% endhighlight %}
</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.Test;

import static org.junit.Assert.*;
import static org.junit.Assert.fail;

@SuppressWarnings("serial")
public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
Expand Down Expand Up @@ -72,4 +73,32 @@ public void testCreatePlanAfterGetExecutionPlan() {
fail("Cannot run both #getExecutionPlan and #execute. Message: "+e.getMessage());
}
}

@Test
public void testGetExecutionPlanOfRangePartition() {
ExecutionEnvironment env = new LocalEnvironment();
env.getConfig().disableSysoutLogging();

DataSet<Integer> baseSet = env.fromElements(1, 2);

DataSet<Tuple2<Integer, Integer>> result = baseSet
.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Integer value) throws Exception {
return new Tuple2(value, value * 2);
}
})
.partitionByRange(0)
.aggregate(Aggregations.MAX, 1);
result.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

try {
env.getExecutionPlan();
env.execute();
}
catch (Exception e) {
e.printStackTrace();
fail("Cannot run both #getExecutionPlan and #execute.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.distributions;

import org.apache.flink.api.common.typeutils.TypeComparator;

public class CommonRangeBoundaries<T> implements RangeBoundaries<T> {
private final TypeComparator<T> typeComparator;
private final Object[][] boundaries;
private final TypeComparator[] flatComparators;
private final Object[] keys;

public CommonRangeBoundaries(TypeComparator<T> typeComparators, Object[][] boundaries) {
this.typeComparator = typeComparators;
this.flatComparators = typeComparators.getFlatComparators();
this.keys = new Object[flatComparators.length];
this.boundaries = boundaries;
}

@Override
public int getRangeIndex(T record) {
return binarySearch(record);
}

// Search the range index of input record.
private int binarySearch(T record) {
int low = 0;
int high = this.boundaries.length - 1;
typeComparator.extractKeys(record, keys, 0);

while (low <= high) {
final int mid = (low + high) >>> 1;
final int result = compareKeys(flatComparators, keys, this.boundaries[mid]);

if (result > 0) {
low = mid + 1;
} else if (result < 0) {
high = mid - 1;
} else {
return mid;
}
}
// key not found, but the low index is the target
// bucket, since the boundaries are the upper bound
return low;
}

private int compareKeys(TypeComparator[] flatComparators, Object[] keys, Object[] boundary) {
if (flatComparators.length != keys.length || flatComparators.length != boundary.length) {
throw new RuntimeException("Can not compare keys with boundary due to mismatched length.");
}

for (int i=0; i<flatComparators.length; i++) {
int result = flatComparators[i].compare(keys[i], boundary[i]);
if (result != 0) {
return result;
}
}

return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.Serializable;

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.types.Key;

public interface DataDistribution extends IOReadableWritable, Serializable {

Expand All @@ -46,7 +45,7 @@ public interface DataDistribution extends IOReadableWritable, Serializable {
*
* @return A record whose values act as bucket boundaries for the specified bucket.
*/
Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets);
Object[] getBucketBoundary(int bucketNum, int totalNumBuckets);

/**
* The number of fields in the (composite) key. This determines how many fields in the records define
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.distributions;

import java.io.Serializable;

/**
* RangeBoundaries is used to split the records into multiple ranges.
*
* @param <T> The boundary type.
*/
public interface RangeBoundaries<T> extends Serializable {

/**
* Get the range index of record.
*
* @param record The input record.
* @return The range index.
*/
int getRangeIndex(T record);
}
55 changes: 49 additions & 6 deletions flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,49 @@ public <K extends Comparable<K>> PartitionOperator<T> partitionByHash(KeySelecto
final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(clean(keyExtractor), this.getType(), keyType), Utils.getCallLocationName());
}


/**
* Range-partitions a DataSet on the specified key fields.
* <p>
* <b>Important:</b>This operation requires an extra pass over the DataSet to compute the range boundaries and
* shuffles the whole DataSet over the network. This can take significant amount of time.
*
* @param fields The field indexes on which the DataSet is range-partitioned.
* @return The partitioned DataSet.
*/
public PartitionOperator<T> partitionByRange(int... fields) {
return new PartitionOperator<T>(this, PartitionMethod.RANGE, new Keys.ExpressionKeys<T>(fields, getType(), false), Utils.getCallLocationName());
}

/**
* Range-partitions a DataSet on the specified key fields.
* <p>
* <b>Important:</b>This operation requires an extra pass over the DataSet to compute the range boundaries and
* shuffles the whole DataSet over the network. This can take significant amount of time.
*
* @param fields The field expressions on which the DataSet is range-partitioned.
* @return The partitioned DataSet.
*/
public PartitionOperator<T> partitionByRange(String... fields) {
return new PartitionOperator<T>(this, PartitionMethod.RANGE, new Keys.ExpressionKeys<T>(fields, getType()), Utils.getCallLocationName());
}

/**
* Range-partitions a DataSet using the specified KeySelector.
* <p>
* <b>Important:</b>This operation requires an extra pass over the DataSet to compute the range boundaries and
* shuffles the whole DataSet over the network. This can take significant amount of time.
*
* @param keyExtractor The KeyExtractor with which the DataSet is range-partitioned.
* @return The partitioned DataSet.
*
* @see KeySelector
*/
public <K extends Comparable<K>> PartitionOperator<T> partitionByRange(KeySelector<T, K> keyExtractor) {
final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
return new PartitionOperator<T>(this, PartitionMethod.RANGE, new Keys.SelectorFunctionKeys<T, K>(clean(keyExtractor), this.getType(), keyType), Utils.getCallLocationName());
}

/**
* Partitions a tuple DataSet on the specified key fields using a custom partitioner.
* This method takes the key position to partition on, and a partitioner that accepts the key type.
Expand Down Expand Up @@ -1393,7 +1435,7 @@ public SortPartitionOperator<T> sortPartition(String field, Order order) {
public DataSink<T> writeAsText(String filePath) {
return output(new TextOutputFormat<T>(new Path(filePath)));
}

/**
* Writes a DataSet as text file(s) to the specified location.<br>
* For each element of the DataSet the result of {@link Object#toString()} is written.
Expand All @@ -1410,7 +1452,7 @@ public DataSink<T> writeAsText(String filePath, WriteMode writeMode) {
tof.setWriteMode(writeMode);
return output(tof);
}

/**
* Writes a DataSet as text file(s) to the specified location.<br>
* For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written.
Expand Down Expand Up @@ -1441,7 +1483,7 @@ public DataSink<String> writeAsFormattedText(String filePath, TextFormatter<T> f
public DataSink<String> writeAsFormattedText(String filePath, WriteMode writeMode, TextFormatter<T> formatter) {
return map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath, writeMode);
}

/**
* Writes a {@link Tuple} DataSet as CSV file(s) to the specified location.<br>
* <b>Note: Only a Tuple DataSet can written as a CSV file.</b><br>
Expand All @@ -1459,7 +1501,7 @@ public DataSink<String> writeAsFormattedText(String filePath, WriteMode writeMod
public DataSink<T> writeAsCsv(String filePath) {
return writeAsCsv(filePath, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
}

/**
* Writes a {@link Tuple} DataSet as CSV file(s) to the specified location.<br>
* <b>Note: Only a Tuple DataSet can written as a CSV file.</b><br>
Expand All @@ -1478,7 +1520,7 @@ public DataSink<T> writeAsCsv(String filePath) {
public DataSink<T> writeAsCsv(String filePath, WriteMode writeMode) {
return internalWriteAsCsv(new Path(filePath),CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER, writeMode);
}

/**
* Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters.<br>
* <b>Note: Only a Tuple DataSet can written as a CSV file.</b><br>
Expand Down Expand Up @@ -1688,4 +1730,5 @@ protected static void checkSameExecutionContext(DataSet<?> set1, DataSet<?> set2
}
}


}
Loading

0 comments on commit f5957ce

Please sign in to comment.