[SPARK-20213][SQL] Fix DataFrameWriter operations in SQL UI tab
## What changes were proposed in this pull request?

Currently the `DataFrameWriter` operations have several problems:

1. Write actions for non-file-format data sources don't show up in the SQL tab of the Spark UI.
2. Write actions for file-format data sources show only a scan node in the SQL tab, with nothing about the write. (Streaming has the same issue, but it is not fixed in this PR.)
3. Spark SQL CLI actions don't show up in the SQL tab.

This PR fixes all of them by refactoring `ExecutedCommandExec` so that it can have children.
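As an illustration of why children matter here, below is a toy, self-contained Scala sketch (not Spark's actual classes; `CommandNode`, `Scan`, and `RenderDemo` are made-up names): a plan is rendered by walking its children, so a leaf command hides the query it wraps, while a command that keeps the planned query as a child exposes the whole write plan, as in the screenshots further down.

```scala
// Toy plan-tree model, for illustration only.
sealed trait Node { def name: String; def children: Seq[Node] }

case class Scan(source: String) extends Node {
  val name: String = s"Scan $source"
  val children: Seq[Node] = Nil
}

// Before: the command node was a leaf, so nothing under it was rendered.
case class LeafCommand(name: String) extends Node { val children: Seq[Node] = Nil }

// After: the command node keeps the planned query as its children.
case class CommandNode(name: String, children: Seq[Node]) extends Node

object RenderDemo {
  def render(n: Node, indent: String = ""): String =
    ((indent + n.name) +: n.children.map(c => render(c, indent + "  "))).mkString("\n")

  def main(args: Array[String]): Unit = {
    // Renders just "WriteCommand" -- the wrapped scan is invisible ("before" behavior).
    println(render(LeafCommand("WriteCommand")))
    // Renders "WriteCommand" with "Scan local table" nested under it ("after" behavior).
    println(render(CommandNode("WriteCommand", Seq(Scan("local table")))))
  }
}
```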

Closes apache#17540.

## How was this patch tested?

Existing tests.

Also tested the UI manually, with a simple command: `Seq(1 -> "a").toDF("i", "j").write.parquet("/tmp/qwe")`

Before this PR:
<img width="266" alt="SQL tab before this PR" src="https://cloud.githubusercontent.com/assets/3182036/26326050/24e18ba2-3f6c-11e7-8817-6dd275bf6ac5.png">

After this PR:
<img width="287" alt="SQL tab after this PR" src="https://cloud.githubusercontent.com/assets/3182036/26326054/2ad7f460-3f6c-11e7-8053-d68325beb28f.png">

Author: Wenchen Fan <[email protected]>

Closes apache#18064 from cloud-fan/execution.
cloud-fan committed May 31, 2017
1 parent fa757ee commit 10e526e
Showing 37 changed files with 299 additions and 218 deletions.
@@ -85,12 +85,10 @@ private[kafka010] object KafkaWriter extends Logging {
topic: Option[String] = None): Unit = {
val schema = queryExecution.analyzed.output
validateQuery(queryExecution, kafkaParameters, topic)
SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
queryExecution.toRdd.foreachPartition { iter =>
val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
finallyBlock = writeTask.close())
}
queryExecution.toRdd.foreachPartition { iter =>
val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
finallyBlock = writeTask.close())
}
}
}
@@ -357,7 +357,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
})
}

override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
override def innerChildren: Seq[QueryPlan[_]] = subqueries

/**
* Returns a plan where a best effort attempt has been made to transform `this` in a way
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
* commands can be used by parsers to represent DDL operations. Commands, unlike queries, are
* eagerly executed.
*/
trait Command extends LeafNode {
trait Command extends LogicalPlan {
override def output: Seq[Attribute] = Seq.empty
override def children: Seq[LogicalPlan] = Seq.empty
}
@@ -22,7 +22,8 @@ import java.math.{MathContext, RoundingMode}
import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -243,9 +244,9 @@ object ColumnStat extends Logging {
}

col.dataType match {
case _: IntegralType => fixedLenTypeStruct(LongType)
case dt: IntegralType => fixedLenTypeStruct(dt)
case _: DecimalType => fixedLenTypeStruct(col.dataType)
case DoubleType | FloatType => fixedLenTypeStruct(DoubleType)
case dt @ (DoubleType | FloatType) => fixedLenTypeStruct(dt)
case BooleanType => fixedLenTypeStruct(col.dataType)
case DateType => fixedLenTypeStruct(col.dataType)
case TimestampType => fixedLenTypeStruct(col.dataType)
@@ -264,14 +265,12 @@
}

/** Convert a struct for column stats (defined in statExprs) into [[ColumnStat]]. */
def rowToColumnStat(row: Row, attr: Attribute): ColumnStat = {
def rowToColumnStat(row: InternalRow, attr: Attribute): ColumnStat = {
ColumnStat(
distinctCount = BigInt(row.getLong(0)),
// for string/binary min/max, get should return null
min = Option(row.get(1))
.map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
max = Option(row.get(2))
.map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
min = Option(row.get(1, attr.dataType)),
max = Option(row.get(2, attr.dataType)),
nullCount = BigInt(row.getLong(3)),
avgLen = row.getLong(4),
maxLen = row.getLong(5)
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
import org.apache.spark.sql.sources.BaseRelation
@@ -231,12 +232,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertNotBucketed("save")

runCommand(df.sparkSession, "save") {
SaveIntoDataSourceCommand(
query = df.logicalPlan,
provider = source,
DataSource(
sparkSession = df.sparkSession,
className = source,
partitionColumns = partitioningColumns.getOrElse(Nil),
options = extraOptions.toMap,
mode = mode)
options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
}
}

@@ -607,7 +607,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
try {
val start = System.nanoTime()
// call `QueryExecution.toRDD` to trigger the execution of commands.
qe.toRdd
SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
val end = System.nanoTime()
session.listenerManager.onSuccess(name, qe, end - start)
} catch {
48 changes: 40 additions & 8 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -179,9 +179,9 @@ class Dataset[T] private[sql](
// to happen right away to let these side effects take place eagerly.
queryExecution.analyzed match {
case c: Command =>
LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect()))
case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
LocalRelation(u.output, withAction("command", queryExecution)(_.executeCollect()))
case _ =>
queryExecution.analyzed
}
@@ -248,8 +248,13 @@
_numRows: Int, truncate: Int = 20, vertical: Boolean = false): String = {
val numRows = _numRows.max(0)
val takeResult = toDF().take(numRows + 1)
val hasMoreData = takeResult.length > numRows
val data = takeResult.take(numRows)
showString(takeResult, numRows, truncate, vertical)
}

private def showString(
dataWithOneMoreRow: Array[Row], numRows: Int, truncate: Int, vertical: Boolean): String = {
val hasMoreData = dataWithOneMoreRow.length > numRows
val data = dataWithOneMoreRow.take(numRows)

lazy val timeZone =
DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
@@ -684,6 +689,18 @@ class Dataset[T] private[sql](
} else {
println(showString(numRows, truncate = 0))
}

// An internal version of `show`, which won't set execution id and trigger listeners.
private[sql] def showInternal(_numRows: Int, truncate: Boolean): Unit = {
val numRows = _numRows.max(0)
val takeResult = toDF().takeInternal(numRows + 1)

if (truncate) {
println(showString(takeResult, numRows, truncate = 20, vertical = false))
} else {
println(showString(takeResult, numRows, truncate = 0, vertical = false))
}
}
// scalastyle:on println

/**
@@ -2453,6 +2470,11 @@
*/
def take(n: Int): Array[T] = head(n)

// An internal version of `take`, which won't set execution id and trigger listeners.
private[sql] def takeInternal(n: Int): Array[T] = {
collectFromPlan(limit(n).queryExecution.executedPlan)
}

/**
* Returns the first `n` rows in the Dataset as a list.
*
@@ -2477,6 +2499,11 @@
*/
def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)

// An internal version of `collect`, which won't set execution id and trigger listeners.
private[sql] def collectInternal(): Array[T] = {
collectFromPlan(queryExecution.executedPlan)
}

/**
* Returns a Java list that contains all rows in this Dataset.
*
@@ -2518,6 +2545,11 @@
plan.executeCollect().head.getLong(0)
}

// An internal version of `count`, which won't set execution id and trigger listeners.
private[sql] def countInternal(): Long = {
groupBy().count().queryExecution.executedPlan.executeCollect().head.getLong(0)
}

/**
* Returns a new Dataset that has exactly `numPartitions` partitions.
*
@@ -2763,7 +2795,7 @@ class Dataset[T] private[sql](
createTempViewCommand(viewName, replace = true, global = true)
}

private def createTempViewCommand(
private[spark] def createTempViewCommand(
viewName: String,
replace: Boolean,
global: Boolean): CreateViewCommand = {
@@ -2954,17 +2986,17 @@
}

/** A convenient function to wrap a logical plan and produce a DataFrame. */
@inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
@inline private def withPlan(logicalPlan: LogicalPlan): DataFrame = {
Dataset.ofRows(sparkSession, logicalPlan)
}

/** A convenient function to wrap a logical plan and produce a Dataset. */
@inline private def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
@inline private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
Dataset(sparkSession, logicalPlan)
}

/** A convenient function to wrap a set based logical plan and produce a Dataset. */
@inline private def withSetOperator[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
@inline private def withSetOperator[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) {
// Set operators widen types (change the schema), so we cannot reuse the row encoder.
Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
@@ -113,10 +113,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {


/**
* Returns the result as a hive compatible sequence of strings. This is for testing only.
* Returns the result as a hive compatible sequence of strings. This is used in tests and
* `SparkSQLDriver` for CLI applications.
*/
def hiveResultString(): Seq[String] = executedPlan match {
case ExecutedCommandExec(desc: DescribeTableCommand) =>
case ExecutedCommandExec(desc: DescribeTableCommand, _) =>
// If it is a describe command for a Hive table, we want to have the output format
// be similar with Hive.
desc.run(sparkSession).map {
@@ -127,7 +128,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
.mkString("\t")
}
// SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
case command @ ExecutedCommandExec(s: ShowTablesCommand, _) if !s.isExtended =>
command.executeCollect().map(_.getString(1))
case other =>
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
@@ -39,6 +39,19 @@ object SQLExecution {
executionIdToQueryExecution.get(executionId)
}

private val testing = sys.props.contains("spark.testing")

private[sql] def checkSQLExecutionId(sparkSession: SparkSession): Unit = {
// only throw an exception during tests. a missing execution ID should not fail a job.
if (testing && sparkSession.sparkContext.getLocalProperty(EXECUTION_ID_KEY) == null) {
// Attention testers: when a test fails with this exception, it means that the action that
// started execution of a query didn't call withNewExecutionId. The execution ID should be
// set by calling withNewExecutionId in the action that begins execution, like
// Dataset.collect or DataFrameWriter.insertInto.
throw new IllegalStateException("Execution ID should be set")
}
}

/**
* Wrap an action that will execute "queryExecution" to track all Spark jobs in the body so that
* we can connect them with an execution.
@@ -346,7 +346,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
case r: RunnableCommand => ExecutedCommandExec(r, r.children.map(planLater)) :: Nil

case MemoryPlan(sink, output) =>
val encoder = RowEncoder(sink.schema)
@@ -64,7 +64,7 @@ case class InMemoryRelation(
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
extends logical.LeafNode with MultiInstanceRelation {

override protected def innerChildren: Seq[SparkPlan] = Seq(child)
override def innerChildren: Seq[SparkPlan] = Seq(child)

override def producedAttributes: AttributeSet = outputSet

@@ -34,7 +34,7 @@ case class InMemoryTableScanExec(
@transient relation: InMemoryRelation)
extends LeafExecNode {

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
override def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableTyp
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.QueryExecution


/**
@@ -96,11 +97,13 @@ case class AnalyzeColumnCommand(
attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))

val namedExpressions = expressions.map(e => Alias(e, e.toString)())
val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head()
val statsRow = new QueryExecution(sparkSession, Aggregate(Nil, namedExpressions, relation))
.executedPlan.executeTake(1).head

val rowCount = statsRow.getLong(0)
val columnStats = attributesToAnalyze.zipWithIndex.map { case (attr, i) =>
(attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1), attr))
// according to `ColumnStat.statExprs`, the stats struct always have 6 fields.
(attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1, 6), attr))
}.toMap
(rowCount, columnStats)
}
@@ -56,7 +56,7 @@ case class AnalyzeTableCommand(
// 2. when total size is changed, `oldRowCount` becomes invalid.
// This is to make sure that we only record the right statistics.
if (!noscan) {
val newRowCount = sparkSession.table(tableIdentWithDB).count()
val newRowCount = sparkSession.table(tableIdentWithDB).countInternal()
if (newRowCount >= 0 && newRowCount != oldRowCount) {
newStats = if (newStats.isDefined) {
newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
@@ -30,19 +30,19 @@ case class CacheTableCommand(
require(plan.isEmpty || tableIdent.database.isEmpty,
"Database name is not allowed in CACHE TABLE AS SELECT")

override protected def innerChildren: Seq[QueryPlan[_]] = {
plan.toSeq
}
override def innerChildren: Seq[QueryPlan[_]] = plan.toSeq

override def run(sparkSession: SparkSession): Seq[Row] = {
plan.foreach { logicalPlan =>
Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
Dataset.ofRows(sparkSession, logicalPlan)
.createTempViewCommand(tableIdent.quotedString, replace = false, global = false)
.run(sparkSession)
}
sparkSession.catalog.cacheTable(tableIdent.quotedString)

if (!isLazy) {
// Performs eager caching
sparkSession.table(tableIdent).count()
sparkSession.table(tableIdent).countInternal()
}

Seq.empty[Row]