[SPARK-2973][SQL] Lightweight SQL commands without distributed jobs when calling .collect()

By overriding `executeCollect()` in the physical plan classes of all commands, we can avoid kicking off a distributed job when collecting the result of a SQL command, e.g. `sql("SET").collect()`.
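
As a quick illustration (a hypothetical usage sketch, not part of the patch; the context and key names below are assumptions), collecting a command's result now reads the locally computed rows instead of submitting a job:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}

// Assumes a local SparkContext; names here are illustrative only.
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("set-demo"))
val sqlContext = new SQLContext(sc)

// With this change, collect() returns sideEffectResult directly,
// so no distributed job appears in the Spark UI for this call.
val settings: Array[Row] = sqlContext.sql("SET spark.sql.shuffle.partitions=10").collect()
settings.foreach(println)   // e.g. [spark.sql.shuffle.partitions=10]
```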

Previously, `Command.sideEffectResult` returned a `Seq[Any]`, and the `execute()` method in sub-classes of `Command` typically converted that to a `Seq[Row]` and then parallelized it into an RDD. With this PR, `sideEffectResult` is required to return a `Seq[Row]` directly, so that `executeCollect()` can leverage it directly and be factored into the `Command` parent class.
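
For reference, a condensed sketch of the refactored trait as it appears in the diff below (context such as `SparkPlan` and `sqlContext` comes from the surrounding Spark sources, so this excerpt is not standalone):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

trait Command {
  this: SparkPlan =>

  // Concrete commands override this with their eagerly evaluated result rows.
  protected[sql] lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]

  // collect() reads the local result directly; no job is submitted.
  override def executeCollect(): Array[Row] = sideEffectResult.toArray

  // execute() still exposes the result as a single-partition RDD when needed.
  override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
```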

Author: Cheng Lian <[email protected]>

Closes apache#2215 from liancheng/lightweight-commands and squashes the following commits:

3fbef60 [Cheng Lian] Factored execute() method of physical commands to parent class Command
5a0e16c [Cheng Lian] Passes test suites
e0e12e9 [Cheng Lian] Refactored Command.sideEffectResult and Command.executeCollect
995bdd8 [Cheng Lian] Cleaned up DescribeHiveTableCommand
542977c [Cheng Lian] Avoids confusion between logical and physical plan by adding package prefixes
55b2aa5 [Cheng Lian] Avoids distributed jobs when executing SQL commands
liancheng authored and marmbrus committed Sep 4, 2014
1 parent 4bba10c commit f48420f
Showing 6 changed files with 48 additions and 94 deletions.
@@ -21,11 +21,13 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{Row, SQLConf, SQLContext}

trait Command {
this: SparkPlan =>

/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
@@ -35,7 +37,11 @@ trait Command {
* The `execute()` method of all the physical command classes should reference `sideEffectResult`
* so that the command can be executed eagerly right after the command query is created.
*/
protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
protected[sql] lazy val sideEffectResult: Seq[Row] = Seq.empty[Row]

override def executeCollect(): Array[Row] = sideEffectResult.toArray

override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}

/**
@@ -47,17 +53,17 @@ case class SetCommand(
@transient context: SQLContext)
extends LeafNode with Command with Logging {

override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
override protected[sql] lazy val sideEffectResult: Seq[Row] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
} else {
context.setConf(k, v)
Array(s"$k=$v")
Array(Row(s"$k=$v"))
}

// Query the value bound to key k.
@@ -73,28 +79,22 @@
"hive-0.12.0.jar").mkString(":")

Array(
"system:java.class.path=" + hiveJars,
"system:sun.java.command=shark.SharkServer2")
}
else {
Array(s"$k=${context.getConf(k, "<undefined>")}")
Row("system:java.class.path=" + hiveJars),
Row("system:sun.java.command=shark.SharkServer2"))
} else {
Array(Row(s"$k=${context.getConf(k, "<undefined>")}"))
}

// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
context.getAllConfs.map { case (k, v) =>
s"$k=$v"
Row(s"$k=$v")
}.toSeq

case _ =>
throw new IllegalArgumentException()
}

def execute(): RDD[Row] = {
val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) }
context.sparkContext.parallelize(rows, 1)
}

override def otherCopyArgs = context :: Nil
}

@@ -113,19 +113,14 @@ case class ExplainCommand(
extends LeafNode with Command {

// Run through the optimizer to generate the physical plan.
override protected[sql] lazy val sideEffectResult: Seq[String] = try {
override protected[sql] lazy val sideEffectResult: Seq[Row] = try {
// TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties.
val queryExecution = context.executePlan(logicalPlan)
val outputString = if (extended) queryExecution.toString else queryExecution.simpleString

outputString.split("\n")
outputString.split("\n").map(Row(_))
} catch { case cause: TreeNodeException[_] =>
("Error occurred during query planning: \n" + cause.getMessage).split("\n")
}

def execute(): RDD[Row] = {
val explanation = sideEffectResult.map(row => new GenericRow(Array[Any](row)))
context.sparkContext.parallelize(explanation, 1)
("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
}

override def otherCopyArgs = context :: Nil
@@ -144,12 +139,7 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:
} else {
context.uncacheTable(tableName)
}
Seq.empty[Any]
}

override def execute(): RDD[Row] = {
sideEffectResult
context.emptyResult
Seq.empty[Row]
}

override def output: Seq[Attribute] = Seq.empty
@@ -163,15 +153,8 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
Seq(("# Registered as a temporary table", null, null)) ++
child.output.map(field => (field.name, field.dataType.toString, null))
}

override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
}
context.sparkContext.parallelize(rows, 1)
override protected[sql] lazy val sideEffectResult: Seq[Row] = {
Row("# Registered as a temporary table", null, null) +:
child.output.map(field => Row(field.name, field.dataType.toString, null))
}
}
@@ -389,7 +389,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}.mkString("{", ",", "}")
case (seq: Seq[_], ArrayType(typ, _)) =>
seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
case (map: Map[_,_], MapType(kType, vType, _)) =>
case (map: Map[_, _], MapType(kType, vType, _)) =>
map.map {
case (key, value) =>
toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
@@ -409,7 +409,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
// be similar with Hive.
describeHiveTableCommand.hiveString
case command: PhysicalCommand =>
command.sideEffectResult.map(_.toString)
command.sideEffectResult.map(_.head.toString)

case other =>
val result: Seq[Seq[Any]] = toRdd.collect().toSeq
@@ -18,17 +18,19 @@
package org.apache.spark.sql.hive

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.catalyst.types.StringType
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}
import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.{SQLContext, SchemaRDD}

import scala.collection.JavaConversions._

@@ -196,9 +198,9 @@ private[hive] trait HiveStrategies {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil

case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil

case AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil

case describe: logical.DescribeCommand =>
val resolvedTable = context.executePlan(describe.table).analyzed
@@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.execution.{Command, LeafNode}
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}

@@ -41,26 +41,21 @@ case class DescribeHiveTableCommand(
extends LeafNode with Command {

// Strings with the format like Hive. It is used for result comparison in our unit tests.
lazy val hiveString: Seq[String] = {
val alignment = 20
val delim = "\t"

sideEffectResult.map {
case (name, dataType, comment) =>
String.format("%-" + alignment + "s", name) + delim +
String.format("%-" + alignment + "s", dataType) + delim +
String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
}
lazy val hiveString: Seq[String] = sideEffectResult.map {
case Row(name: String, dataType: String, comment) =>
Seq(name, dataType, Option(comment.asInstanceOf[String]).getOrElse("None"))
.map(s => String.format(s"%-20s", s))
.mkString("\t")
}

override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
override protected[sql] lazy val sideEffectResult: Seq[Row] = {
// Trying to mimic the format of Hive's output. But not exactly the same.
var results: Seq[(String, String, String)] = Nil

val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
results ++= columns.map(field => (field.getName, field.getType, field.getComment))
if (!partitionColumns.isEmpty) {
if (partitionColumns.nonEmpty) {
val partColumnInfo =
partitionColumns.map(field => (field.getName, field.getType, field.getComment))
results ++=
@@ -74,14 +69,9 @@
results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
}

results
}

override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
results.map { case (name, dataType, comment) =>
Row(name, dataType, comment)
}
context.sparkContext.parallelize(rows, 1)
}

override def otherCopyArgs = context :: Nil
@@ -32,16 +32,7 @@ case class NativeCommand(
@transient context: HiveContext)
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)

override def execute(): RDD[Row] = {
if (sideEffectResult.size == 0) {
context.emptyResult
} else {
val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
context.sparkContext.parallelize(rows, 1)
}
}
override protected[sql] lazy val sideEffectResult: Seq[Row] = context.runSqlHive(sql).map(Row(_))

override def otherCopyArgs = context :: Nil
}
@@ -33,19 +33,13 @@ import org.apache.spark.sql.hive.HiveContext
*/
@DeveloperApi
case class AnalyzeTable(tableName: String) extends LeafNode with Command {

def hiveContext = sqlContext.asInstanceOf[HiveContext]

def output = Seq.empty

override protected[sql] lazy val sideEffectResult = {
override protected[sql] lazy val sideEffectResult: Seq[Row] = {
hiveContext.analyze(tableName)
Seq.empty[Any]
}

override def execute(): RDD[Row] = {
sideEffectResult
sparkContext.emptyRDD[Row]
Seq.empty[Row]
}
}

@@ -55,20 +49,14 @@ case class AnalyzeTable(tableName: String) extends LeafNode with Command {
*/
@DeveloperApi
case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Command {

def hiveContext = sqlContext.asInstanceOf[HiveContext]

def output = Seq.empty

override protected[sql] lazy val sideEffectResult: Seq[Any] = {
override protected[sql] lazy val sideEffectResult: Seq[Row] = {
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
hiveContext.catalog.unregisterTable(None, tableName)
Seq.empty
}

override def execute(): RDD[Row] = {
sideEffectResult
sparkContext.emptyRDD[Row]
Seq.empty[Row]
}
}
