Skip to content

Commit

Permalink
[SPARK-35883][SQL] Migrate ALTER TABLE RENAME COLUMN command to use UnresolvedTable to resolve the identifier
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

This PR proposes to migrate the `ALTER TABLE ... RENAME COLUMN` command to use `UnresolvedTable` as a `child` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or the [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

### Why are the changes needed?

This is part of the effort to make the relation lookup behavior consistent: [SPARK-29900](https://issues.apache.org/jira/browse/SPARK-29900).

### Does this PR introduce _any_ user-facing change?

After this PR, the `ALTER TABLE ... RENAME COLUMN` command will have a consistent resolution behavior.

### How was this patch tested?

Updated existing tests.

Closes apache#33066 from imback82/alter_rename.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
imback82 authored and cloud-fan committed Jun 25, 2021
1 parent 6497ac3 commit f1ad345
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, BoundFunction, ScalarFunction}
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
Expand Down Expand Up @@ -3658,12 +3658,6 @@ class Analyzer(override val catalogManager: CatalogManager)
comment.fieldNames(),
TableChange.updateColumnComment(_, comment.newComment())).orElse(Some(comment))

case rename: RenameColumn =>
resolveFieldNames(
schema,
rename.fieldNames(),
TableChange.renameColumn(_, rename.newName())).orElse(Some(rename))

case delete: DeleteColumn =>
resolveFieldNames(schema, delete.fieldNames(), TableChange.deleteColumn)
.orElse(Some(delete))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -451,6 +451,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
s"Cannot ${alter.operation} missing field ${u.name.quoted} in ${table.name} " +
s"schema: ${table.schema.treeString}")
}
checkAlterTableCommand(alter)

case alter: AlterTable if alter.table.resolved =>
val table = alter.table
Expand Down Expand Up @@ -579,10 +580,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
updatePos.position(),
parent,
colsToAdd.getOrElse(parentName, Nil))
case rename: RenameColumn =>
findField("rename", rename.fieldNames)
checkColumnNotExists(
"rename", rename.fieldNames().init :+ rename.newName(), table.schema)
case update: UpdateColumnComment =>
findField("update", update.fieldNames)
case delete: DeleteColumn =>
Expand Down Expand Up @@ -1078,4 +1075,22 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
failOnOuterReferenceInSubTree(p)
}}
}

/**
 * Validates the options used for alter table commands after table and columns are resolved.
 */
private def checkAlterTableCommand(alter: AlterTableCommand): Unit = {
  alter match {
    case AlterTableRenameColumn(table: ResolvedTable, ResolvedFieldName(name), newName) =>
      // The renamed field keeps its parent path (`name.init`); fail if a field with the
      // new name already exists at that same nesting level.
      val renamedPath = name.init :+ newName
      if (table.schema.findNestedField(renamedPath, includeCollections = true).isDefined) {
        alter.failAnalysis(s"Cannot ${alter.operation} column, because ${renamedPath.quoted} " +
          s"already exists in ${table.schema.treeString}")
      }
    case _ => // No extra validation needed for other ALTER TABLE commands here.
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
tbl,
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)

case AlterTableRenameColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) =>
val changes = Seq(TableChange.renameColumn(col.toArray, newName))
createAlterTable(nameParts, catalog, tbl, changes)

case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3542,7 +3542,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

/**
* Parse a [[AlterTableRenameColumnStatement]] command.
* Parse a [[AlterTableRenameColumn]] command.
*
* For example:
* {{{
Expand All @@ -3551,9 +3551,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
override def visitRenameTableColumn(
    ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
  // Collect the dot-separated parts of the column being renamed, e.g. `a.b.c`.
  val fromColumnParts = ctx.from.parts.asScala.map(_.getText).toSeq
  AlterTableRenameColumn(
    createUnresolvedTable(ctx.table, "ALTER TABLE ... RENAME COLUMN"),
    UnresolvedFieldName(fromColumnParts),
    ctx.to.getText)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,6 @@ case class AlterTableAlterColumnStatement(
comment: Option[String],
position: Option[ColumnPosition]) extends LeafParsedStatement

/**
 * ALTER TABLE ... RENAME COLUMN command, as parsed from SQL.
 *
 * @param tableName multi-part identifier of the target table
 * @param column multi-part name of the (possibly nested) column to rename
 * @param newName the new single-part name for the column
 */
