Skip to content

Commit

Permalink
[SPARK-20567] Lazily bind in GenerateExec
Browse files Browse the repository at this point in the history
It is not valid to eagerly bind with the child's output as this causes failures when we attempt to canonicalize the plan (replacing the attribute references with dummies).

Author: Michael Armbrust <[email protected]>

Closes apache#17838 from marmbrus/fixBindExplode.
  • Loading branch information
marmbrus authored and hvanhovell committed May 3, 2017
1 parent b946f31 commit 6235132
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ case class GenerateExec(

override def outputPartitioning: Partitioning = child.outputPartitioning

val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)
lazy val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)

protected override def doExecute(): RDD[InternalRow] = {
// boundGenerator.terminate() should be triggered after all of the rows in the partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,22 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
)
}

test("count distinct") {
val inputData = MemoryStream[(Int, Seq[Int])]

val aggregated =
inputData.toDF()
.select($"*", explode($"_2") as 'value)
.groupBy($"_1")
.agg(size(collect_set($"value")))
.as[(Int, Int)]

testStream(aggregated, Update)(
AddData(inputData, (1, Seq(1, 2))),
CheckLastBatch((1, 2))
)
}

test("simple count, complete mode") {
val inputData = MemoryStream[Int]

Expand Down

0 comments on commit 6235132

Please sign in to comment.