From abe370f97132a835e8a7cf15d4ecbffb0d49eafd Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Thu, 13 Jun 2019 13:48:40 +0800 Subject: [PATCH] [SPARK-27322][SQL] DataSourceV2 table relation ## What changes were proposed in this pull request? Support multi-catalog in the following SELECT code paths: - SELECT * FROM catalog.db.tbl - TABLE catalog.db.tbl - JOIN or UNION tables from different catalogs - SparkSession.table("catalog.db.tbl") - CTE relation - View text ## How was this patch tested? New unit tests. All existing unit tests in catalyst and sql core. Closes #24741 from jzhuge/SPARK-27322-pr. Authored-by: John Zhuge Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/parser/SqlBase.g4 | 4 +- .../sql/catalog/v2/utils/CatalogV2Util.scala | 13 +++- .../sql/util/CaseInsensitiveStringMap.java | 18 ++++++ .../sql/catalyst/analysis/Analyzer.scala | 48 +++++++++----- .../sql/catalyst/analysis/CheckAnalysis.scala | 2 +- .../sql/catalyst/analysis/ResolveHints.scala | 12 ++-- .../sql/catalyst/analysis/unresolved.scala | 13 ++-- .../spark/sql/catalyst/dsl/package.scala | 5 +- .../sql/catalyst/parser/AstBuilder.scala | 4 +- .../v2/DataSourceV2Implicits.scala | 0 .../datasources/v2/DataSourceV2Relation.scala | 2 + .../sql/catalyst/parser/PlanParserSuite.scala | 31 +++++++--- .../org/apache/spark/sql/SparkSession.scala | 6 +- .../spark/sql/execution/command/views.scala | 9 ++- .../sql/execution/datasources/rules.scala | 8 +-- .../internal/BaseSessionStateBuilder.scala | 3 + .../org/apache/spark/sql/DataFrameSuite.scala | 2 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 2 +- .../sql/sources/v2/DataSourceV2SQLSuite.scala | 62 +++++++++++++++++++ .../sql/util/DataFrameCallbackSuite.scala | 2 +- .../sql/hive/HiveSessionStateBuilder.scala | 3 + .../spark/sql/hive/HiveStrategies.scala | 5 +- .../apache/spark/sql/hive/InsertSuite.scala | 12 ++++ .../apache/spark/sql/hive/test/TestHive.scala | 2 +- 24 files changed, 213 insertions(+), 55 deletions(-) rename sql/{core => catalyst}/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala (100%) rename sql/{core => catalyst}/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala (97%) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 2aa80bbd33942..d58fb7cb20bfc 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -407,7 +407,7 @@ queryTerm queryPrimary : querySpecification #queryPrimaryDefault | fromStatement #fromStmt - | TABLE tableIdentifier #table + | TABLE multipartIdentifier #table | inlineTable #inlineTableDefault1 | '(' queryNoWith ')' #subquery ; @@ -579,7 +579,7 @@ identifierComment ; relationPrimary - : tableIdentifier sample? tableAlias #tableName + : multipartIdentifier sample? tableAlias #tableName | '(' queryNoWith ')' sample? tableAlias #aliasedQuery | '(' relation ')' sample? 
tableAlias #aliasedRelation | inlineTable #inlineTableDefault2 diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala index a00bcab602e8c..6de1ef5660e55 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala @@ -22,11 +22,15 @@ import java.util.Collections import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalog.v2.TableChange +import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange} import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType} +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.sources.v2.Table import org.apache.spark.sql.types.{StructField, StructType} object CatalogV2Util { + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + /** * Apply properties changes to a map and return the result. */ @@ -149,4 +153,11 @@ object CatalogV2Util { new StructType(newFields) } + + def loadTable(catalog: CatalogPlugin, ident: Identifier): Option[Table] = + try { + Option(catalog.asTableCatalog.loadTable(ident)) + } catch { + case _: NoSuchTableException => None + } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java index da41346d7ce71..c344a62be40c1 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; /** @@ -178,4 +179,21 @@ public double getDouble(String key, double defaultValue) { public Map asCaseSensitiveMap() { return Collections.unmodifiableMap(original); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CaseInsensitiveStringMap that = (CaseInsensitiveStringMap) o; + return delegate.equals(that.delegate); + } + + @Override + public int hashCode() { + return Objects.hash(delegate); + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 841b85843c888..749c4094205b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.catalyst.util.toPrettySQL +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -162,6 +163,7 @@ class Analyzer( new SubstituteUnresolvedOrdinals(conf)), Batch("Resolution", fixedPoint, ResolveTableValuedFunctions :: + ResolveTables :: ResolveRelations :: ResolveReferences :: ResolveCreateNamedStruct :: @@ -226,7 +228,7 @@ class Analyzer( def 
substituteCTE(plan: LogicalPlan, cteName: String, ctePlan: LogicalPlan): LogicalPlan = { plan resolveOperatorsUp { - case UnresolvedRelation(TableIdentifier(table, None)) if resolver(cteName, table) => + case UnresolvedRelation(Seq(table)) if resolver(cteName, table) => ctePlan case u: UnresolvedRelation => u @@ -657,6 +659,20 @@ class Analyzer( } } + /** + * Resolve table relations with concrete relations from v2 catalog. + * + * [[ResolveRelations]] still resolves v1 tables. + */ + object ResolveTables extends Rule[LogicalPlan] { + import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._ + + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) => + loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u) + } + } + /** * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ @@ -689,10 +705,15 @@ class Analyzer( // Note this is compatible with the views defined by older versions of Spark(before 2.2), which // have empty defaultDatabase and all the relations in viewText have database part defined. def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match { - case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) => + case u @ UnresolvedRelation(AsTableIdentifier(ident)) if !isRunningDirectlyOnFiles(ident) => val defaultDatabase = AnalysisContext.get.defaultDatabase - val foundRelation = lookupTableFromCatalog(u, defaultDatabase) - resolveRelation(foundRelation) + val foundRelation = lookupTableFromCatalog(ident, u, defaultDatabase) + if (foundRelation != u) { + resolveRelation(foundRelation) + } else { + u + } + // The view's child should be a logical plan parsed from the `desc.viewText`, the variable // `viewText` should be defined, or else we throw an error on the generation of the View // operator. @@ -715,8 +736,9 @@ class Analyzer( } def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => - EliminateSubqueryAliases(lookupTableFromCatalog(u)) match { + case i @ InsertIntoTable(u @ UnresolvedRelation(AsTableIdentifier(ident)), _, child, _, _) + if child.resolved => + EliminateSubqueryAliases(lookupTableFromCatalog(ident, u)) match { case v: View => u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.") case other => i.copy(table = other) @@ -731,20 +753,16 @@ class Analyzer( // and the default database is only used to look up a view); // 3. Use the currentDb of the SessionCatalog. private def lookupTableFromCatalog( + tableIdentifier: TableIdentifier, u: UnresolvedRelation, defaultDatabase: Option[String] = None): LogicalPlan = { - val tableIdentWithDb = u.tableIdentifier.copy( - database = u.tableIdentifier.database.orElse(defaultDatabase)) + val tableIdentWithDb = tableIdentifier.copy( + database = tableIdentifier.database.orElse(defaultDatabase)) try { catalog.lookupRelation(tableIdentWithDb) } catch { - case e: NoSuchTableException => - u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}", e) - // If the database is defined and that database is not found, throw an AnalysisException. - // Note that if the database is not defined, it is possible we are looking up a temp view. 
- case e: NoSuchDatabaseException => - u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " + - s"database ${e.db} doesn't exist.", e) + case _: NoSuchTableException | _: NoSuchDatabaseException => + u } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index f9365cd022b16..6b9b4f4e1ba37 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -90,7 +90,7 @@ trait CheckAnalysis extends PredicateHelper { case p if p.analyzed => // Skip already analyzed sub-plans case u: UnresolvedRelation => - u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}") + u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}") case operator: LogicalPlan => // Check argument data types of higher-order functions downwards first. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index 9440a3f806b4e..539b41789b586 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -71,18 +71,20 @@ object ResolveHints { val newNode = CurrentOrigin.withOrigin(plan.origin) { plan match { - case ResolvedHint(u: UnresolvedRelation, hint) - if relations.exists(resolver(_, u.tableIdentifier.table)) => - relations.remove(u.tableIdentifier.table) + case ResolvedHint(u @ UnresolvedRelation(ident), hint) + if relations.exists(resolver(_, ident.last)) => + relations.remove(ident.last) ResolvedHint(u, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo)) + case ResolvedHint(r: SubqueryAlias, hint) if relations.exists(resolver(_, r.alias)) => relations.remove(r.alias) ResolvedHint(r, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo)) - case u: UnresolvedRelation if relations.exists(resolver(_, u.tableIdentifier.table)) => - relations.remove(u.tableIdentifier.table) + case u @ UnresolvedRelation(ident) if relations.exists(resolver(_, ident.last)) => + relations.remove(ident.last) ResolvedHint(plan, createHintInfo(hintName)) + case r: SubqueryAlias if relations.exists(resolver(_, r.alias)) => relations.remove(r.alias) ResolvedHint(plan, createHintInfo(hintName)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index d44b42134f868..b700c336e6ae8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -38,19 +38,24 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str /** * Holds the name of a relation that has yet to be looked up in a catalog. * - * @param tableIdentifier table name + * @param multipartIdentifier table name */ -case class UnresolvedRelation(tableIdentifier: TableIdentifier) - extends LeafNode { +case class UnresolvedRelation(multipartIdentifier: Seq[String]) extends LeafNode { + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ /** Returns a `.` separated name for this relation. 
*/ - def tableName: String = tableIdentifier.unquotedString + def tableName: String = multipartIdentifier.quoted override def output: Seq[Attribute] = Nil override lazy val resolved = false } +object UnresolvedRelation { + def apply(tableIdentifier: TableIdentifier): UnresolvedRelation = + UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table) +} + /** * An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into * a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 846ee3b386527..54fc1f9abb086 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -295,10 +295,7 @@ package object dsl { object expressions extends ExpressionConversions // scalastyle:ignore object plans { // scalastyle:ignore - def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref)) - - def table(db: String, ref: String): LogicalPlan = - UnresolvedRelation(TableIdentifier(ref, Option(db))) + def table(parts: String*): LogicalPlan = UnresolvedRelation(parts) implicit class DslLogicalPlan(val logicalPlan: LogicalPlan) { def select(exprs: Expression*): LogicalPlan = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index b210199bf63f0..0048088f3ede0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -898,14 +898,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * }}} */ override def visitTable(ctx: TableContext): LogicalPlan = withOrigin(ctx) { - UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier)) + UnresolvedRelation(visitMultipartIdentifier(ctx.multipartIdentifier)) } /** * Create an aliased table reference. This is typically used in FROM clauses. 
*/ override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) { - val tableId = visitTableIdentifier(ctx.tableIdentifier) + val tableId = visitMultipartIdentifier(ctx.multipartIdentifier) val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId)) table.optionalMap(ctx.sample)(withSample) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala similarity index 100% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala similarity index 97% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index fc919439d9224..9ae3dbbc45502 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -104,6 +104,8 @@ object DataSourceV2Relation { DataSourceV2Relation(table, output, options) } + def create(table: Table): DataSourceV2Relation = create(table, CaseInsensitiveStringMap.empty) + /** * This is used to transform data source v2 statistics to logical.Statistics. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index a783491d39e5d..fba2a28c3fc38 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -42,6 +42,14 @@ class PlanParserSuite extends AnalysisTest { private def intercept(sqlCommand: String, messages: String*): Unit = interceptParseException(parsePlan)(sqlCommand, messages: _*) + private def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = { + val ctes = namedPlans.map { + case (name, cte) => + name -> SubqueryAlias(name, cte) + } + With(plan, ctes) + } + test("case insensitive") { val plan = table("a").select(star()) assertEqual("sELEct * FroM a", plan) @@ -74,13 +82,6 @@ class PlanParserSuite extends AnalysisTest { } test("common table expressions") { - def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = { - val ctes = namedPlans.map { - case (name, cte) => - name -> SubqueryAlias(name, cte) - } - With(plan, ctes) - } assertEqual( "with cte1 as (select * from a) select * from cte1", cte(table("cte1").select(star()), "cte1" -> table("a").select(star()))) @@ -801,4 +802,20 @@ class PlanParserSuite extends AnalysisTest { }.getMessage assert(m2.contains("mismatched input 'IN' expecting")) } + + test("relation in v2 catalog") { + assertEqual("TABLE testcat.db.tab", table("testcat", "db", "tab")) + assertEqual("SELECT * FROM testcat.db.tab", table("testcat", "db", "tab").select(star())) + + assertEqual( + """ + |WITH cte1 AS (SELECT * FROM testcat.db.tab) + |SELECT * FROM cte1 + 
""".stripMargin, + cte(table("cte1").select(star()), "cte1" -> table("testcat", "db", "tab").select(star()))) + + assertEqual( + "SELECT /*+ BROADCAST(tab) */ * FROM testcat.db.tab", + table("testcat", "db", "tab").select(star()).hint("BROADCAST", $"tab")) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 0b5bf3f48b593..90d1b9205787b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -624,7 +624,11 @@ class SparkSession private( * @since 2.0.0 */ def table(tableName: String): DataFrame = { - table(sessionState.sqlParser.parseTableIdentifier(tableName)) + table(sessionState.sqlParser.parseMultipartIdentifier(tableName)) + } + + private[sql] def table(multipartIdentifier: Seq[String]): DataFrame = { + Dataset.ofRows(self, UnresolvedRelation(multipartIdentifier)) } private[sql] def table(tableIdent: TableIdentifier): DataFrame = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index cd34dfafd1320..4d3eb11250c3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -181,6 +181,8 @@ case class CreateViewCommand( * Permanent views are not allowed to reference temp objects, including temp function and views */ private def verifyTemporaryObjectsNotExists(sparkSession: SparkSession): Unit = { + import sparkSession.sessionState.analyzer.AsTableIdentifier + if (!isTemporary) { // This func traverses the unresolved plan `child`. Below are the reasons: // 1) Analyzer replaces unresolved temporary views by a SubqueryAlias with the corresponding @@ -190,10 +192,11 @@ case class CreateViewCommand( // package (e.g., HiveGenericUDF). child.collect { // Disallow creating permanent views based on temporary views. - case s: UnresolvedRelation - if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) => + case UnresolvedRelation(AsTableIdentifier(ident)) + if sparkSession.sessionState.catalog.isTemporaryTable(ident) => + // temporary views are only stored in the session catalog throw new AnalysisException(s"Not allowed to create a permanent view $name by " + - s"referencing a temporary view ${s.tableIdentifier}") + s"referencing a temporary view $ident") case other if !other.resolved => other.expressions.flatMap(_.collect { // Disallow creating permanent views based on temporary UDFs. 
case e: UnresolvedFunction diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 534e2fd0757f9..e8951bc8e7164 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.util.SchemaUtils */ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] { private def maybeSQLFile(u: UnresolvedRelation): Boolean = { - sparkSession.sessionState.conf.runSQLonFile && u.tableIdentifier.database.isDefined + sparkSession.sessionState.conf.runSQLonFile && u.multipartIdentifier.size == 2 } def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { @@ -45,8 +45,8 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] { try { val dataSource = DataSource( sparkSession, - paths = u.tableIdentifier.table :: Nil, - className = u.tableIdentifier.database.get) + paths = u.multipartIdentifier.last :: Nil, + className = u.multipartIdentifier.head) // `dataSource.providingClass` may throw ClassNotFoundException, then the outer try-catch // will catch it and return the original plan, so that the analyzer can report table not @@ -55,7 +55,7 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] { if (!isFileFormat || dataSource.className.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) { throw new AnalysisException("Unsupported data source type for direct query on files: " + - s"${u.tableIdentifier.database.get}") + s"${dataSource.className}") } LogicalRelation(dataSource.resolveRelation()) } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index b2d065274b151..8d73449c3533d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.internal import org.apache.spark.SparkConf import org.apache.spark.annotation.{Experimental, Unstable} import org.apache.spark.sql.{ExperimentalMethods, SparkSession, UDFRegistration, _} +import org.apache.spark.sql.catalog.v2.CatalogPlugin import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.optimizer.Optimizer @@ -185,6 +186,8 @@ abstract class BaseSessionStateBuilder( V2WriteSupportCheck +: V2StreamingScanSupportCheck +: customCheckRules + + override protected def lookupCatalog(name: String): CatalogPlugin = session.catalog(name) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 9554da28c9a02..d15c1f47b3d23 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2160,7 +2160,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(output.contains( """== Parsed Logical Plan == |'Project [*] - |+- 'UnresolvedRelation `tmp`""".stripMargin)) + |+- 'UnresolvedRelation [tmp]""".stripMargin)) assert(output.contains( """== Physical Plan == |*(1) Range (0, 10, step=1, 
splits=2)""".stripMargin)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index da2645ccca966..5d77309c2ed43 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1695,7 +1695,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { sql(s"select id from `org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`") } assert(e.message.contains("Table or view not found: " + - "`org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`")) + "`org.apache.spark.sql.sources.HadoopFsRelationProvider`.file_path")) e = intercept[AnalysisException] { sql(s"select id from `Jdbc`.`file_path`") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index 5b9071b59b9b0..96345e22dbd5d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -36,6 +36,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn before { spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) + spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName) spark.conf.set("spark.sql.default.catalog", "testcat") val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") @@ -47,6 +48,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn after { spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables() spark.sql("DROP TABLE source") + spark.sql("DROP TABLE source2") } test("CreateTable: use v2 plan because catalog is set") { @@ -282,4 +284,64 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } sql(s"DROP TABLE IF EXISTS testcat.db.notbl") } + + test("Relation: basic") { + val t1 = "testcat.ns1.ns2.tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source") + checkAnswer(sql(s"TABLE $t1"), spark.table("source")) + checkAnswer(sql(s"SELECT * FROM $t1"), spark.table("source")) + } + } + + test("Relation: SparkSession.table()") { + val t1 = "testcat.ns1.ns2.tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source") + checkAnswer(spark.table(s"$t1"), spark.table("source")) + } + } + + test("Relation: CTE") { + val t1 = "testcat.ns1.ns2.tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source") + checkAnswer( + sql(s""" + |WITH cte AS (SELECT * FROM $t1) + |SELECT * FROM cte + """.stripMargin), + spark.table("source")) + } + } + + test("Relation: view text") { + val t1 = "testcat.ns1.ns2.tbl" + withTable(t1) { + withView("view1") { v1: String => + sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source") + sql(s"CREATE VIEW $v1 AS SELECT * from $t1") + checkAnswer(sql(s"TABLE $v1"), spark.table("source")) + } + } + } + + test("Relation: join tables in 2 catalogs") { + val t1 = "testcat.ns1.ns2.tbl" + val t2 = "testcat2.v2tbl" + withTable(t1, t2) { + sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source") + sql(s"CREATE TABLE $t2 USING foo AS SELECT id, data FROM source2") + val df1 = spark.table("source") + val df2 = spark.table("source2") + val df_joined = 
df1.join(df2).where(df1("id") + 1 === df2("id")) + checkAnswer( + sql(s""" + |SELECT * + |FROM $t1 t1, $t2 t2 + |WHERE t1.id + 1 = t2.id + """.stripMargin), + df_joined) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index ddc5dbb148cb5..c347caef39a64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -202,7 +202,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { assert(commands(2)._1 == "insertInto") assert(commands(2)._2.isInstanceOf[InsertIntoTable]) assert(commands(2)._2.asInstanceOf[InsertIntoTable].table - .asInstanceOf[UnresolvedRelation].tableIdentifier.table == "tab") + .asInstanceOf[UnresolvedRelation].multipartIdentifier == Seq("tab")) } // exiting withTable adds commands(3) via onSuccess (drops tab) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 0e7df8e921978..b04b3f1031d73 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive import org.apache.spark.annotation.{Experimental, Unstable} import org.apache.spark.sql._ +import org.apache.spark.sql.catalog.v2.CatalogPlugin import org.apache.spark.sql.catalyst.analysis.Analyzer import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -91,6 +92,8 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session V2WriteSupportCheck +: V2StreamingScanSupportCheck +: customCheckRules + + override protected def lookupCatalog(name: String): CatalogPlugin = session.catalog(name) } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 58b711006e476..7b28e4f401aba 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -153,11 +153,12 @@ object HiveAnalysis extends Rule[LogicalPlan] { case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) - case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => + case CreateTable(tableDesc, mode, Some(query)) + if DDLUtils.isHiveTable(tableDesc) && query.resolved => CreateHiveTableAsSelectCommand(tableDesc, query, query.output.map(_.name), mode) case InsertIntoDir(isLocal, storage, provider, child, overwrite) - if DDLUtils.isHiveTable(provider) => + if DDLUtils.isHiveTable(provider) && child.resolved => val outputPath = new Path(storage.locationUri.get) if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index 48d969c5ee9ac..70307ed7e830e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -758,6 +758,18 @@ class 
InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter } } + + test("insert overwrite to dir from non-existent table") { + withTempDir { dir => + val path = dir.toURI.getPath + + val e = intercept[AnalysisException] { + sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path}' TABLE notexists") + }.getMessage + assert(e.contains("Table or view not found")) + } + } + test("SPARK-21165: FileFormatWriter should only rely on attributes from analyzed plan") { withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { withTable("tab1", "tab2") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala index 5e77cac450ac3..8cdb8dd84fb2e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -595,7 +595,7 @@ private[hive] class TestHiveQueryExecution( // Make sure any test tables referenced are loaded. val referencedTables = describedTables ++ - logical.collect { case UnresolvedRelation(tableIdent) => tableIdent.table } + logical.collect { case UnresolvedRelation(ident) => ident.last } val resolver = sparkSession.sessionState.conf.resolver val referencedTestTables = sparkSession.testTables.keys.filter { testTable => referencedTables.exists(resolver(_, testTable))
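
Not part of the patch: a minimal end-to-end sketch of the SELECT code paths the commit message lists, modeled on the DataSourceV2SQLSuite tests added above. The fully qualified class name used for the test-only TestInMemoryTableCatalog, the catalog names ("testcat", "testcat2"), and the namespace ("ns1.ns2") are illustrative assumptions; any TableCatalog plugin registered under spark.sql.catalog.<name> would exercise the same resolution path.

    // Hypothetical demo, mirroring the patch's DataSourceV2SQLSuite setup.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-27322-demo")
      // Register two v2 catalog plugins, as the suite's before() block does.
      // The class name below is an assumption (TestInMemoryTableCatalog is test-only).
      .config("spark.sql.catalog.testcat",
        "org.apache.spark.sql.sources.v2.TestInMemoryTableCatalog")
      .config("spark.sql.catalog.testcat2",
        "org.apache.spark.sql.sources.v2.TestInMemoryTableCatalog")
      .getOrCreate()

    // Source data registered as a temp view, as in the suite.
    spark.range(1, 4).selectExpr("id", "CAST(id AS STRING) AS data")
      .createOrReplaceTempView("source")

    spark.sql("CREATE TABLE testcat.ns1.ns2.tbl USING foo AS SELECT id, data FROM source")
    spark.sql("CREATE TABLE testcat2.tbl USING foo AS SELECT id, data FROM source")

    // SELECT * FROM catalog.db.tbl and TABLE catalog.db.tbl
    spark.sql("SELECT * FROM testcat.ns1.ns2.tbl").show()
    spark.sql("TABLE testcat.ns1.ns2.tbl").show()

    // SparkSession.table("catalog.db.tbl")
    spark.table("testcat.ns1.ns2.tbl").show()

    // CTE relation over a v2 table
    spark.sql(
      """WITH cte AS (SELECT * FROM testcat.ns1.ns2.tbl)
        |SELECT * FROM cte""".stripMargin).show()

    // JOIN of tables from two different catalogs
    spark.sql(
      """SELECT t1.id, t2.data
        |FROM testcat.ns1.ns2.tbl t1 JOIN testcat2.tbl t2 ON t1.id = t2.id""".stripMargin).show()

In this sketch the catalog-qualified multipart identifiers are resolved by the new ResolveTables rule into DataSourceV2Relation nodes, while single-part and session-catalog names still fall through to ResolveRelations, as described in the rule's scaladoc in the Analyzer changes above.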