case class AlterTableRenameColumnStatement(
    tableName: Seq[String],
    column: Seq[String],
    newName: String) extends LeafParsedStatement

/**
* An INSERT INTO statement, as parsed from SQL.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1124,3 +1124,21 @@ case class AlterTableDropColumns(
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

/**
 * The logical plan of the ALTER TABLE ... RENAME COLUMN command.
 *
 * @param table child plan resolving to the table being altered (an `UnresolvedTable`
 *              until the analyzer resolves it)
 * @param column the (possibly nested) field to rename; must be resolved before
 *               `changes` is invoked
 * @param newName the new name for the field
 */
case class AlterTableRenameColumn(
    table: LogicalPlan,
    column: FieldName,
    newName: String) extends AlterTableCommand {
  // Verb used in analysis error messages, e.g. "Cannot rename missing field ...".
  override def operation: String = "rename"

  // Converts this command into the v2 catalog `TableChange` that performs the rename.
  override def changes: Seq[TableChange] = {
    require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
    Seq(TableChange.renameColumn(column.name.toArray, newName))
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(table = newChild)
}
Original file line number Diff line number Diff line change
Expand Up @@ -892,9 +892,9 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: rename column") {
  // Parsing should yield an AlterTableRenameColumn over an unresolved table
  // and an unresolved (nested) field name.
  val expectedPlan = AlterTableRenameColumn(
    UnresolvedTable(Seq("table_name"), "ALTER TABLE ... RENAME COLUMN", None),
    UnresolvedFieldName(Seq("a", "b", "c")),
    "d")
  comparePlans(
    parsePlan("ALTER TABLE table_name RENAME COLUMN a.b.c TO d"),
    expectedPlan)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)
}

case AlterTableRenameColumnStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), col, newName) =>
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError
}.getOrElse {
val changes = Seq(TableChange.renameColumn(col.toArray, newName))
createAlterTable(nameParts, catalog, tbl, changes)
}
case AlterTableRenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError

case AlterTableDropColumns(ResolvedV1TableIdentifier(_), _) =>
throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

package org.apache.spark.sql.connector

import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, TestRelation2}
import org.apache.spark.sql.catalyst.analysis.CreateTablePartitioningValidationSuite
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, UnresolvedFieldName}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AlterTableCommand, AlterTableDropColumns, AlterTableRenameColumn, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
Expand All @@ -33,6 +32,12 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
import CreateTablePartitioningValidationSuite._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

// Shared ResolvedTable fixture for the ALTER TABLE case-sensitivity tests below.
// NOTE(review): the underlying v2 Table argument is passed as null — presumably
// unused by these analysis checks; confirm before reusing this fixture elsewhere.
private val table = ResolvedTable(
  catalog,
  Identifier.of(Array(), "table_name"),
  null,
  schema.toAttributes)

override protected def extendedAnalysisRules: Seq[Rule[LogicalPlan]] = {
Seq(PreprocessTableCreation(spark))
}
Expand Down Expand Up @@ -186,7 +191,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: drop column resolution") {
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
alterTableTest(
TableChange.deleteColumn(ref),
AlterTableDropColumns(table, Seq(UnresolvedFieldName(ref))),
Seq("Cannot delete missing field", ref.quoted)
)
}
Expand All @@ -195,7 +200,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: rename column resolution") {
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
alterTableTest(
TableChange.renameColumn(ref, "newName"),
AlterTableRenameColumn(table, UnresolvedFieldName(ref), "newName"),
Seq("Cannot rename missing field", ref.quoted)
)
}
Expand Down Expand Up @@ -250,4 +255,16 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
}
}
}

// Analyzes `alter` under both case-sensitivity settings: with case-sensitive
// resolution the command must fail with `error`; with case-insensitive
// resolution it must analyze successfully.
private def alterTableTest(alter: AlterTableCommand, error: Seq[String]): Unit = {
  for (caseSensitive <- Seq(true, false)) {
    withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
      if (caseSensitive) {
        assertAnalysisError(alter, error, caseSensitive)
      } else {
        assertAnalysisSuccess(alter, caseSensitive)
      }
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
}.getMessage
assert(msg.contains("Cannot rename missing field C2 in test.alt_table schema"))
assert(msg.contains("Cannot rename missing field C2 in h2.test.alt_table schema"))
}

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
Expand Down

0 comments on commit f1ad345

Please sign in to comment.