Skip to content

Commit

Permalink
[SPARK-42978][SQL] Derby&PG: RENAME cannot qualify a new-table-Name w…
Browse files Browse the repository at this point in the history
…ith a schema-Name

### What changes were proposed in this pull request?

Fix `rename a table` in derby and pg, which schema name is not allowed to qualify the new table name

### Why are the changes needed?

bugfix

### Does this PR introduce _any_ user-facing change?

introduce a new error class

```json
  "CANNOT_RENAME_ACROSS_SCHEMA" : {
    "message" : [
      "Renaming a <type> across schemas is not allowed."
    ],
    "sqlState" : "0AKD0"
  },

```
### How was this patch tested?

new unit test

Closes apache#40602 from yaooqinn/SPARK-42978.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
  • Loading branch information
yaooqinn committed Mar 31, 2023
1 parent cb7d082 commit c8566f1
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 13 deletions.
1 change: 1 addition & 0 deletions core/src/main/resources/error/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ The following SQLSTATEs are collated from:
|09000 |09 |triggered action exception |000 |(no subclass) |SQL/Foundation |Y |SQL/Foundation PostgreSQL DB2 Redshift |
|0A000 |0A |feature not supported |000 |(no subclass) |SQL/Foundation |Y |SQL/Foundation PostgreSQL Redshift Redshift Oracle |
|0A001 |0A |feature not supported |001 |multiple server transactions |SQL/Foundation |Y |SQL/Foundation DB2 Oracle |
|0AKD0 |0A |Cross catalog or schema operation not supported |KD0 |Renaming a <type> across schemas is not allowed. |Spark |Y |Spark |
|0B000 |0B |Invalid Transaction Initiation |000 |invalid_transaction_initiation |PostgreSQL |N |PostgreSQL Redshift |
|0D000 |0D |invalid target type specification |000 |(no subclass) |SQL/Foundation |Y |SQL/Foundation |
|0E000 |0E |invalid schema name list specification |000 |(no subclass) |SQL/Foundation |Y |SQL/Foundation |
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@
],
"sqlState" : "429BB"
},
"CANNOT_RENAME_ACROSS_SCHEMA" : {
"message" : [
"Renaming a <type> across schemas is not allowed."
],
"sqlState" : "0AKD0"
},
"CANNOT_RESTORE_PERMISSIONS_FOR_PATH" : {
"message" : [
"Failed to set permissions on created path <path> back to <permission>."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.collection.mutable

import org.apache.hadoop.fs.Path

import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkThrowableHelper, SparkUnsupportedOperationException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedTable, Star, TableAlreadyExistsException, UnresolvedRegex}
Expand Down Expand Up @@ -3493,4 +3493,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
errorClass = "NULLABLE_ROW_ID_ATTRIBUTES",
messageParameters = Map("nullableRowIdAttrs" -> nullableRowIdAttrs.mkString(", ")))
}

