[SPARK-32753][SQL] Only copy tags to node with no tags
### What changes were proposed in this pull request?
When transforming plans, copy tags only to nodes that have no tags.

### Why are the changes needed?
cloud-fan [made a good point](apache#29593 (comment)) that it doesn't make sense to append tags to an existing node when nodes are removed. Doing so causes bugs such as duplicate rows when deduplicating and repartitioning by the same column with AQE enabled:

```
spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
val df = spark.sql("select id from v1 group by id distribute by id")
println(df.collect().toArray.mkString(","))
println(df.queryExecution.executedPlan)

// With AQE
[4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
AdaptiveSparkPlan(isFinalPlan=true)
+- CustomShuffleReader local
   +- ShuffleQueryStage 0
      +- Exchange hashpartitioning(id#183L, 10), true
         +- *(3) HashAggregate(keys=[id#183L], functions=[], output=[id#183L])
            +- Union
               :- *(1) Range (0, 10, step=1, splits=2)
               +- *(2) Range (0, 10, step=1, splits=2)

// Without AQE
[4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
*(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
+- Exchange hashpartitioning(id#206L, 10), true
   +- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
      +- Union
         :- *(1) Range (0, 10, step=1, splits=2)
         +- *(2) Range (0, 10, step=1, splits=2)
```

Detecting node removal is too expensive, so as a compromise we copy tags only to nodes that have no tags.
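
To illustrate the compromise, here is a minimal, self-contained sketch. It is not Spark code: `Node`, `copyTagsAlways`, `copyTagsIfEmpty`, `TagCopyDemo`, and the `"origin"` tag are hypothetical names, and the model assumes that a transform copies tags from the original node onto whatever node the rule returns. When a rule removes a node by returning its existing child, the unconditional copy overwrites the child's own tag, while the guarded copy leaves it alone.

```scala
import scala.collection.mutable

// Hypothetical stand-in for TreeNode; only the tag handling is modeled.
final class Node(val name: String, val child: Option[Node] = None) {
  val tags: mutable.Map[String, Any] = mutable.Map.empty

  // Behavior before this change: always merge, overwriting existing keys.
  def copyTagsAlways(other: Node): Unit = tags ++= other.tags

  // Behavior after this change: copy only into a node that has no tags yet.
  def copyTagsIfEmpty(other: Node): Unit =
    if (tags.isEmpty) tags ++= other.tags
}

object TagCopyDemo {
  // Builds a two-node tree in which both nodes already carry a tag.
  def build(): Node = {
    val kept = new Node("kept")
    kept.tags("origin") = "from-kept"
    val removed = new Node("removed", Some(kept))
    removed.tags("origin") = "from-removed"
    removed
  }

  // Models a rule that removes `node` by returning its existing child,
  // followed by the tag copy performed on the rule's result (assumption).
  def removeNode(node: Node, copy: (Node, Node) => Unit): Node = {
    val result = node.child.get
    copy(result, node)
    result
  }

  def main(args: Array[String]): Unit = {
    val before = removeNode(build(), (to, from) => to.copyTagsAlways(from))
    val after = removeNode(build(), (to, from) => to.copyTagsIfEmpty(from))
    println(before.tags("origin")) // prints "from-removed": the child's tag was overwritten
    println(after.tags("origin"))  // prints "from-kept": the child's tag is preserved
  }
}
```

A freshly created node has an empty tag map, so it still receives the original node's tags under the guarded copy. The trade-off is that a new node that already has a tag of its own no longer receives any others.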

### Does this PR introduce _any_ user-facing change?
Yes. It fixes the duplicate-row bug described above.

### How was this patch tested?
Added a test to `AdaptiveQueryExecSuite`.

Closes apache#29593 from manuzhang/spark-32753.

Authored-by: manuzhang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
manuzhang authored and cloud-fan committed Sep 7, 2020
1 parent 04f7f6d commit c43460c
Showing 2 changed files with 20 additions and 1 deletion.
@@ -92,7 +92,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   private val tags: mutable.Map[TreeNodeTag[_], Any] = mutable.Map.empty

   protected def copyTagsFrom(other: BaseType): Unit = {
-    tags ++= other.tags
+    // SPARK-32753: it only makes sense to copy tags to a new node
+    // but it's too expensive to detect other cases like node removal
+    // so we make a compromise here to copy tags to node with no tags
+    if (tags.isEmpty) {
+      tags ++= other.tags
+    }
   }

   def setTagValue[T](tag: TreeNodeTag[T], value: T): Unit = {
@@ -1224,4 +1224,18 @@ class AdaptiveQueryExecSuite
       })
     }
   }
+
+  test("SPARK-32753: Only copy tags to node with no tags") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
+    ) {
+      spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
+
+      val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+        "SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id")
+      assert(collect(adaptivePlan) {
+        case s: ShuffleExchangeExec => s
+      }.length == 1)
+    }
+  }
 }
