[SPARK-17072][SQL] support table-level statistics generation and storing into/loading from metastore

## What changes were proposed in this pull request?

1. Support generating table-level statistics for
    - hive tables in HiveExternalCatalog
    - data source tables in HiveExternalCatalog
    - data source tables in InMemoryCatalog.
2. Add a property "catalogStats" in CatalogTable to hold statistics in Spark side.
3. Put the logic for transforming statistics between Spark and Hive in HiveClientImpl.
4. Extend the Statistics class by adding rowCount (estimatedSize will be added when we have column stats); see the usage sketch below.
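
A minimal end-to-end sketch of the new behavior (it mirrors the test added in StatisticsSuite below; `spark` is assumed to be an active SparkSession, e.g. in spark-shell):

```scala
import org.apache.spark.sql.execution.datasources.LogicalRelation
import spark.implicits._

spark.sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")

// NOSCAN records only the table size; the full form also counts rows.
spark.sql("ANALYZE TABLE tbl COMPUTE STATISTICS NOSCAN")
spark.sql("ANALYZE TABLE tbl COMPUTE STATISTICS")

// The statistics are stored in CatalogTable.stats and surface on the resolved relation.
val tableStats = spark.sql("SELECT * FROM tbl").queryExecution.analyzed.collect {
  case rel: LogicalRelation => rel.catalogTable.flatMap(_.stats)
}.head
tableStats.foreach(s => println(s"sizeInBytes=${s.sizeInBytes}, rowCount=${s.rowCount}"))
```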

## How was this patch tested?

Add unit tests.

Author: wangzhenhua <[email protected]>
Author: Zhenhua Wang <[email protected]>

Closes apache#14712 from wzhfy/tableStats.
wzhfy authored and hvanhovell committed Sep 5, 2016
1 parent 3ccb23e commit 6d86403
Showing 16 changed files with 363 additions and 108 deletions.
@@ -22,7 +22,7 @@ import java.util.Date
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types.StructType

@@ -130,6 +130,7 @@ case class CatalogTable(
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
properties: Map[String, String] = Map.empty,
stats: Option[Statistics] = None,
viewOriginalText: Option[String] = None,
viewText: Option[String] = None,
comment: Option[String] = None,
@@ -190,6 +191,7 @@ case class CatalogTable(
viewText.map("View: " + _).getOrElse(""),
comment.map("Comment: " + _).getOrElse(""),
if (properties.nonEmpty) s"Properties: $tableProperties" else "",
if (stats.isDefined) s"Statistics: ${stats.get}" else "",
s"$storage")

output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
@@ -31,6 +31,19 @@ package org.apache.spark.sql.catalyst.plans.logical
*
* @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
* defaults to the product of children's `sizeInBytes`.
* @param rowCount Estimated number of rows.
* @param isBroadcastable If true, output is small enough to be used in a broadcast join.
*/
case class Statistics(sizeInBytes: BigInt, isBroadcastable: Boolean = false)
case class Statistics(
sizeInBytes: BigInt,
rowCount: Option[BigInt] = None,
isBroadcastable: Boolean = false) {
override def toString: String = {
val output =
Seq(s"sizeInBytes=$sizeInBytes",
if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
s"isBroadcastable=$isBroadcastable"
)
output.filter(_.nonEmpty).mkString("Statistics(", ", ", ")")
}
}
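
For illustration, the overridden toString only prints fields that are set; a quick sketch with invented values:

```scala
import org.apache.spark.sql.catalyst.plans.logical.Statistics

println(Statistics(sizeInBytes = BigInt(4096), rowCount = Some(BigInt(100))))
// Statistics(sizeInBytes=4096, rowCount=100, isBroadcastable=false)

println(Statistics(sizeInBytes = BigInt(4096)))
// Statistics(sizeInBytes=4096, isBroadcastable=false)
```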
@@ -590,8 +590,12 @@ class SQLBuilder private (

object ExtractSQLTable {
def unapply(plan: LogicalPlan): Option[SQLTable] = plan match {
case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
Some(SQLTable(database, table, l.output.map(_.withQualifier(None))))
case l @ LogicalRelation(_, _, Some(catalogTable))
if catalogTable.identifier.database.isDefined =>
Some(SQLTable(
catalogTable.identifier.database.get,
catalogTable.identifier.table,
l.output.map(_.withQualifier(None))))

case relation: CatalogRelation =>
val m = relation.catalogTable
@@ -99,9 +99,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx.identifier.getText.toLowerCase == "noscan") {
AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
} else {
// Always just run the no scan analyze. We should fix this and implement full analyze
// command in the future.
AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString, noscan = false)
}
}
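
In effect, the parser now distinguishes the two SQL forms instead of always falling back to the no-scan variant; a sketch of the mapping (table name shortened):

```scala
// ANALYZE TABLE t COMPUTE STATISTICS noscan  =>  AnalyzeTableCommand("t")                  // noscan = true (default)
// ANALYZE TABLE t COMPUTE STATISTICS         =>  AnalyzeTableCommand("t", noscan = false)  // full scan, also counts rows
```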

@@ -21,19 +21,18 @@ import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.datasources.LogicalRelation


/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
*
* Right now, it only supports Hive tables and it only updates the size of a Hive table
* in the Hive metastore.
*/
case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val sessionState = sparkSession.sessionState
@@ -71,8 +70,6 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
size
}

val tableParameters = catalogTable.properties
val oldTotalSize = tableParameters.get("totalSize").map(_.toLong).getOrElse(0L)
val newTotalSize =
catalogTable.storage.locationUri.map { p =>
val path = new Path(p)
@@ -88,24 +85,47 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
}
}.getOrElse(0L)

// Update the Hive metastore if the total size of the table is different than the size
// recorded in the Hive metastore.
// This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
sessionState.catalog.alterTable(
catalogTable.copy(
properties = relation.catalogTable.properties +
(AnalyzeTableCommand.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
}
updateTableStats(catalogTable, newTotalSize)

// data source tables have been converted into LogicalRelations
case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)

case otherRelation =>
throw new AnalysisException(s"ANALYZE TABLE is only supported for Hive tables, " +
s"but '${tableIdent.unquotedString}' is a ${otherRelation.nodeName}.")
throw new AnalysisException(s"ANALYZE TABLE is not supported for " +
s"${otherRelation.nodeName}.")
}

def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
var newStats: Option[Statistics] = None
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
newStats = Some(Statistics(sizeInBytes = newTotalSize))
}
// We only set rowCount when noscan is false, because otherwise:
// 1. when total size is not changed, we don't need to alter the table;
// 2. when total size is changed, `oldRowCount` becomes invalid.
// This is to make sure that we only record the right statistics.
if (!noscan) {
val newRowCount = Dataset.ofRows(sparkSession, relation).count()
if (newRowCount >= 0 && newRowCount != oldRowCount) {
newStats = if (newStats.isDefined) {
newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
} else {
Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
}
}
}
// Update the metastore if the above statistics of the table are different from those
// recorded in the metastore.
if (newStats.isDefined) {
sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
// Refresh the cached data source table in the catalog.
sessionState.catalog.refreshTable(tableIdent)
}
}

Seq.empty[Row]
}
}

object AnalyzeTableCommand {
val TOTAL_SIZE_FIELD = "totalSize"
}
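
A self-contained restatement of the update rule implemented in updateTableStats above, to make the noscan interaction explicit (the Stats class and decideNewStats function are illustrative stand-ins, not part of the patch):

```scala
// Illustrative stand-ins for CatalogTable.stats and the decision logic above.
case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

def decideNewStats(
    oldStats: Option[Stats],
    newTotalSize: Long,
    countRows: () => Long, // only invoked for a full (non-noscan) analyze
    noscan: Boolean): Option[Stats] = {
  val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(0L)
  val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
  var newStats: Option[Stats] = None
  // Record a new size only if it is meaningful and actually changed.
  if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
    newStats = Some(Stats(sizeInBytes = newTotalSize))
  }
  // Row counts are only recorded when a full scan was requested.
  if (!noscan) {
    val newRowCount = countRows()
    if (newRowCount >= 0 && newRowCount != oldRowCount) {
      newStats = newStats
        .map(_.copy(rowCount = Some(BigInt(newRowCount))))
        .orElse(Some(Stats(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount)))))
    }
  }
  newStats // None => the metastore entry is left untouched
}
```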
@@ -209,7 +209,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]

LogicalRelation(
dataSource.resolveRelation(),
metastoreTableIdentifier = Some(table.identifier))
catalogTable = Some(table))
}

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -366,7 +366,8 @@ object DataSourceStrategy extends Strategy with Logging {
val scan = RowDataSourceScanExec(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, UnknownPartitioning(0), metadata, relation.metastoreTableIdentifier)
relation.relation, UnknownPartitioning(0), metadata,
relation.catalogTable.map(_.identifier))
filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
@@ -376,7 +377,8 @@ object DataSourceStrategy extends Strategy with Logging {
val scan = RowDataSourceScanExec(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, UnknownPartitioning(0), metadata, relation.metastoreTableIdentifier)
relation.relation, UnknownPartitioning(0), metadata,
relation.catalogTable.map(_.identifier))
execution.ProjectExec(
projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
}
@@ -112,7 +112,7 @@ object FileSourceStrategy extends Strategy with Logging {
outputSchema,
partitionKeyFilters.toSeq,
pushedDownFilters,
table)
table.map(_.identifier))

val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
@@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.sources.BaseRelation
@@ -33,7 +33,7 @@ import org.apache.spark.util.Utils
case class LogicalRelation(
relation: BaseRelation,
expectedOutputAttributes: Option[Seq[Attribute]] = None,
metastoreTableIdentifier: Option[TableIdentifier] = None)
catalogTable: Option[CatalogTable] = None)
extends LeafNode with MultiInstanceRelation {

override val output: Seq[AttributeReference] = {
@@ -72,9 +72,10 @@ case class LogicalRelation(
// expId can be different but the relation is still the same.
override lazy val cleanArgs: Seq[Any] = Seq(relation)

@transient override lazy val statistics: Statistics = Statistics(
sizeInBytes = BigInt(relation.sizeInBytes)
)
@transient override lazy val statistics: Statistics = {
catalogTable.flatMap(_.stats.map(_.copy(sizeInBytes = relation.sizeInBytes))).getOrElse(
Statistics(sizeInBytes = relation.sizeInBytes))
}

/** Used to lookup original attribute capitalization */
val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o)))
@@ -89,7 +90,7 @@ case class LogicalRelation(
LogicalRelation(
relation,
expectedOutputAttributes.map(_.map(_.newInstance())),
metastoreTableIdentifier).asInstanceOf[this.type]
catalogTable).asInstanceOf[this.type]
}

override def refresh(): Unit = relation match {
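
The effect of the new statistics override, sketched with hypothetical stand-ins: catalog-level statistics are preferred when present, but sizeInBytes always reflects the relation's current size, and the pre-patch behavior is kept when no statistics were saved.

```scala
case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None) // stand-in for catalyst Statistics

def effectiveStats(catalogStats: Option[Stats], relationSizeInBytes: BigInt): Stats =
  catalogStats
    .map(_.copy(sizeInBytes = relationSizeInBytes))       // keep rowCount, refresh the size estimate
    .getOrElse(Stats(sizeInBytes = relationSizeInBytes))  // previous behavior when no stats exist

// effectiveStats(Some(Stats(BigInt(999), Some(BigInt(100)))), BigInt(4096)) == Stats(4096, Some(100))
// effectiveStats(None, BigInt(4096))                                        == Stats(4096, None)
```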
@@ -252,11 +252,11 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
case relation: CatalogRelation =>
val metadata = relation.catalogTable
preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
case LogicalRelation(h: HadoopFsRelation, _, identifier) =>
val tblName = identifier.map(_.quotedString).getOrElse("unknown")
case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, h.partitionSchema.map(_.name))
case LogicalRelation(_: InsertableRelation, _, identifier) =>
val tblName = identifier.map(_.quotedString).getOrElse("unknown")
case LogicalRelation(_: InsertableRelation, _, catalogTable) =>
val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, Nil)
case other => i
}
@@ -192,7 +192,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
* Right now, it only supports catalog tables and it only updates the size of a catalog table
* in the external catalog.
*/
def analyze(tableName: String): Unit = {
AnalyzeTableCommand(tableName).run(sparkSession)
def analyze(tableName: String, noscan: Boolean = true): Unit = {
AnalyzeTableCommand(tableName, noscan).run(sparkSession)
}
}
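
Callers of this internal helper can now request a full scan through the same entry point; a hedged sketch, assuming code with access to the session's SessionState:

```scala
// Equivalent to "ANALYZE TABLE src COMPUTE STATISTICS": full scan, records rowCount.
// Pass noscan = true (the default) to record only the size.
spark.sessionState.analyze("src", noscan = false)
```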
26 changes: 26 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Join, LocalLimit}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

@@ -75,4 +76,29 @@ class StatisticsSuite extends QueryTest with SharedSQLContext {
}
}

test("test table-level statistics for data source table created in InMemoryCatalog") {
def checkTableStats(tableName: String, expectedRowCount: Option[BigInt]): Unit = {
val df = sql(s"SELECT * FROM $tableName")
val relations = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
assert(rel.catalogTable.isDefined)
assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
rel
}
assert(relations.size === 1)
}

val tableName = "tbl"
withTable(tableName) {
sql(s"CREATE TABLE $tableName(i INT, j STRING) USING parquet")
Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")

// noscan won't count the number of rows
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
checkTableStats(tableName, expectedRowCount = None)

// without noscan, we count the number of rows
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
checkTableStats(tableName, expectedRowCount = Some(2))
}
}
}