Skip to content

Commit

Permalink
[FLINK-2107] Add hash-based strategies for left and right outer joins.
Browse files Browse the repository at this point in the history
This closes apache#1262
  • Loading branch information
fhueske committed Oct 19, 2015
1 parent c900577 commit 5671c77
Show file tree
Hide file tree
Showing 40 changed files with 2,246 additions and 409 deletions.
58 changes: 48 additions & 10 deletions docs/apis/dataset_transformations.md
Original file line number Diff line number Diff line change
Expand Up @@ -1408,25 +1408,25 @@ Not supported.

The following hints are available:

* OPTIMIZER_CHOOSES: Equivalent to not giving a hint at all, leaves the choice to the system.
* `OPTIMIZER_CHOOSES`: Equivalent to not giving a hint at all, leaves the choice to the system.

* BROADCAST_HASH_FIRST: Broadcasts the first input and builds a hash table from it, which is
* `BROADCAST_HASH_FIRST`: Broadcasts the first input and builds a hash table from it, which is
probed by the second input. A good strategy if the first input is very small.

* BROADCAST_HASH_SECOND: Broadcasts the second input and builds a hash table from it, which is
* `BROADCAST_HASH_SECOND`: Broadcasts the second input and builds a hash table from it, which is
probed by the first input. A good strategy if the second input is very small.

* REPARTITION_HASH_FIRST: The system partitions (shuffles) each input (unless the input is already
* `REPARTITION_HASH_FIRST`: The system partitions (shuffles) each input (unless the input is already
partitioned) and builds a hash table from the first input. This strategy is good if the first
input is smaller than the second, but both inputs are still large.
*Note:* This is the default fallback strategy that the system uses if no size estimates can be made
and no pre-existing partitiongs and sort-orders can be re-used.

* REPARTITION_HASH_SECOND: The system partitions (shuffles) each input (unless the input is already
* `REPARTITION_HASH_SECOND`: The system partitions (shuffles) each input (unless the input is already
partitioned) and builds a hash table from the second input. This strategy is good if the second
input is smaller than the first, but both inputs are still large.

* REPARTITION_SORT_MERGE: The system partitions (shuffles) each input (unless the input is already
* `REPARTITION_SORT_MERGE`: The system partitions (shuffles) each input (unless the input is already
partitioned) and sorts each input (unless it is already sorted). The inputs are joined by
a streamed merge of the sorted inputs. This strategy is good if one or both of the inputs are
already sorted.
Expand Down Expand Up @@ -1558,9 +1558,13 @@ to manually pick a strategy, in case you want to enforce a specific way of execu
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]

DataSet<Tuple2<SomeType, AnotherType> result =
DataSet<Tuple2<SomeType, AnotherType> result1 =
input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
.where("id").equalTo("key");
DataSet<Tuple2<SomeType, AnotherType> result2 =
input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST)
.where("id").equalTo("key");
~~~
</div>
Expand All @@ -1573,6 +1577,8 @@ val input2: DataSet[AnotherType] = // [...]
// hint that the second DataSet is very small
val result1 = input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE).where("id").equalTo("key")

val result2 = input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")

~~~

</div>
Expand All @@ -1585,15 +1591,47 @@ Not supported.
</div>
</div>

**NOTE:** Right now, outer joins can only be executed using the `REPARTITION_SORT_MERGE` strategy. Further execution strategies will be added in the future.
The following hints are available.

* OPTIMIZER_CHOOSES: Equivalent to not giving a hint at all, leaves the choice to the system.
* `OPTIMIZER_CHOOSES`: Equivalent to not giving a hint at all, leaves the choice to the system.

* REPARTITION_SORT_MERGE: The system partitions (shuffles) each input (unless the input is already
* `BROADCAST_HASH_FIRST`: Broadcasts the first input and builds a hash table from it, which is
probed by the second input. A good strategy if the first input is very small.

* `BROADCAST_HASH_SECOND`: Broadcasts the second input and builds a hash table from it, which is
probed by the first input. A good strategy if the second input is very small.

* `REPARTITION_HASH_FIRST`: The system partitions (shuffles) each input (unless the input is already
partitioned) and builds a hash table from the first input. This strategy is good if the first
input is smaller than the second, but both inputs are still large.

* `REPARTITION_HASH_SECOND`: The system partitions (shuffles) each input (unless the input is already
partitioned) and builds a hash table from the second input. This strategy is good if the second
input is smaller than the first, but both inputs are still large.

* `REPARTITION_SORT_MERGE`: The system partitions (shuffles) each input (unless the input is already
partitioned) and sorts each input (unless it is already sorted). The inputs are joined by
a streamed merge of the sorted inputs. This strategy is good if one or both of the inputs are
already sorted.

**NOTE:** Not all execution strategies are supported by every outer join type, yet.

* `LeftOuterJoin` supports:
* `OPTIMIZER_CHOOSES`
* `BROADCAST_HASH_SECOND`
* `REPARTITION_HASH_SECOND`
* `REPARTITION_SORT_MERGE`

* `RightOuterJoin` supports:
* `OPTIMIZER_CHOOSES`
* `BROADCAST_HASH_FIRST`
* `REPARTITION_HASH_FIRST`
* `REPARTITION_SORT_MERGE`

* `FullOuterJoin` supports:
* `OPTIMIZER_CHOOSES`
* `REPARTITION_SORT_MERGE`


### Cross

Expand Down
29 changes: 26 additions & 3 deletions flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,16 @@ public <R> JoinOperatorSetsBase<T, R> leftOuterJoin(DataSet<R> other) {
* @see DataSet
*/
public <R> JoinOperatorSetsBase<T, R> leftOuterJoin(DataSet<R> other, JoinHint strategy) {
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER);
switch(strategy) {
case OPTIMIZER_CHOOSES:
case REPARTITION_SORT_MERGE:
case REPARTITION_HASH_SECOND:
case BROADCAST_HASH_SECOND:
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER);
default:
throw new InvalidProgramException("Invalid JoinHint for LeftOuterJoin: "+strategy);
}

}

/**
Expand Down Expand Up @@ -881,7 +890,15 @@ public <R> JoinOperatorSetsBase<T, R> rightOuterJoin(DataSet<R> other) {
* @see DataSet
*/
public <R> JoinOperatorSetsBase<T, R> rightOuterJoin(DataSet<R> other, JoinHint strategy) {
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER);
switch(strategy) {
case OPTIMIZER_CHOOSES:
case REPARTITION_SORT_MERGE:
case REPARTITION_HASH_FIRST:
case BROADCAST_HASH_FIRST:
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER);
default:
throw new InvalidProgramException("Invalid JoinHint for RightOuterJoin: "+strategy);
}
}

/**
Expand Down Expand Up @@ -921,7 +938,13 @@ public <R> JoinOperatorSetsBase<T, R> fullOuterJoin(DataSet<R> other) {
* @see DataSet
*/
public <R> JoinOperatorSetsBase<T, R> fullOuterJoin(DataSet<R> other, JoinHint strategy) {
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER);
switch(strategy) {
case OPTIMIZER_CHOOSES:
case REPARTITION_SORT_MERGE:
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER);
default:
throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: "+strategy);
}
}


Expand Down
Loading

0 comments on commit 5671c77

Please sign in to comment.