Skip to content

Commit

Permalink
[SPARK-17717][SQL] Add exist/find methods to Catalog.
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
The current user facing catalog does not implement methods for checking object existence or finding objects. You could theoretically do this using the `list*` commands, but this is rather cumbersome and can actually be costly when there are many objects. This PR adds `exists*` and `find*` methods for Databases, Tables and Functions.

## How was this patch tested?
Added tests to `org.apache.spark.sql.internal.CatalogSuite`

Author: Herman van Hovell <[email protected]>

Closes apache#15301 from hvanhovell/SPARK-17717.
  • Loading branch information
hvanhovell authored and rxin committed Sep 30, 2016
1 parent 2f73956 commit 74ac1c4
Show file tree
Hide file tree
Showing 4 changed files with 339 additions and 25 deletions.
11 changes: 10 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,16 @@ object MimaExcludes {
// [SPARK-16967] Move Mesos to Module
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"),
// [SPARK-16240] ML persistence backward compatibility for LDA
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$"),
// [SPARK-17717] Add Find and Exists method to Catalog.
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findDatabase"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findTable"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findFunction"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findColumn"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.columnExists")
)
}

Expand Down
83 changes: 83 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,89 @@ abstract class Catalog {
@throws[AnalysisException]("database or table does not exist")
def listColumns(dbName: String, tableName: String): Dataset[Column]

/**
 * Find the database with the specified name. This throws an AnalysisException when the database
 * cannot be found.
 *
 * @param dbName name of the database to look up
 * @since 2.1.0
 */
@throws[AnalysisException]("database does not exist")
def findDatabase(dbName: String): Database

/**
 * Find the table with the specified name. This table can be a temporary table or a table in the
 * current database. This throws an AnalysisException when the table cannot be found.
 *
 * @param tableName unqualified name of the table to look up
 * @since 2.1.0
 */
@throws[AnalysisException]("table does not exist")
def findTable(tableName: String): Table

/**
 * Find the table with the specified name in the specified database. This throws an
 * AnalysisException when the table cannot be found.
 *
 * @param dbName database in which to look for the table
 * @param tableName name of the table to look up
 * @since 2.1.0
 */
@throws[AnalysisException]("database or table does not exist")
def findTable(dbName: String, tableName: String): Table

/**
 * Find the function with the specified name. This function can be a temporary function or a
 * function in the current database. This throws an AnalysisException when the function cannot
 * be found.
 *
 * @param functionName unqualified name of the function to look up
 * @since 2.1.0
 */
@throws[AnalysisException]("function does not exist")
def findFunction(functionName: String): Function

/**
 * Find the function with the specified name in the specified database. This throws an
 * AnalysisException when the function cannot be found.
 *
 * @param dbName database in which to look for the function
 * @param functionName name of the function to look up
 * @since 2.1.0
 */
@throws[AnalysisException]("database or function does not exist")
def findFunction(dbName: String, functionName: String): Function

/**
 * Check if the database with the specified name exists.
 *
 * @param dbName name of the database to check
 * @since 2.1.0
 */
def databaseExists(dbName: String): Boolean

/**
 * Check if the table with the specified name exists. This can either be a temporary table or a
 * table in the current database.
 *
 * @param tableName unqualified name of the table to check
 * @since 2.1.0
 */
def tableExists(tableName: String): Boolean

/**
 * Check if the table with the specified name exists in the specified database.
 *
 * @param dbName database in which to look for the table
 * @param tableName name of the table to check
 * @since 2.1.0
 */
def tableExists(dbName: String, tableName: String): Boolean

/**
 * Check if the function with the specified name exists. This can either be a temporary function
 * or a function in the current database.
 *
 * @param functionName unqualified name of the function to check
 * @since 2.1.0
 */
def functionExists(functionName: String): Boolean

/**
 * Check if the function with the specified name exists in the specified database.
 *
 * @param dbName database in which to look for the function
 * @param functionName name of the function to check
 * @since 2.1.0
 */
def functionExists(dbName: String, functionName: String): Boolean

/**
* :: Experimental ::
* Creates an external table from the given path and returns the corresponding DataFrame.
Expand Down
152 changes: 128 additions & 24 deletions sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -69,15 +69,18 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
override def listDatabases(): Dataset[Database] = {
val databases = sessionCatalog.listDatabases().map { dbName =>
val metadata = sessionCatalog.getDatabaseMetadata(dbName)
new Database(
name = metadata.name,
description = metadata.description,
locationUri = metadata.locationUri)
makeDatabase(sessionCatalog.getDatabaseMetadata(dbName))
}
CatalogImpl.makeDataset(databases, sparkSession)
}

/** Convert session-catalog database metadata into the user-facing [[Database]] class. */
private def makeDatabase(metadata: CatalogDatabase): Database = {
  new Database(
    name = metadata.name,
    description = metadata.description,
    locationUri = metadata.locationUri
  )
}

/**
* Returns a list of tables in the current database.
* This includes all temporary tables.
Expand All @@ -94,18 +97,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
override def listTables(dbName: String): Dataset[Table] = {
requireDatabaseExists(dbName)
val tables = sessionCatalog.listTables(dbName).map { tableIdent =>
val isTemp = tableIdent.database.isEmpty
val metadata = if (isTemp) None else Some(sessionCatalog.getTableMetadata(tableIdent))
new Table(
name = tableIdent.identifier,
database = metadata.flatMap(_.identifier.database).orNull,
description = metadata.flatMap(_.comment).orNull,
tableType = metadata.map(_.tableType.name).getOrElse("TEMPORARY"),
isTemporary = isTemp)
makeTable(tableIdent, tableIdent.database.isEmpty)
}
CatalogImpl.makeDataset(tables, sparkSession)
}

/**
 * Convert a table identifier into the user-facing [[Table]] class. Catalog metadata is only
 * looked up for non-temporary tables; a temporary table reports the "TEMPORARY" table type
 * and carries no database, description or catalog-backed type.
 */
private def makeTable(tableIdent: TableIdentifier, isTemp: Boolean): Table = {
  val catalogMeta = if (!isTemp) Some(sessionCatalog.getTableMetadata(tableIdent)) else None
  new Table(
    name = tableIdent.identifier,
    database = catalogMeta.flatMap(_.identifier.database).orNull,
    description = catalogMeta.flatMap(_.comment).orNull,
    tableType = catalogMeta.map(_.tableType.name).getOrElse("TEMPORARY"),
    isTemporary = isTemp)
}

/**
* Returns a list of functions registered in the current database.
* This includes all temporary functions
Expand All @@ -121,18 +127,22 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
@throws[AnalysisException]("database does not exist")
override def listFunctions(dbName: String): Dataset[Function] = {
requireDatabaseExists(dbName)
val functions = sessionCatalog.listFunctions(dbName).map { case (funcIdent, _) =>
val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
new Function(
name = funcIdent.identifier,
database = funcIdent.database.orNull,
description = null, // for now, this is always undefined
className = metadata.getClassName,
isTemporary = funcIdent.database.isEmpty)
val functions = sessionCatalog.listFunctions(dbName).map { case (functIdent, _) =>
makeFunction(functIdent)
}
CatalogImpl.makeDataset(functions, sparkSession)
}

/** Convert a function identifier into the user-facing [[Function]] class. */
private def makeFunction(funcIdent: FunctionIdentifier): Function = {
  val info = sessionCatalog.lookupFunctionInfo(funcIdent)
  new Function(
    name = funcIdent.identifier,
    database = funcIdent.database.orNull,
    description = null, // for now, this is always undefined
    className = info.getClassName,
    isTemporary = funcIdent.database.isEmpty)
}

/**
* Returns a list of columns for the given table in the current database.
*/
Expand Down Expand Up @@ -167,6 +177,100 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
CatalogImpl.makeDataset(columns, sparkSession)
}