def cannotRenameTableAcrossSchemaError(): Throwable = {
new SparkUnsupportedOperationException(
errorClass = "CANNOT_RENAME_ACROSS_SCHEMA", messageParameters = Map("type" -> "table")
)
}
}
5 changes: 5 additions & 0 deletions sql/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@
<artifactId>ojdbc8</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -927,8 +927,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
*/
def renameTable(
conn: Connection,
oldTable: String,
newTable: String,
oldTable: Identifier,
newTable: Identifier,
options: JDBCOptions): Unit = {
val dialect = JdbcDialects.get(options.url)
executeStatement(conn, options, dialect.renameTable(oldTable, newTable))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class JDBCTableCatalog extends TableCatalog
checkNamespace(oldIdent.namespace())
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.classifyException(s"Failed table renaming from $oldIdent to $newIdent", dialect) {
JdbcUtils.renameTable(conn, getTableName(oldIdent), getTableName(newIdent), options)
JdbcUtils.renameTable(conn, oldIdent, newIdent, options)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -113,8 +114,9 @@ private object DB2Dialect extends JdbcDialect {
// scalastyle:off line.size.limit
// See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0000980.html
// scalastyle:on line.size.limit
override def renameTable(oldTable: String, newTable: String): String = {
s"RENAME TABLE $oldTable TO $newTable"
override def renameTable(oldTable: Identifier, newTable: Identifier): String = {
s"RENAME TABLE ${getFullyQualifiedQuotedTableName(oldTable)} TO " +
s"${getFullyQualifiedQuotedTableName(newTable)}"
}

// scalastyle:off line.size.limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package org.apache.spark.sql.jdbc
import java.sql.Types
import java.util.Locale

import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types._


Expand Down Expand Up @@ -56,8 +57,13 @@ private object DerbyDialect extends JdbcDialect {
override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

// See https://db.apache.org/derby/docs/10.15/ref/rrefsqljrenametablestatement.html
override def renameTable(oldTable: String, newTable: String): String = {
s"RENAME TABLE $oldTable TO $newTable"
override def renameTable(oldTable: Identifier, newTable: Identifier): String = {
if (!oldTable.namespace().sameElements(newTable.namespace())) {
throw QueryCompilationErrors.cannotRenameTableAcrossSchemaError()
}
// New table name restriction:
// https://db.apache.org/derby/docs/10.2/ref/rrefnewtablename.html#rrefnewtablename
s"RENAME TABLE ${getFullyQualifiedQuotedTableName(oldTable)} TO ${newTable.name()}"
}

// Derby currently doesn't support comment on table. Here is the ticket to add the support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,24 @@ abstract class JdbcDialect extends Serializable with Logging {
* @param newTable New name of the table.
* @return The SQL statement to use for renaming the table.
*/
@deprecated("Please override renameTable method with identifiers", "3.5.0")
def renameTable(oldTable: String, newTable: String): String = {
s"ALTER TABLE $oldTable RENAME TO $newTable"
}

/**
* Rename an existing table.
*
* @param oldTable The existing table.
* @param newTable New name of the table.
* @return The SQL statement to use for renaming the table.
*/
@Since("3.5.0")
def renameTable(oldTable: Identifier, newTable: Identifier): String = {
s"ALTER TABLE ${getFullyQualifiedQuotedTableName(oldTable)} RENAME TO " +
s"${getFullyQualifiedQuotedTableName(newTable)}"
}

/**
* Alter an existing table.
*
Expand Down Expand Up @@ -598,6 +612,14 @@ abstract class JdbcDialect extends Serializable with Logging {

def getTableSample(sample: TableSampleInfo): String =
throw new UnsupportedOperationException("TableSample is not supported by this data source")

/**
* Return the DB-specific quoted and fully qualified table name
*/
@Since("3.5.0")
def getFullyQualifiedQuotedTableName(ident: Identifier): String = {
(ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.{Expression, NullOrdering, SortDirection}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
Expand Down Expand Up @@ -133,8 +134,9 @@ private object MsSqlServerDialect extends JdbcDialect {
// scalastyle:off line.size.limit
// See https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-rename-transact-sql?view=sql-server-ver15
// scalastyle:on line.size.limit
override def renameTable(oldTable: String, newTable: String): String = {
s"EXEC sp_rename $oldTable, $newTable"
override def renameTable(oldTable: Identifier, newTable: Identifier): String = {
s"EXEC sp_rename ${getFullyQualifiedQuotedTableName(oldTable)}, " +
s"${getFullyQualifiedQuotedTableName(newTable)}"
}

// scalastyle:off line.size.limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -247,4 +248,11 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
"TABLESAMPLE BERNOULLI" +
s" (${(sample.upperBound - sample.lowerBound) * 100}) REPEATABLE (${sample.seed})"
}

override def renameTable(oldTable: Identifier, newTable: Identifier): String = {
if (!oldTable.namespace().sameElements(newTable.namespace())) {
throw QueryCompilationErrors.cannotRenameTableAcrossSchemaError()
}
s"ALTER TABLE ${getFullyQualifiedQuotedTableName(oldTable)} RENAME TO ${newTable.name()}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.jdbc
import java.sql.Types
import java.util.Locale

import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types._


Expand Down Expand Up @@ -63,8 +64,9 @@ private case object TeradataDialect extends JdbcDialect {
}

// See https://docs.teradata.com/reader/scPHvjfglIlB8F70YliLAw/wysTNUMsP~0aGzksLCl1kg
override def renameTable(oldTable: String, newTable: String): String = {
s"RENAME TABLE $oldTable TO $newTable"
override def renameTable(oldTable: Identifier, newTable: Identifier): String = {
s"RENAME TABLE ${getFullyQualifiedQuotedTableName(oldTable)} TO " +
s"${getFullyQualifiedQuotedTableName(newTable)}"
}

override def getLimitClause(limit: Integer): String = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2.jdbc

import org.apache.spark.{SparkConf, SparkThrowable}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.util.Utils

class DerbyTableCatalogSuite extends QueryTest with SharedSparkSession {

val tempDir = Utils.createTempDir()
val url = s"jdbc:derby:memory:${tempDir.getCanonicalPath};create=true"
val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()

override def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.catalog.derby", classOf[JDBCTableCatalog].getName)
.set("spark.sql.catalog.derby.url", url)
.set("spark.sql.catalog.derby.driver", "org.apache.derby.jdbc.EmbeddedDriver")
}

test("SPARK-42978: RENAME cannot qualify a new-table-Name with a schema-Name.") {
val n1t1 = "derby.test1.table1"
val n1t2 = "test1.table2"
val n2t2 = "test2.table2"

withTable(n1t1, n1t2) {
sql(s"CREATE TABLE $n1t1(c1 int)")
checkError(
exception = intercept[AnalysisException](
sql(s"ALTER TABLE $n1t1 RENAME TO $n2t2")).getCause.asInstanceOf[SparkThrowable],
errorClass = "CANNOT_RENAME_ACROSS_SCHEMA",
parameters = Map("type" -> "table"))
sql(s"ALTER TABLE $n1t1 RENAME TO $n1t2")
checkAnswer(sql(s"SHOW TABLES IN derby.test1"), Row("test1", "TABLE2", false))
}
}
}

0 comments on commit c8566f1

Please sign in to comment.