Skip to content

Commit

Permalink
Merge pull request twitter#1278 from twitter/jnievelt/1277
Browse files Browse the repository at this point in the history
Hydrate both sides of sampledCounts in skewJoinWithSmaller
  • Loading branch information
johnynek committed May 8, 2015
2 parents 7c006c1 + a5a71c0 commit ebb4968
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ trait JoinAlgorithms {
(otherPipe, fs._2, Fields.NONE)
else // For now, we are assuming an inner join.
renameCollidingFields(otherPipe, fs._2, intersection)
val mergedJoinKeys = Fields.join(fs._1, rightResolvedJoinFields)

// 1. First, get an approximate count of the left join keys and the right join keys, so that we
// know how much to replicate.
Expand All @@ -392,7 +393,29 @@ trait JoinAlgorithms {
val sampledRight = rightPipe.sample(sampleRate, Seed)
.groupBy(rightResolvedJoinFields) { _.size(rightSampledCountField) }
val sampledCounts = sampledLeft.joinWithSmaller(fs._1 -> rightResolvedJoinFields, sampledRight, joiner = new OuterJoin)
.project(Fields.join(fs._1, rightResolvedJoinFields, sampledCountFields))
.project(Fields.join(mergedJoinKeys, sampledCountFields))
.map(mergedJoinKeys -> mergedJoinKeys) { t: cascading.tuple.Tuple =>
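// The outer join above leaves the key fields of one side null whenever a
// key appears in only one of the two samples. Back-fill each null key field
// with the matching field from the other half of the tuple, so that both the
// left and the right key fields are populated on every row and either set
// can be used as join keys later (i.e. the result looks like an inner join).
// E.g., (1, 2, "foo", null, null, null) -> (1, 2, "foo", 1, 2, "foo")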
val keysSize = t.size / 2
val result = new cascading.tuple.Tuple(t)

for (index <- 0 until keysSize) {
val leftValue = result.getObject(index)
val rightValue = result.getObject(index + keysSize)

if (leftValue == null) {
result.set(index, rightValue)
} else if (rightValue == null) {
result.set(index + keysSize, leftValue)
}
}

result
}

// 2. Now replicate each group of join keys in the left and right pipes, according to the sampled counts
// from the previous step.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ object JoinTestHelper {
.arg("replicationFactor", replicationFactor.toString)
.arg("replicator", replicator.toString)
.source(Tsv("input0"), generateInput(1000, 100))
.source(Tsv("input1"), generateInput(1000, 100))
.sink[(Int, Int, Int, Int, Int, Int)](Tsv("output")) { outBuf => skewResult ++ outBuf }
.sink[(Int, Int, Int, Int, Int, Int)](Tsv("jws-output")) { outBuf => innerResult ++ outBuf }
.source(Tsv("input1"), generateInput(100, 100))
.sink[(Int, Int, Int, Int, Int, Int)](Tsv("output")) { outBuf => skewResult ++= outBuf }
.sink[(Int, Int, Int, Int, Int, Int)](Tsv("jws-output")) { outBuf => innerResult ++= outBuf }
.run
//.runHadoop //this takes MUCH longer to run. Commented out by default, but tests pass on my machine
.finish
Expand Down

0 comments on commit ebb4968

Please sign in to comment.