/**
 * Find the database with the specified name. This throws an [[AnalysisException]] when no
 * [[Database]] can be found.
 */
override def findDatabase(dbName: String): Database = {
  // Guard clause: fail fast when the database is unknown to the session catalog.
  if (!sessionCatalog.databaseExists(dbName)) {
    throw new AnalysisException(s"The specified database $dbName does not exist.")
  }
  makeDatabase(sessionCatalog.getDatabaseMetadata(dbName))
}

/**
 * Find the table with the specified name. This table can be a temporary table or a table in the
 * current database. This throws an [[AnalysisException]] when no [[Table]] can be found.
 */
// Delegates to the two-argument overload; a null database means "resolve without a qualifier".
override def findTable(tableName: String): Table = findTable(null, tableName)

/**
 * Find the table with the specified name in the specified database. This throws an
 * [[AnalysisException]] when no [[Table]] can be found.
 */
override def findTable(dbName: String, tableName: String): Table = {
  val ident = TableIdentifier(tableName, Option(dbName))
  // A temporary table shadows catalog tables, so check for it first.
  val isTemp = sessionCatalog.isTemporaryTable(ident)
  if (!isTemp && !sessionCatalog.tableExists(ident)) {
    throw new AnalysisException(s"The specified table $ident does not exist.")
  }
  makeTable(ident, isTemp)
}

