Skip to content

Commit

Permalink
[SPARK-27395][SQL] Improve EXPLAIN command
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
This PR aims at improving the way physical plans are explained in spark.

Currently, the explain output for physical plan may look very cluttered and each operator's
string representation can be very wide and wraps around in the display making it little
hard to follow. This especially happens when explaining a query 1) Operating on wide tables
2) Has complex expressions etc.

This PR attempts to split the output into two sections. In the header section, we display
the basic operator tree with a number associated with each operator. In this section, we strictly
control what we output for each operator. In the footer section, each operator is verbosely
displayed. Based on the feedback from Maryann, the uncorrelated subqueries (SubqueryExecs) are not included in the main plan. They are printed separately after the main plan and can be
correlated by the originating expression id from its parent plan.

To illustrate, here is a simple plan displayed in old vs new way.

Example query1 :
```
EXPLAIN SELECT key, Max(val) FROM explain_temp1 WHERE key > 0 GROUP BY key HAVING max(val) > 0
```

Old :
```
*(2) Project [key#2, max(val)apache#15]
+- *(2) Filter (isnotnull(max(val#3)apache#18) AND (max(val#3)apache#18 > 0))
   +- *(2) HashAggregate(keys=[key#2], functions=[max(val#3)], output=[key#2, max(val)apache#15, max(val#3)apache#18])
      +- Exchange hashpartitioning(key#2, 200)
         +- *(1) HashAggregate(keys=[key#2], functions=[partial_max(val#3)], output=[key#2, max#21])
            +- *(1) Project [key#2, val#3]
               +- *(1) Filter (isnotnull(key#2) AND (key#2 > 0))
                  +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), (key#2 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), GreaterThan(key,0)], ReadSchema: struct<key:int,val:int>
```
New :
```
Project (8)
+- Filter (7)
   +- HashAggregate (6)
      +- Exchange (5)
         +- HashAggregate (4)
            +- Project (3)
               +- Filter (2)
                  +- Scan parquet default.explain_temp1 (1)

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (isnotnull(key#2) AND (key#2 > 0))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]

(4) HashAggregate [codegen id : 1]
Input: [key#2, val#3]

(5) Exchange
Input: [key#2, max#11]

(6) HashAggregate [codegen id : 2]
Input: [key#2, max#11]

(7) Filter [codegen id : 2]
Input     : [key#2, max(val)apache#5, max(val#3)apache#8]
Condition : (isnotnull(max(val#3)apache#8) AND (max(val#3)apache#8 > 0))

(8) Project [codegen id : 2]
Output    : [key#2, max(val)apache#5]
Input     : [key#2, max(val)apache#5, max(val#3)apache#8]
```

