[SPARK-27322][SQL] DataSourceV2 table relation
## What changes were proposed in this pull request?

Support multi-catalog identifiers (catalog.db.table) in the following SELECT code paths (example below):

- SELECT * FROM catalog.db.tbl
- TABLE catalog.db.tbl
- JOIN or UNION tables from different catalogs
- SparkSession.table("catalog.db.tbl")
- CTE relation
- View text
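
For example, with a v2 catalog registered (the `testcat`/`testcat2` names and the `spark.sql.catalog.*` registration below are illustrative, not part of this patch):

```scala
// Assumes spark.sql.catalog.testcat points at a TableCatalog implementation.
spark.sql("SELECT * FROM testcat.db.tbl")   // SELECT with a catalog-qualified name
spark.sql("TABLE testcat.db.tbl")           // TABLE statement
spark.sql(
  """SELECT * FROM testcat.db.t1
    |UNION ALL
    |SELECT * FROM testcat2.db.t2""".stripMargin)  // tables from different catalogs
spark.table("testcat.db.tbl")               // SparkSession.table
```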

## How was this patch tested?

New unit tests.
All existing unit tests in catalyst and sql core.

Closes apache#24741 from jzhuge/SPARK-27322-pr.

Authored-by: John Zhuge <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
jzhuge authored and cloud-fan committed Jun 13, 2019
1 parent ddf4a50 commit abe370f
Showing 24 changed files with 213 additions and 55 deletions.
@@ -407,7 +407,7 @@ queryTerm
queryPrimary
: querySpecification #queryPrimaryDefault
| fromStatement #fromStmt
- | TABLE tableIdentifier #table
+ | TABLE multipartIdentifier #table
| inlineTable #inlineTableDefault1
| '(' queryNoWith ')' #subquery
;
@@ -579,7 +579,7 @@ identifierComment
;

relationPrimary
- : tableIdentifier sample? tableAlias #tableName
+ : multipartIdentifier sample? tableAlias #tableName
| '(' queryNoWith ')' sample? tableAlias #aliasedQuery
| '(' relation ')' sample? tableAlias #aliasedRelation
| inlineTable #inlineTableDefault2
@@ -22,11 +22,15 @@ import java.util.Collections

import scala.collection.JavaConverters._

- import org.apache.spark.sql.catalog.v2.TableChange
+ import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange}
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
+ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+ import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.{StructField, StructType}

object CatalogV2Util {
+ import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
/**
* Apply properties changes to a map and return the result.
*/
@@ -149,4 +153,11 @@ object CatalogV2Util

new StructType(newFields)
}

+ def loadTable(catalog: CatalogPlugin, ident: Identifier): Option[Table] =
+   try {
+     Option(catalog.asTableCatalog.loadTable(ident))
+   } catch {
+     case _: NoSuchTableException => None
+   }
}
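
The new `loadTable` helper turns the catalog's "missing table" exception into `None`, letting callers (such as the `ResolveTables` rule below) fall back instead of failing. A minimal usage sketch, assuming a `CatalogPlugin` instance is already in hand:

```scala
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier}
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util

// Hypothetical caller: report a table's schema, or a fallback message.
def describeTable(catalog: CatalogPlugin, name: String): String =
  CatalogV2Util.loadTable(catalog, Identifier.of(Array("db"), name)) match {
    case Some(table) => s"$name: ${table.schema().simpleString}"
    case None        => s"$name: not found"
  }
```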
@@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
+ import java.util.Objects;
import java.util.Set;

/**
@@ -178,4 +179,21 @@ public double getDouble(String key, double defaultValue) {
public Map<String, String> asCaseSensitiveMap() {
return Collections.unmodifiableMap(original);
}

+ @Override
+ public boolean equals(Object o) {
+   if (this == o) {
+     return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+     return false;
+   }
+   CaseInsensitiveStringMap that = (CaseInsensitiveStringMap) o;
+   return delegate.equals(that.delegate);
+ }
+
+ @Override
+ public int hashCode() {
+   return Objects.hash(delegate);
+ }
}
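
`equals` and `hashCode` compare the lower-cased `delegate` map rather than `original`, so two maps that differ only in key casing are equal — presumably so that `DataSourceV2Relation`s built with equivalent options compare equal during analysis. Expected behavior, sketched:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val a = new CaseInsensitiveStringMap(Map("Path" -> "/tmp/t").asJava)
val b = new CaseInsensitiveStringMap(Map("path" -> "/tmp/t").asJava)
assert(a.equals(b) && a.hashCode == b.hashCode)  // keys compared case-insensitively
```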
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.util.toPrettySQL
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@@ -162,6 +163,7 @@ class Analyzer(
new SubstituteUnresolvedOrdinals(conf)),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions ::
+ ResolveTables ::
ResolveRelations ::
ResolveReferences ::
ResolveCreateNamedStruct ::
@@ -226,7 +228,7 @@

def substituteCTE(plan: LogicalPlan, cteName: String, ctePlan: LogicalPlan): LogicalPlan = {
plan resolveOperatorsUp {
- case UnresolvedRelation(TableIdentifier(table, None)) if resolver(cteName, table) =>
+ case UnresolvedRelation(Seq(table)) if resolver(cteName, table) =>
ctePlan
case u: UnresolvedRelation =>
u
@@ -657,6 +659,20 @@
}
}

+ /**
+  * Resolve table relations with concrete relations from v2 catalog.
+  *
+  * [[ResolveRelations]] still resolves v1 tables.
+  */
+ object ResolveTables extends Rule[LogicalPlan] {
+   import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._
+
+   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+     case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) =>
+       loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
+   }
+ }
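
`CatalogObjectIdentifier` here is the extractor from the analyzer's catalog-lookup support: it splits a multipart name into an optional `CatalogPlugin` and an `Identifier`. Only names whose head resolves to a registered v2 catalog match `Some(catalogPlugin)`; everything else falls through unchanged to `ResolveRelations`. A sketch of the dispatch (catalog name hypothetical):

```scala
// Seq("testcat", "db", "tbl")  => (Some(testcat), Identifier.of(Array("db"), "tbl"))
//   -> resolved here to a DataSourceV2Relation, if loadTable finds the table
// Seq("db", "tbl")             => no registered catalog named "db"
//   -> left unresolved here; ResolveRelations then tries the v1 session catalog
```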

/**
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
@@ -689,10 +705,15 @@
// Note this is compatible with the views defined by older versions of Spark(before 2.2), which
// have empty defaultDatabase and all the relations in viewText have database part defined.
def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
- case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) =>
+ case u @ UnresolvedRelation(AsTableIdentifier(ident)) if !isRunningDirectlyOnFiles(ident) =>
val defaultDatabase = AnalysisContext.get.defaultDatabase
- val foundRelation = lookupTableFromCatalog(u, defaultDatabase)
- resolveRelation(foundRelation)
+ val foundRelation = lookupTableFromCatalog(ident, u, defaultDatabase)
+ if (foundRelation != u) {
+   resolveRelation(foundRelation)
+ } else {
+   u
+ }

// The view's child should be a logical plan parsed from the `desc.viewText`, the variable
// `viewText` should be defined, or else we throw an error on the generation of the View
// operator.
@@ -715,8 +736,9 @@
}

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
- case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
-   EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
+ case i @ InsertIntoTable(u @ UnresolvedRelation(AsTableIdentifier(ident)), _, child, _, _)
+     if child.resolved =>
+   EliminateSubqueryAliases(lookupTableFromCatalog(ident, u)) match {
case v: View =>
u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
case other => i.copy(table = other)
@@ -731,20 +753,16 @@
// and the default database is only used to look up a view);
// 3. Use the currentDb of the SessionCatalog.
private def lookupTableFromCatalog(
+ tableIdentifier: TableIdentifier,
u: UnresolvedRelation,
defaultDatabase: Option[String] = None): LogicalPlan = {
- val tableIdentWithDb = u.tableIdentifier.copy(
-   database = u.tableIdentifier.database.orElse(defaultDatabase))
+ val tableIdentWithDb = tableIdentifier.copy(
+   database = tableIdentifier.database.orElse(defaultDatabase))
try {
catalog.lookupRelation(tableIdentWithDb)
} catch {
- case e: NoSuchTableException =>
-   u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}", e)
- // If the database is defined and that database is not found, throw an AnalysisException.
- // Note that if the database is not defined, it is possible we are looking up a temp view.
- case e: NoSuchDatabaseException =>
-   u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " +
-     s"database ${e.db} doesn't exist.", e)
+ case _: NoSuchTableException | _: NoSuchDatabaseException =>
+   u
}
}

@@ -90,7 +90,7 @@ trait CheckAnalysis extends PredicateHelper {
case p if p.analyzed => // Skip already analyzed sub-plans

case u: UnresolvedRelation =>
u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")
u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

case operator: LogicalPlan =>
// Check argument data types of higher-order functions downwards first.
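
Because `lookupTableFromCatalog` now returns the `UnresolvedRelation` unchanged on `NoSuchTableException`/`NoSuchDatabaseException` (above), the "not found" error surfaces here instead, once, with the fully quoted multipart name. The expected failure mode, sketched as a ScalaTest-style snippet (names hypothetical):

```scala
import org.apache.spark.sql.AnalysisException

// The relation survives analysis unresolved; CheckAnalysis raises the error.
val e = intercept[AnalysisException] {
  spark.sql("SELECT * FROM testcat.db.missing_table")
}
assert(e.getMessage.contains("Table or view not found: testcat.db.missing_table"))
```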
@@ -71,18 +71,20 @@ object ResolveHints {

val newNode = CurrentOrigin.withOrigin(plan.origin) {
plan match {
- case ResolvedHint(u: UnresolvedRelation, hint)
-   if relations.exists(resolver(_, u.tableIdentifier.table)) =>
-   relations.remove(u.tableIdentifier.table)
+ case ResolvedHint(u @ UnresolvedRelation(ident), hint)
+     if relations.exists(resolver(_, ident.last)) =>
+   relations.remove(ident.last)
ResolvedHint(u, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))

case ResolvedHint(r: SubqueryAlias, hint)
if relations.exists(resolver(_, r.alias)) =>
relations.remove(r.alias)
ResolvedHint(r, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))

- case u: UnresolvedRelation if relations.exists(resolver(_, u.tableIdentifier.table)) =>
-   relations.remove(u.tableIdentifier.table)
+ case u @ UnresolvedRelation(ident) if relations.exists(resolver(_, ident.last)) =>
+   relations.remove(ident.last)
ResolvedHint(plan, createHintInfo(hintName))

case r: SubqueryAlias if relations.exists(resolver(_, r.alias)) =>
relations.remove(r.alias)
ResolvedHint(plan, createHintInfo(hintName))
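
Hint resolution now matches on the last name part, so an unqualified name in a hint still attaches to a fully qualified relation (catalog name illustrative):

```scala
// Seq("testcat", "db", "tab").last == "tab", so BROADCAST(tab) matches below.
spark.sql("SELECT /*+ BROADCAST(tab) */ * FROM testcat.db.tab")
```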
@@ -38,19 +38,24 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String
/**
* Holds the name of a relation that has yet to be looked up in a catalog.
*
- * @param tableIdentifier table name
+ * @param multipartIdentifier table name
*/
- case class UnresolvedRelation(tableIdentifier: TableIdentifier)
-   extends LeafNode {
+ case class UnresolvedRelation(multipartIdentifier: Seq[String]) extends LeafNode {
+   import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

/** Returns a `.` separated name for this relation. */
- def tableName: String = tableIdentifier.unquotedString
+ def tableName: String = multipartIdentifier.quoted

override def output: Seq[Attribute] = Nil

override lazy val resolved = false
}

+ object UnresolvedRelation {
+   def apply(tableIdentifier: TableIdentifier): UnresolvedRelation =
+     UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table)
+ }
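
`UnresolvedRelation` now carries a raw `Seq[String]`, and the companion `apply` keeps existing `TableIdentifier` call sites compiling by flattening the optional database part. For instance:

```scala
UnresolvedRelation(Seq("testcat", "db", "tbl")).tableName  // "testcat.db.tbl"
UnresolvedRelation(TableIdentifier("tbl", Some("db")))     // same as UnresolvedRelation(Seq("db", "tbl"))
UnresolvedRelation(TableIdentifier("tbl"))                 // same as UnresolvedRelation(Seq("tbl"))
```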

/**
* An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into
* a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]].
@@ -295,10 +295,7 @@ package object dsl {
object expressions extends ExpressionConversions // scalastyle:ignore

object plans { // scalastyle:ignore
- def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref))
-
- def table(db: String, ref: String): LogicalPlan =
-   UnresolvedRelation(TableIdentifier(ref, Option(db)))
+ def table(parts: String*): LogicalPlan = UnresolvedRelation(parts)

implicit class DslLogicalPlan(val logicalPlan: LogicalPlan) {
def select(exprs: Expression*): LogicalPlan = {
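
The two fixed-arity `table` helpers in the test DSL collapse into a single varargs overload, so test plans can name relations at any depth:

```scala
import org.apache.spark.sql.catalyst.dsl.plans._

table("tbl")                   // was table(ref)
table("db", "tbl")             // was table(db, ref)
table("testcat", "db", "tbl")  // three-part v2 names now work too
```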
@@ -898,14 +898,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* }}}
*/
override def visitTable(ctx: TableContext): LogicalPlan = withOrigin(ctx) {
- UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier))
+ UnresolvedRelation(visitMultipartIdentifier(ctx.multipartIdentifier))
}

/**
* Create an aliased table reference. This is typically used in FROM clauses.
*/
override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
- val tableId = visitTableIdentifier(ctx.tableIdentifier)
+ val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId))
table.optionalMap(ctx.sample)(withSample)
}
@@ -104,6 +104,8 @@ object DataSourceV2Relation {
DataSourceV2Relation(table, output, options)
}

+ def create(table: Table): DataSourceV2Relation = create(table, CaseInsensitiveStringMap.empty)

/**
* This is used to transform data source v2 statistics to logical.Statistics.
*/
@@ -42,6 +42,14 @@ class PlanParserSuite extends AnalysisTest {
private def intercept(sqlCommand: String, messages: String*): Unit =
interceptParseException(parsePlan)(sqlCommand, messages: _*)

+ private def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = {
+   val ctes = namedPlans.map {
+     case (name, cte) =>
+       name -> SubqueryAlias(name, cte)
+   }
+   With(plan, ctes)
+ }

test("case insensitive") {
val plan = table("a").select(star())
assertEqual("sELEct * FroM a", plan)
@@ -74,13 +82,6 @@
}

test("common table expressions") {
- def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = {
-   val ctes = namedPlans.map {
-     case (name, cte) =>
-       name -> SubqueryAlias(name, cte)
-   }
-   With(plan, ctes)
- }
assertEqual(
"with cte1 as (select * from a) select * from cte1",
cte(table("cte1").select(star()), "cte1" -> table("a").select(star())))
@@ -801,4 +802,20 @@
}.getMessage
assert(m2.contains("mismatched input 'IN' expecting"))
}

test("relation in v2 catalog") {
assertEqual("TABLE testcat.db.tab", table("testcat", "db", "tab"))
assertEqual("SELECT * FROM testcat.db.tab", table("testcat", "db", "tab").select(star()))

assertEqual(
"""
|WITH cte1 AS (SELECT * FROM testcat.db.tab)
|SELECT * FROM cte1
""".stripMargin,
cte(table("cte1").select(star()), "cte1" -> table("testcat", "db", "tab").select(star())))

assertEqual(
"SELECT /*+ BROADCAST(tab) */ * FROM testcat.db.tab",
table("testcat", "db", "tab").select(star()).hint("BROADCAST", $"tab"))
}
}
@@ -624,7 +624,11 @@
* @since 2.0.0
*/
def table(tableName: String): DataFrame = {
- table(sessionState.sqlParser.parseTableIdentifier(tableName))
+ table(sessionState.sqlParser.parseMultipartIdentifier(tableName))
}
+
+ private[sql] def table(multipartIdentifier: Seq[String]): DataFrame = {
+   Dataset.ofRows(self, UnresolvedRelation(multipartIdentifier))
+ }

private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
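
`SparkSession.table` now parses its argument as a multipart identifier and hands resolution to the analyzer, which is what makes the `SparkSession.table("catalog.db.tbl")` path from the description work (catalog name illustrative):

```scala
// Both produce the same UnresolvedRelation(Seq("testcat", "db", "tbl")):
val df1 = spark.table("testcat.db.tbl")
val df2 = spark.sql("TABLE testcat.db.tbl")
```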
@@ -181,6 +181,8 @@ case class CreateViewCommand(
* Permanent views are not allowed to reference temp objects, including temp function and views
*/
private def verifyTemporaryObjectsNotExists(sparkSession: SparkSession): Unit = {
+ import sparkSession.sessionState.analyzer.AsTableIdentifier

if (!isTemporary) {
// This func traverses the unresolved plan `child`. Below are the reasons:
// 1) Analyzer replaces unresolved temporary views by a SubqueryAlias with the corresponding
@@ -190,10 +192,11 @@
// package (e.g., HiveGenericUDF).
child.collect {
// Disallow creating permanent views based on temporary views.
- case s: UnresolvedRelation
-   if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) =>
+ case UnresolvedRelation(AsTableIdentifier(ident))
+     if sparkSession.sessionState.catalog.isTemporaryTable(ident) =>
// temporary views are only stored in the session catalog
throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
s"referencing a temporary view ${s.tableIdentifier}")
s"referencing a temporary view $ident")
case other if !other.resolved => other.expressions.flatMap(_.collect {
// Disallow creating permanent views based on temporary UDFs.
case e: UnresolvedFunction
@@ -37,16 +37,16 @@ import org.apache.spark.sql.util.SchemaUtils
*/
class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
private def maybeSQLFile(u: UnresolvedRelation): Boolean = {
- sparkSession.sessionState.conf.runSQLonFile && u.tableIdentifier.database.isDefined
+ sparkSession.sessionState.conf.runSQLonFile && u.multipartIdentifier.size == 2
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case u: UnresolvedRelation if maybeSQLFile(u) =>
try {
val dataSource = DataSource(
sparkSession,
-   paths = u.tableIdentifier.table :: Nil,
-   className = u.tableIdentifier.database.get)
+   paths = u.multipartIdentifier.last :: Nil,
+   className = u.multipartIdentifier.head)

// `dataSource.providingClass` may throw ClassNotFoundException, then the outer try-catch
// will catch it and return the original plan, so that the analyzer can report table not
@@ -55,7 +55,7 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
if (!isFileFormat ||
dataSource.className.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Unsupported data source type for direct query on files: " +
s"${u.tableIdentifier.database.get}")
s"${dataSource.className}")
}
LogicalRelation(dataSource.resolveRelation())
} catch {
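
`maybeSQLFile` used to key off a defined "database" part; with multipart names, a direct query on files is any two-part name whose head is the data source and whose last part is the path. For example:

```scala
// Direct query on files: multipartIdentifier == Seq("parquet", "/path/to/data")
spark.sql("SELECT * FROM parquet.`/path/to/data`")
```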