diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 005784cccac13..0a3bd09d2507b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils}
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
 import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, BoundFunction, ScalarFunction}
 import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
@@ -3658,12 +3658,6 @@ class Analyzer(override val catalogManager: CatalogManager)
             comment.fieldNames(),
             TableChange.updateColumnComment(_, comment.newComment())).orElse(Some(comment))
 
-        case rename: RenameColumn =>
-          resolveFieldNames(
-            schema,
-            rename.fieldNames(),
-            TableChange.renameColumn(_, rename.newName())).orElse(Some(rename))
-
         case delete: DeleteColumn =>
           resolveFieldNames(schema, delete.fieldNames(), TableChange.deleteColumn)
             .orElse(Some(delete))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7679a87ee087c..95854966a1e8e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
 import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement}
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -451,6 +451,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
             s"Cannot ${alter.operation} missing field ${u.name.quoted} in ${table.name} " +
               s"schema: ${table.schema.treeString}")
         }
+        checkAlterTableCommand(alter)
 
       case alter: AlterTable if alter.table.resolved =>
         val table = alter.table
@@ -579,10 +580,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
               updatePos.position(),
               parent,
               colsToAdd.getOrElse(parentName, Nil))
-          case rename: RenameColumn =>
-            findField("rename", rename.fieldNames)
-            checkColumnNotExists(
-              "rename", rename.fieldNames().init :+ rename.newName(), table.schema)
           case update: UpdateColumnComment =>
             findField("update", update.fieldNames)
           case delete: DeleteColumn =>
@@ -1078,4 +1075,22 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
         failOnOuterReferenceInSubTree(p)
     }}
   }
