Skip to content

Commit

Permalink
[SPARK-49359][SQL] Allow StagedTableCatalog implementations to fall b…
Browse files Browse the repository at this point in the history
…ack to non-atomic write

### What changes were proposed in this pull request?

This PR allows `StagedTableCatalog#create/replaceTable` to return null and Spark will fall back to normal non-atomic write.

### Why are the changes needed?

Extending an interface is static but sometimes the implementations need more dynamicity. For example, a catalog may only support atomic CTAS for certain table formats, and we shouldn't force them to implement atomic writes for all other formats.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#47848 from cloud-fan/stage.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
cloud-fan authored and HyukjinKwon committed Aug 23, 2024
1 parent 08d69ff commit 97e0a88
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ StagedTable stageCreate(
* @param columns the column of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
* @return metadata for the new table. This can be null if the catalog does not support atomic
* creation for this table. Spark will call {@link #loadTable(Identifier)} later.
* @throws TableAlreadyExistsException If a table or view already exists for the identifier
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
Expand Down Expand Up @@ -128,7 +129,8 @@ StagedTable stageReplace(
* @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
* @return metadata for the new table. This can be null if the catalog does not support atomic
* creation for this table. Spark will call {@link #loadTable(Identifier)} later.
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
* @throws NoSuchTableException If the table does not exist
Expand Down Expand Up @@ -176,7 +178,8 @@ StagedTable stageCreateOrReplace(
* @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
* @return metadata for the new table. This can be null if the catalog does not support atomic
* creation for this table. Spark will call {@link #loadTable(Identifier)} later.
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ case class AtomicCreateTableAsSelectExec(
}
throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}
val stagedTable = catalog.stageCreate(
val stagedTable = Option(catalog.stageCreate(
ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
partitioning.toArray, properties.asJava)
).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
writeToTable(catalog, stagedTable, writeOptions, ident, query)
}
}
Expand Down Expand Up @@ -216,7 +217,9 @@ case class AtomicReplaceTableAsSelectExec(
} else {
throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
}
writeToTable(catalog, staged, writeOptions, ident, query)
val table = Option(staged).getOrElse(
catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
writeToTable(catalog, table, writeOptions, ident, query)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3770,6 +3770,19 @@ class DataSourceV2SQLSuiteV1Filter
checkWriteOperations("read_only_cat")
}

test("StagingTableCatalog without atomic support") {
withSQLConf("spark.sql.catalog.fakeStagedCat" -> classOf[FakeStagedTableCatalog].getName) {
withTable("fakeStagedCat.t") {
sql("CREATE TABLE fakeStagedCat.t AS SELECT 1 col")
checkAnswer(spark.table("fakeStagedCat.t"), Row(1))
sql("REPLACE TABLE fakeStagedCat.t AS SELECT 2 col")
checkAnswer(spark.table("fakeStagedCat.t"), Row(2))
sql("CREATE OR REPLACE TABLE fakeStagedCat.t AS SELECT 1 c1, 2 c2")
checkAnswer(spark.table("fakeStagedCat.t"), Row(1, 2))
}
}
}

private def testNotSupportedV2Command(
sqlCommand: String,
sqlParams: String,
Expand Down Expand Up @@ -3855,3 +3868,62 @@ class ReadOnlyCatalog extends InMemoryCatalog {
writePrivileges.asScala.toSeq.map(_.toString).sorted.mkString(","))
}
}

class FakeStagedTableCatalog extends InMemoryCatalog with StagingTableCatalog {
override def stageCreate(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
throw new RuntimeException("shouldn't be called")
}

override def stageCreate(
ident: Identifier,
columns: Array[ColumnV2],
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
super.createTable(ident, columns, partitions, properties)
null
}

override def stageReplace(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
throw new RuntimeException("shouldn't be called")
}

override def stageReplace(
ident: Identifier,
columns: Array[ColumnV2],
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
super.dropTable(ident)
super.createTable(ident, columns, partitions, properties)
null
}

override def stageCreateOrReplace(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
throw new RuntimeException("shouldn't be called")
}

override def stageCreateOrReplace(
ident: Identifier,
columns: Array[ColumnV2],
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
try {
super.dropTable(ident)
} catch {
case _: Throwable =>
}
super.createTable(ident, columns, partitions, properties)
null
}
}

0 comments on commit 97e0a88

Please sign in to comment.