Example Query2 (subquery):
```
SELECT * FROM   explain_temp1 WHERE  KEY = (SELECT Max(KEY) FROM   explain_temp2 WHERE  KEY = (SELECT Max(KEY) FROM   explain_temp3 WHERE  val > 0) AND val = 2) AND val > 3
```
Old:
```
*(1) Project [key#2, val#3]
+- *(1) Filter (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#39)) AND (val#3 > 3))
   :  +- Subquery scalar-subquery#39
   :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#26)], output=[max(KEY)apache#45])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#26)], output=[max#47])
   :              +- *(1) Project [key#26]
   :                 +- *(1) Filter (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#38)) AND (val#27 = 2))
   :                    :  +- Subquery scalar-subquery#38
   :                    :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#28)], output=[max(KEY)apache#43])
   :                    :        +- Exchange SinglePartition
   :                    :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#28)], output=[max#49])
   :                    :              +- *(1) Project [key#28]
   :                    :                 +- *(1) Filter (isnotnull(val#29) AND (val#29 > 0))
   :                    :                    +- *(1) FileScan parquet default.explain_temp3[key#28,val#29] Batched: true, DataFilters: [isnotnull(val#29), (val#29 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp3], PartitionFilters: [], PushedFilters: [IsNotNull(val), GreaterThan(val,0)], ReadSchema: struct<key:int,val:int>
   :                    +- *(1) FileScan parquet default.explain_temp2[key#26,val#27] Batched: true, DataFilters: [isnotnull(key#26), isnotnull(val#27), (val#27 = 2)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)], ReadSchema: struct<key:int,val:int>
   +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), isnotnull(val#3), (val#3 > 3)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)], ReadSchema: struct<key:int,val:int>
```
New:
```
Project (3)
+- Filter (2)
   +- Scan parquet default.explain_temp1 (1)

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#23)) AND (val#3 > 3))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]
===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#23
HashAggregate (9)
+- Exchange (8)
   +- HashAggregate (7)
      +- Project (6)
         +- Filter (5)
            +- Scan parquet default.explain_temp2 (4)

(4) Scan parquet default.explain_temp2 [codegen id : 1]
Output: [key#26, val#27]

(5) Filter [codegen id : 1]
Input     : [key#26, val#27]
Condition : (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#22)) AND (val#27 = 2))

(6) Project [codegen id : 1]
Output    : [key#26]
Input     : [key#26, val#27]

(7) HashAggregate [codegen id : 1]
Input: [key#26]

(8) Exchange
Input: [max#35]

(9) HashAggregate [codegen id : 2]
Input: [max#35]

Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery scalar-subquery#22
HashAggregate (15)
+- Exchange (14)
   +- HashAggregate (13)
      +- Project (12)
         +- Filter (11)
            +- Scan parquet default.explain_temp3 (10)

(10) Scan parquet default.explain_temp3 [codegen id : 1]
Output: [key#28, val#29]

(11) Filter [codegen id : 1]
Input     : [key#28, val#29]
Condition : (isnotnull(val#29) AND (val#29 > 0))

(12) Project [codegen id : 1]
Output    : [key#28]
Input     : [key#28, val#29]

(13) HashAggregate [codegen id : 1]
Input: [key#28]

(14) Exchange
Input: [max#37]

(15) HashAggregate [codegen id : 2]
Input: [max#37]
```

Note:
I opened this PR as a WIP to start getting feedback. I will be on vacation starting tomorrow
would not be able to immediately incorporate the feedback. I will start to
work on them as soon as i can. Also, currently this PR provides a basic infrastructure
for explain enhancement. The details about individual operators will be implemented
in follow-up prs
## How was this patch tested?
Added a new test `explain.sql` that tests basic scenarios. Need to add more tests.

Closes apache#24759 from dilipbiswal/explain_feature.

Authored-by: Dilip Biswal <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
dilipbiswal authored and cloud-fan committed Aug 26, 2019
1 parent c353a84 commit c61270f
Show file tree
Hide file tree
Showing 26 changed files with 1,318 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ abstract class Expression extends TreeNode[Expression] {
val childrenSQL = children.map(_.sql).mkString(", ")
s"$prettyName($childrenSQL)"
}

override def simpleStringWithNodeId(): String = {
throw new UnsupportedOperationException(s"$nodeName does not implement simpleStringWithNodeId")
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ trait Block extends TreeNode[Block] with JavaCode {
}

override def verboseString(maxFields: Int): String = toString
override def simpleStringWithNodeId(): String = {
throw new UnsupportedOperationException(s"$nodeName does not implement simpleStringWithNodeId")
}
}

object Block {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.plans

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode, TreeNodeTag}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType}

Expand Down Expand Up @@ -179,6 +180,20 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT

override def verboseString(maxFields: Int): String = simpleString(maxFields)

override def simpleStringWithNodeId(): String = {
val operatorId = getTagValue(QueryPlan.OP_ID_TAG).map(id => s"$id").getOrElse("unknown")
s"$nodeName ($operatorId)".trim
}

def verboseStringWithOperatorId(): String = {
val codegenIdStr =
getTagValue(QueryPlan.CODEGEN_ID_TAG).map(id => s"[codegen id : $id]").getOrElse("")
val operatorId = getTagValue(QueryPlan.OP_ID_TAG).map(id => s"$id").getOrElse("unknown")
s"""
|($operatorId) $nodeName $codegenIdStr
""".stripMargin
}

/**
* All the subqueries of current plan.
*/
Expand All @@ -204,7 +219,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
subqueries ++ subqueries.flatMap(_.subqueriesAll)
}