+
+  /**
+   * Validates the options used for alter table commands after table and columns are resolved.
+   */
+  private def checkAlterTableCommand(alter: AlterTableCommand): Unit = {
+    def checkColumnNotExists(fieldNames: Seq[String], struct: StructType): Unit = {
+      if (struct.findNestedField(fieldNames, includeCollections = true).isDefined) {
+        alter.failAnalysis(s"Cannot ${alter.operation} column, because ${fieldNames.quoted} " +
+          s"already exists in ${struct.treeString}")
+      }
+    }
+
+    alter match {
+      case AlterTableRenameColumn(table: ResolvedTable, ResolvedFieldName(name), newName) =>
+        checkColumnNotExists(name.init :+ newName, table.schema)
+      case _ =>
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 59a1c13b4067c..66a2b96ce7f5f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -88,11 +88,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         tbl,
         typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)
 
-    case AlterTableRenameColumnStatement(
-        nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) =>
-      val changes = Seq(TableChange.renameColumn(col.toArray, newName))
-      createAlterTable(nameParts, catalog, tbl, changes)
-
     case c @ CreateTableStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
       assertNoNullTypeInSchema(c.tableSchema)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f82b9be9a686d..dd48381ab25d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3542,7 +3542,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Parse a [[AlterTableRenameColumnStatement]] command.
+   * Parse a [[AlterTableRenameColumn]] command.
    *
    * For example:
    * {{{
@@ -3551,9 +3551,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   override def visitRenameTableColumn(
       ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableRenameColumnStatement(
-      visitMultipartIdentifier(ctx.table),
-      ctx.from.parts.asScala.map(_.getText).toSeq,
+    AlterTableRenameColumn(
+      createUnresolvedTable(ctx.table, "ALTER TABLE ... RENAME COLUMN"),
+      UnresolvedFieldName(ctx.from.parts.asScala.map(_.getText).toSeq),
       ctx.to.getText)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index b87f65fe98ac1..c14138e04fdd2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -257,14 +257,6 @@ case class AlterTableAlterColumnStatement(
     comment: Option[String],
     position: Option[ColumnPosition]) extends LeafParsedStatement
 
-/**
- * ALTER TABLE ... RENAME COLUMN command, as parsed from SQL.
- */
-case class AlterTableRenameColumnStatement(
-    tableName: Seq[String],
-    column: Seq[String],
-    newName: String) extends LeafParsedStatement
-
 /**
  * An INSERT INTO statement, as parsed from SQL.
  *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index e1317771dda98..2384d28d862bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1124,3 +1124,21 @@ case class AlterTableDropColumns(
   override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
     copy(table = newChild)
 }
+
+/**
+ * The logical plan of the ALTER TABLE ... RENAME COLUMN command.
+ */
+case class AlterTableRenameColumn(
+    table: LogicalPlan,
+    column: FieldName,
+    newName: String) extends AlterTableCommand {
+  override def operation: String = "rename"
+
+  override def changes: Seq[TableChange] = {
+    require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
+    Seq(TableChange.renameColumn(column.name.toArray, newName))
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index b0e0d582c4908..5518409f614b9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -892,9 +892,9 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: rename column") {
     comparePlans(
       parsePlan("ALTER TABLE table_name RENAME COLUMN a.b.c TO d"),
-      AlterTableRenameColumnStatement(
-        Seq("table_name"),
-        Seq("a", "b", "c"),
+      AlterTableRenameColumn(
+        UnresolvedTable(Seq("table_name"), "ALTER TABLE ... RENAME COLUMN", None),
+        UnresolvedFieldName(Seq("a", "b", "c")),
         "d"))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 427c5708d9d41..1775d5837b5a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -147,15 +147,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
           typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)
       }
 
-    case AlterTableRenameColumnStatement(
-        nameParts @ SessionCatalogAndTable(catalog, tbl), col, newName) =>
-      loadTable(catalog, tbl.asIdentifier).collect {
-        case v1Table: V1Table =>
-          throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError
-      }.getOrElse {
-        val changes = Seq(TableChange.renameColumn(col.toArray, newName))
-        createAlterTable(nameParts, catalog, tbl, changes)
-      }
+    case AlterTableRenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
+      throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError
 
     case AlterTableDropColumns(ResolvedV1TableIdentifier(_), _) =>
       throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index dd95ceb59bdc4..24d33e2045820 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.sql.connector
 
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, TestRelation2}
-import org.apache.spark.sql.catalyst.analysis.CreateTablePartitioningValidationSuite
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, UnresolvedFieldName}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AlterTableCommand, AlterTableDropColumns, AlterTableRenameColumn, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -33,6 +32,12 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   import CreateTablePartitioningValidationSuite._
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
+  private val table = ResolvedTable(
+    catalog,
+    Identifier.of(Array(), "table_name"),
+    null,
+    schema.toAttributes)
+
   override protected def extendedAnalysisRules: Seq[Rule[LogicalPlan]] = {
     Seq(PreprocessTableCreation(spark))
   }
@@ -186,7 +191,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   test("AlterTable: drop column resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        TableChange.deleteColumn(ref),
+        AlterTableDropColumns(table, Seq(UnresolvedFieldName(ref))),
         Seq("Cannot delete missing field", ref.quoted)
       )
     }
@@ -195,7 +200,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   test("AlterTable: rename column resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        TableChange.renameColumn(ref, "newName"),
+        AlterTableRenameColumn(table, UnresolvedFieldName(ref), "newName"),
         Seq("Cannot rename missing field", ref.quoted)
      )
     }
@@ -250,4 +255,16 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
       }
     }
   }
+
+  private def alterTableTest(alter: AlterTableCommand, error: Seq[String]): Unit = {
+    Seq(true, false).foreach { caseSensitive =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        if (caseSensitive) {
+          assertAnalysisError(alter, error, caseSensitive)
+        } else {
+          assertAnalysisSuccess(alter, caseSensitive)
+        }
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index dd170efab97a6..34ff874b1eaef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -346,7 +346,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       val msg = intercept[AnalysisException] {
         sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
       }.getMessage
-      assert(msg.contains("Cannot rename missing field C2 in test.alt_table schema"))
+      assert(msg.contains("Cannot rename missing field C2 in h2.test.alt_table schema"))
     }
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {