[SPARK-37496][SQL] Migrate ReplaceTableAsSelectStatement to v2 command
### What changes were proposed in this pull request?
This PR migrates `ReplaceTableAsSelectStatement` to the v2 command `ReplaceTableAsSelect`.
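
At a high level, the parser change looks like this (a simplified before/after sketch lifted from the `AstBuilder` hunk below; values such as `table`, `query`, `partitioning`, and `bucketSpec` come from the surrounding parser context and are abridged here):

```scala
// Before: REPLACE TABLE ... AS SELECT was parsed into a dedicated statement node.
ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties,
  provider, options, location, comment, writeOptions = Map.empty, serdeInfo,
  orCreate = orCreate)

// After: the parser builds the v2 ReplaceTableAsSelect command directly, bundling
// the table options into a TableSpec and leaving the table name unresolved so the
// analyzer can resolve it against the correct catalog.
val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
  serdeInfo, external = false)
ReplaceTableAsSelect(
  UnresolvedDBObjectName(table, isNamespace = false),
  partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate)
```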

### Why are the changes needed?
Migrate to the standard V2 framework

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

Closes apache#34754 from huaxingao/replace_table.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Huaxin Gao <[email protected]>
huaxingao committed Dec 1, 2021
1 parent 40b239c commit f97de30
Showing 13 changed files with 112 additions and 137 deletions.
@@ -59,18 +59,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
c.partitioning ++ c.bucketSpec.map(_.asTransform),
convertTableProperties(c),
orCreate = c.orCreate)

case c @ ReplaceTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
ReplaceTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c),
writeOptions = c.writeOptions,
orCreate = c.orCreate)
}

object NonSessionCatalogAndTable {
@@ -3487,7 +3487,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

/**
* Replace a table, returning a [[ReplaceTableStatement]] logical plan.
* Replace a table, returning a [[ReplaceTableStatement]] or [[ReplaceTableAsSelect]]
* logical plan.
*
* Expected format:
* {{{
@@ -3553,9 +3554,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
ctx)

case Some(query) =>
ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties,
provider, options, location, comment, writeOptions = Map.empty, serdeInfo,
orCreate = orCreate)
val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
serdeInfo, false)
ReplaceTableAsSelect(
UnresolvedDBObjectName(table, isNamespace = false),
partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate)

case _ =>
// Note: table schema includes both the table columns list and the partition columns
@@ -165,29 +165,6 @@ case class ReplaceTableStatement(
serde: Option[SerdeInfo],
orCreate: Boolean) extends LeafParsedStatement

/**
* A REPLACE TABLE AS SELECT command, as parsed from SQL.
*/
case class ReplaceTableAsSelectStatement(
tableName: Seq[String],
asSelect: LogicalPlan,
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String],
writeOptions: Map[String, String],
serde: Option[SerdeInfo],
orCreate: Boolean) extends UnaryParsedStatement {

override def child: LogicalPlan = asSelect
override protected def withNewChildInternal(
newChild: LogicalPlan): ReplaceTableAsSelectStatement = copy(asSelect = newChild)
}


/**
* Column data as parsed by ALTER TABLE ... (ADD|REPLACE) COLUMNS.
*/
@@ -18,9 +18,8 @@
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
import org.apache.spark.sql.catalyst.trees.BinaryLike
@@ -274,16 +273,17 @@ case class ReplaceTable(
* If the table does not exist, and orCreate is false, then an exception will be thrown.
*/
case class ReplaceTableAsSelect(
catalog: TableCatalog,
tableName: Identifier,
name: LogicalPlan,
partitioning: Seq[Transform],
query: LogicalPlan,
properties: Map[String, String],
tableSpec: TableSpec,
writeOptions: Map[String, String],
orCreate: Boolean) extends UnaryCommand with V2CreateTablePlan {
orCreate: Boolean) extends BinaryCommand with V2CreateTablePlan {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

override def tableSchema: StructType = query.schema
override def child: LogicalPlan = query
override def left: LogicalPlan = name
override def right: LogicalPlan = query

override lazy val resolved: Boolean = childrenResolved && {
// the table schema is created from the query schema, so the only resolution needed is to check
@@ -292,12 +292,19 @@ case class ReplaceTableAsSelect(
references.map(_.fieldNames).forall(query.schema.findNestedField(_).isDefined)
}

override def tableName: Identifier = {
assert(name.resolved)
name.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
}

override protected def withNewChildrenInternal(
newLeft: LogicalPlan,
newRight: LogicalPlan): LogicalPlan =
copy(name = newLeft, query = newRight)

override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
this.copy(partitioning = rewritten)
}

override protected def withNewChildInternal(newChild: LogicalPlan): ReplaceTableAsSelect =
copy(query = newChild)
}

/**
@@ -23,7 +23,7 @@ import java.util.Collections
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableStatement, SerdeInfo, TableSpec}
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -314,11 +314,13 @@ private[sql] object CatalogV2Util {
convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
}

def convertTableProperties(r: ReplaceTableAsSelectStatement): Map[String, String] = {
convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
def convertTableProperties(t: TableSpec): Map[String, String] = {
val props = convertTableProperties(
t.properties, t.options, t.serde, t.location, t.comment, t.provider, t.external)
withDefaultOwnership(props)
}

def convertTableProperties(
private def convertTableProperties(
properties: Map[String, String],
options: Map[String, String],
serdeInfo: Option[SerdeInfo],
@@ -722,7 +722,7 @@ class DDLParserSuite extends AnalysisTest {
case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" =>
assert(ctas.ifNotExists == expectedIfNotExists)
case replace: ReplaceTableStatement if newTableToken == "REPLACE" =>
case replace: ReplaceTableAsSelectStatement if newTableToken == "REPLACE" =>
case replace: ReplaceTableAsSelect if newTableToken == "REPLACE" =>
case other =>
fail("First token in statement does not match the expected parsed plan; CREATE TABLE" +
" should create a CreateTableStatement, and REPLACE TABLE should create a" +
@@ -2323,18 +2323,18 @@ class DDLParserSuite extends AnalysisTest {
ctas.comment,
ctas.serde,
ctas.external)
case rtas: ReplaceTableAsSelectStatement =>
case rtas: ReplaceTableAsSelect =>
TableSpec(
rtas.tableName,
Some(rtas.asSelect).filter(_.resolved).map(_.schema),
rtas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
Some(rtas.query).filter(_.resolved).map(_.schema),
rtas.partitioning,
rtas.bucketSpec,
rtas.properties,
rtas.provider,
rtas.options,
rtas.location,
rtas.comment,
rtas.serde)
rtas.tableSpec.bucketSpec,
rtas.tableSpec.properties,
rtas.tableSpec.provider,
rtas.tableSpec.options,
rtas.tableSpec.location,
rtas.tableSpec.comment,
rtas.tableSpec.serde)
case other =>
fail(s"Expected to parse Create, CTAS, Replace, or RTAS plan" +
s" from query, got ${other.getClass.getName}.")
31 changes: 17 additions & 14 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,10 +23,10 @@ import scala.collection.JavaConverters._

import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedDBObjectName, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
import org.apache.spark.sql.connector.catalog.TableCapability._
@@ -586,19 +586,22 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
AppendData.byName(v2Relation, df.logicalPlan, extraOptions.toMap)

case (SaveMode.Overwrite, _) =>
ReplaceTableAsSelectStatement(
nameParts,
df.queryExecution.analyzed,
val tableSpec = TableSpec(
bucketSpec = None,
properties = Map.empty,
provider = Some(source),
options = Map.empty,
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
serde = None,
external = false)
ReplaceTableAsSelect(
UnresolvedDBObjectName(nameParts, isNamespace = false),
partitioningAsV2,
None,
Map.empty,
Some(source),
Map.empty,
extraOptions.get("path"),
extraOptions.get(TableCatalog.PROP_COMMENT),
extraOptions.toMap,
None,
orCreate = true) // Create the table if it doesn't exist
df.queryExecution.analyzed,
tableSpec,
writeOptions = Map.empty,
orCreate = true) // Create the table if it doesn't exist

case (other, _) =>
// We have a potential race condition here in AppendMode, if the table suddenly gets
@@ -21,9 +21,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedDBObjectName, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.IntegerType
@@ -195,20 +195,22 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
}

private def internalReplace(orCreate: Boolean): Unit = {
runCommand(
ReplaceTableAsSelectStatement(
tableName,
logicalPlan,
partitioning.getOrElse(Seq.empty),
None,
properties.toMap,
provider,
Map.empty,
None,
None,
options.toMap,
None,
orCreate = orCreate))
val tableSpec = TableSpec(
bucketSpec = None,
properties = properties.toMap,
provider = provider,
options = Map.empty,
location = None,
comment = None,
serde = None,
external = false)
runCommand(ReplaceTableAsSelect(
UnresolvedDBObjectName(tableName, isNamespace = false),
partitioning.getOrElse(Seq.empty),
logicalPlan,
tableSpec,
writeOptions = options.toMap,
orCreate = orCreate))
}
}

@@ -22,14 +22,13 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable => CatalystCreateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1, DataSource}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
@@ -144,7 +143,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

// For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
// session catalog and the table provider is not v2.
case c @ CatalystCreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) =>
case c @ CreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.tableSpec.provider,
c.tableSpec.options,
@@ -157,7 +156,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
c.tableSpec.location, c.tableSpec.comment, storageFormat,
c.tableSpec.external)
val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, None)
CreateTableV1(tableDesc, mode, None)
} else {
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
@@ -173,7 +172,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
c.partitioning, c.bucketSpec, c.properties, provider, c.location,
c.comment, storageFormat, c.external)
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, Some(c.asSelect))
CreateTableV1(tableDesc, mode, Some(c.asSelect))
} else {
CreateTableAsSelect(
catalog.asTableCatalog,
@@ -210,21 +209,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
orCreate = c.orCreate)
}

case c @ ReplaceTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _)
if isSessionCatalog(catalog) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw QueryCompilationErrors.replaceTableAsSelectOnlySupportedWithV2TableError
} else {
ReplaceTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c),
writeOptions = c.writeOptions,
orCreate = c.orCreate)
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
tableSpec = newTableSpec)
}

case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
@@ -37,13 +37,7 @@ case class CreateTableExec(
ignoreIfExists: Boolean) extends LeafV2CommandExec {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val tableProperties = {
val props = CatalogV2Util.convertTableProperties(
tableSpec.properties, tableSpec.options, tableSpec.serde,
tableSpec.location, tableSpec.comment, tableSpec.provider,
tableSpec.external)
CatalogV2Util.withDefaultOwnership(props)
}
val tableProperties = CatalogV2Util.convertTableProperties(tableSpec)

override protected def run(): Seq[InternalRow] = {
if (!catalog.tableExists(identifier)) {