override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
override def innerChildren: Seq[QueryPlan[_]] = subqueries

/**
* A private mutable variable to indicate whether this plan is the result of canonicalization.
Expand Down Expand Up @@ -289,6 +304,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
}

object QueryPlan extends PredicateHelper {
val OP_ID_TAG = TreeNodeTag[Int]("operatorId")
val CODEGEN_ID_TAG = new TreeNodeTag[Int]("wholeStageCodegenId")

/**
* Normalize the exprIds in the given expression, by updating the exprId in `AttributeReference`
* with its referenced ordinal from input attributes. It's similar to `BindReferences` but we
Expand Down Expand Up @@ -335,9 +353,10 @@ object QueryPlan extends PredicateHelper {
append: String => Unit,
verbose: Boolean,
addSuffix: Boolean,
maxFields: Int = SQLConf.get.maxToStringFields): Unit = {
maxFields: Int = SQLConf.get.maxToStringFields,
printOperatorId: Boolean = false): Unit = {
try {
plan.treeString(append, verbose, addSuffix, maxFields)
plan.treeString(append, verbose, addSuffix, maxFields, printOperatorId)
} catch {
case e: AnalysisException => append(e.toString)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat,
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
Expand Down Expand Up @@ -530,9 +531,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
* @param maxFields Maximum number of fields that will be converted to strings.
* Any elements beyond the limit will be dropped.
*/
def simpleString(maxFields: Int): String = {
s"$nodeName ${argString(maxFields)}".trim
}
def simpleString(maxFields: Int): String = s"$nodeName ${argString(maxFields)}".trim

/**
* ONE line description of this node containing the node identifier.
* @return
*/
def simpleStringWithNodeId(): String

/** ONE line description of this node with more information */
def verboseString(maxFields: Int): String
Expand All @@ -548,19 +553,20 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
final def treeString(
verbose: Boolean,
addSuffix: Boolean = false,
maxFields: Int = SQLConf.get.maxToStringFields): String = {
maxFields: Int = SQLConf.get.maxToStringFields,
printOperatorId: Boolean = false): String = {
val concat = new PlanStringConcat()

treeString(concat.append, verbose, addSuffix, maxFields)
treeString(concat.append, verbose, addSuffix, maxFields, printOperatorId)
concat.toString
}

def treeString(
append: String => Unit,
verbose: Boolean,
addSuffix: Boolean,
maxFields: Int): Unit = {
generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields)
maxFields: Int,
printOperatorId: Boolean): Unit = {
generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields, printOperatorId)
}

/**
Expand Down Expand Up @@ -609,7 +615,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
* All the nodes that should be shown as a inner nested tree of this node.
* For example, this can be used to show sub-queries.
*/
protected def innerChildren: Seq[TreeNode[_]] = Seq.empty
def innerChildren: Seq[TreeNode[_]] = Seq.empty

/**
* Appends the string representation of this node and its children to the given Writer.
Expand All @@ -627,7 +633,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int): Unit = {
maxFields: Int,
printNodeId: Boolean): Unit = {

if (depth > 0) {
lastChildren.init.foreach { isLast =>
Expand All @@ -639,7 +646,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
val str = if (verbose) {
if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields)
} else {
simpleString(maxFields)
if (printNodeId) {
simpleStringWithNodeId()
} else {
simpleString(maxFields)
}
}
append(prefix)
append(str)
Expand All @@ -648,17 +659,20 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
if (innerChildren.nonEmpty) {
innerChildren.init.foreach(_.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ false, append, verbose,
addSuffix = addSuffix, maxFields = maxFields))
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId))
innerChildren.last.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ true, append, verbose,
addSuffix = addSuffix, maxFields = maxFields)
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId)
}

if (children.nonEmpty) {
children.init.foreach(_.generateTreeString(
depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix, maxFields))
depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix,
maxFields, printNodeId = printNodeId)
)
children.last.generateTreeString(
depth + 1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields)
depth + 1, lastChildren :+ true, append, verbose, prefix,
addSuffix, maxFields, printNodeId = printNodeId)
}
}

Expand Down
Loading

0 comments on commit c61270f

Please sign in to comment.