/**
 * Find the function with the specified name. This function can be a temporary function or a
 * function in the current database. This throws an [[AnalysisException]] when no [[Function]]
 * can be found.
 */
// Delegates to the two-argument overload; a null database means "resolve without a qualifier".
override def findFunction(functionName: String): Function = findFunction(null, functionName)

/**
 * Find the function with the specified name in the specified database. This throws an
 * [[AnalysisException]] when no [[Function]] can be found.
 *
 * Note: the previous scaladoc incorrectly claimed this "returns [[None]]" — the method never
 * returns an Option; it throws when the function does not exist.
 */
override def findFunction(dbName: String, functionName: String): Function = {
  // Option(dbName) maps a null database name to None, i.e. "resolve without a qualifier".
  val functionIdent = FunctionIdentifier(functionName, Option(dbName))
  if (sessionCatalog.functionExists(functionIdent)) {
    makeFunction(functionIdent)
  } else {
    throw new AnalysisException(s"The specified function $functionIdent does not exist.")
  }
}

/**
 * Check if the database with the specified name exists.
 */
// Pure delegation to the session catalog; expression body instead of a block.
override def databaseExists(dbName: String): Boolean = sessionCatalog.databaseExists(dbName)

/**
 * Check if the table with the specified name exists. This can either be a temporary table or a
 * table in the current database.
 */
// Delegates to the two-argument overload; a null database means "resolve without a qualifier".
override def tableExists(tableName: String): Boolean = tableExists(null, tableName)

/**
 * Check if the table with the specified name exists in the specified database.
 */
override def tableExists(dbName: String, tableName: String): Boolean = {
  val ident = TableIdentifier(tableName, Option(dbName))
  // Temporary tables are not tracked by tableExists, so test both.
  sessionCatalog.isTemporaryTable(ident) || sessionCatalog.tableExists(ident)
}

/**
 * Check if the function with the specified name exists. This can either be a temporary function
 * or a function in the current database.
 */
// Delegates to the two-argument overload; a null database means "resolve without a qualifier".
override def functionExists(functionName: String): Boolean = functionExists(null, functionName)

/**
 * Check if the function with the specified name exists in the specified database.
 */
override def functionExists(dbName: String, functionName: String): Boolean = {
  // Option(dbName) maps a null database name to None, i.e. "resolve without a qualifier".
  val ident = FunctionIdentifier(functionName, Option(dbName))
  sessionCatalog.functionExists(ident)
}

/**
* :: Experimental ::
* Creates an external table from the given path and returns the corresponding DataFrame.
Expand Down
Loading

0 comments on commit 74ac1c4

Please sign in to comment.