[SPARK-33623][SQL] Add canDeleteWhere to SupportsDelete
### What changes were proposed in this pull request?

This PR adds `canDeleteWhere` to `SupportsDelete` so that Spark can check at planning time whether a data source will reject a delete issued via `deleteWhere`.

### Why are the changes needed?

The only way to support delete statements right now is to implement `SupportsDelete`. According to its Javadoc, that interface is meant for cases where data can be deleted without much effort (e.g., deleting a complete partition in a Hive table).

With this change, Spark can find out at planning time that a data source will reject a delete via `deleteWhere`, instead of only hitting an exception during execution. In the future, we can use this check to decide whether Spark should rewrite the delete and execute a distributed query, or simply pass a set of filters to the data source.

Consider an example of a partitioned Hive table. If we have a delete predicate like `part_col = '2020'`, we can just drop the matching partition to satisfy the delete. In this case, the data source should return `true` from `canDeleteWhere` and use the filters it accepts in `deleteWhere` to drop the partition. I consider this a delete without significant effort. At the same time, if we have a delete predicate like `id = 10`, Hive tables would not be able to execute this delete with a metadata-only operation; files would have to be rewritten. In that case, the data source should return `false` from `canDeleteWhere`, and we should use a more sophisticated row-level API to find out which records should be removed (that API is yet to be discussed, but we need this PR as a basis).
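
To make the partition example concrete, here is a minimal sketch (not part of this PR) of how a hypothetical connector could implement the new method. The names `PartitionedTable`, `partitionColumns`, and `dropPartitions` are illustrative only; a real connector would mix `SupportsDelete` into its `Table` implementation.

```scala
import org.apache.spark.sql.connector.catalog.SupportsDelete
import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}

// Hypothetical table that knows its partition columns.
class PartitionedTable(partitionColumns: Set[String]) extends SupportsDelete {

  // Accept the delete only if every filter is a simple predicate that references
  // partition columns alone, i.e. it can be satisfied by dropping whole partitions.
  override def canDeleteWhere(filters: Array[Filter]): Boolean = {
    filters.forall { filter =>
      val simplePredicate = filter match {
        case _: EqualTo | _: IsNotNull => true
        case _ => false
      }
      // Metadata-only delete is possible only if the predicate touches partition columns alone.
      simplePredicate && filter.references.forall(partitionColumns.contains)
    }
  }

  // Invoked only if canDeleteWhere returned true, so the filters are known to be
  // partition predicates here.
  override def deleteWhere(filters: Array[Filter]): Unit = {
    dropPartitions(filters)
  }

  // Placeholder for the source-specific, metadata-only partition drop.
  private def dropPartitions(filters: Array[Filter]): Unit = ()
}
```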

If we decide to support subqueries and all delete use cases by simply extending the existing API, every data source would have to implement a lot of Spark logic to determine which records changed. I don't think we want to go that way, as the logic for determining which records should be deleted is independent of the underlying data source. So the assumption is that, for data sources that return `false` from `canDeleteWhere`, Spark will execute a plan to find which records must be deleted.
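
The decision flow described above could look roughly like the sketch below. It only illustrates the intended direction: `rewriteAsRowLevelDelete` stands in for the row-level API that does not exist yet, and today Spark simply fails the query at planning time, as the planner change in this PR shows.

```scala
import org.apache.spark.sql.connector.catalog.SupportsDelete
import org.apache.spark.sql.sources.Filter

object DeletePlanningSketch {

  // Planner-side decision enabled by canDeleteWhere.
  def planDelete(table: SupportsDelete, filters: Array[Filter]): Unit = {
    if (table.canDeleteWhere(filters)) {
      // Cheap path: the source handles the delete itself, e.g. by dropping partitions.
      table.deleteWhere(filters)
    } else {
      // Future path: Spark would run a query to find the matching rows and ask the
      // source to remove them. Placeholder only; that API is yet to be designed.
      rewriteAsRowLevelDelete(table, filters)
    }
  }

  private def rewriteAsRowLevelDelete(table: SupportsDelete, filters: Array[Filter]): Unit =
    throw new UnsupportedOperationException("row-level delete API is not available yet")
}
```
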
### Does this PR introduce _any_ user-facing change?

Yes, but it is backward compatible: `canDeleteWhere` has a default implementation that returns `true`, so existing `SupportsDelete` implementations keep working unchanged.

### How was this patch tested?

This PR comes with a new test.

Closes apache#30562 from aokolnychyi/spark-33623.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
aokolnychyi authored and dongjoon-hyun committed Dec 3, 2020
1 parent bd71186 commit aa13e20
Showing 4 changed files with 55 additions and 1 deletion.
@@ -28,8 +28,30 @@
 */
@Evolving
public interface SupportsDelete {

  /**
   * Checks whether it is possible to delete data from a data source table that matches filter
   * expressions.
   * <p>
   * Rows should be deleted from the data source iff all of the filter expressions match.
   * That is, the expressions must be interpreted as a set of filters that are ANDed together.
   * <p>
   * Spark will call this method at planning time to check whether {@link #deleteWhere(Filter[])}
   * would reject the delete operation because it requires significant effort. If this method
   * returns false, Spark will not call {@link #deleteWhere(Filter[])} and will try to rewrite
   * the delete operation and produce row-level changes if the data source table supports deleting
   * individual records.
   *
   * @param filters filter expressions, used to select rows to delete when all expressions match
   * @return true if the delete operation can be performed
   */
  default boolean canDeleteWhere(Filter[] filters) {
    return true;
  }

  /**
   * Delete data from a data source table that matches filter expressions.
   * Delete data from a data source table that matches filter expressions. Note that this method
   * will be invoked only if {@link #canDeleteWhere(Filter[])} returns true.
   * <p>
   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
   * expressions must be interpreted as a set of filters that are ANDed together.
@@ -335,6 +335,10 @@ class InMemoryTable(
    }
  }

  override def canDeleteWhere(filters: Array[Filter]): Boolean = {
    InMemoryTable.supportsFilters(filters)
  }

  override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized {
    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
    dataMap --= InMemoryTable.filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted), filters)
@@ -360,6 +364,14 @@ object InMemoryTable {
    }
  }

  def supportsFilters(filters: Array[Filter]): Boolean = {
    filters.flatMap(splitAnd).forall {
      case _: EqualTo => true
      case _: IsNotNull => true
      case _ => false
    }
  }

  private def extractValue(
      attr: String,
      partFieldNames: Seq[String],
@@ -221,6 +221,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
              throw new AnalysisException(s"Exec update failed:" +
                s" cannot translate expression to source filter: $f"))
          }).toArray

      if (!table.asDeletable.canDeleteWhere(filters)) {
        throw new AnalysisException(
          s"Cannot delete from table ${table.name} where ${filters.mkString("[", ", ", "]")}")
      }

      DeleteFromTableExec(table.asDeletable, filters) :: Nil
    case _ =>
      throw new AnalysisException("DELETE is only supported with v2 tables.")
@@ -1812,6 +1812,20 @@ class DataSourceV2SQLSuite
    }
  }

  test("DeleteFrom: delete with unsupported predicates") {
    val t = "testcat.ns1.ns2.tbl"
    withTable(t) {
      sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo")
      sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
      val exc = intercept[AnalysisException] {
        sql(s"DELETE FROM $t WHERE id > 3 AND p > 3")
      }

      assert(spark.table(t).count === 3)
      assert(exc.getMessage.contains(s"Cannot delete from table $t"))
    }
  }

  test("DeleteFrom: DELETE is only supported with v2 tables") {
    // unset this config to use the default v2 session catalog.
    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)