[SPARK-35273][SQL] CombineFilters support non-deterministic expressions
### What changes were proposed in this pull request?

This PR makes `CombineFilters` support non-deterministic expressions. For example:
```scala
spark.sql("CREATE TABLE t1(id INT, dt STRING) using parquet PARTITIONED BY (dt)")
spark.sql("CREATE VIEW v1 AS SELECT * FROM t1 WHERE dt NOT IN ('2020-01-01', '2021-01-01')")
spark.sql("SELECT * FROM v1 WHERE dt = '2021-05-01' AND rand() <= 0.01").explain()
```

Before this PR:
```
== Physical Plan ==
*(1) Filter (isnotnull(dt#1) AND ((dt#1 = 2021-05-01) AND (rand(-6723800298719475098) <= 0.01)))
+- *(1) ColumnarToRow
   +- FileScan parquet default.t1[id#0,dt#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], PartitionFilters: [NOT dt#1 IN (2020-01-01,2021-01-01)], PushedFilters: [], ReadSchema: struct<id:int>
```

After this PR:
```
== Physical Plan ==
*(1) Filter (rand(-2400509328955813273) <= 0.01)
+- *(1) ColumnarToRow
   +- FileScan parquet default.t1[id#0,dt#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], PartitionFilters: [isnotnull(dt#1), NOT dt#1 IN (2020-01-01,2021-01-01), (dt#1 = 2021-05-01)], PushedFilters: [], ReadSchema: struct<id:int>
```
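The rewrite is semantics-preserving because the non-deterministic predicate is still evaluated exactly once per row that survives the deterministic conjuncts, in the same row order (the combined filter already short-circuited past the deterministic conjuncts first). A toy demonstration, in plain Python rather than Spark code, with an illustrative 0.5 threshold and a fixed seed standing in for `rand()`:

```python
import random

# Rows of (id, dt); the deterministic predicates mirror the example query.
ROWS = [(i, dt) for i in range(100)
        for dt in ("2020-01-01", "2021-01-01", "2021-05-01")]

def plan_before(rows, seed):
    # One combined Filter: deterministic conjuncts first, then the random
    # draw, relying on short-circuiting `and` (as the physical Filter does).
    rng = random.Random(seed)
    return [(i, dt) for i, dt in rows
            if dt not in ("2020-01-01", "2021-01-01")
            and dt == "2021-05-01"
            and rng.random() <= 0.5]

def plan_after(rows, seed):
    # Deterministic predicates applied first (standing in for partition
    # pruning), the non-deterministic predicate applied on top.
    rng = random.Random(seed)
    pruned = [(i, dt) for i, dt in rows
              if dt not in ("2020-01-01", "2021-01-01") and dt == "2021-05-01"]
    return [(i, dt) for i, dt in pruned if rng.random() <= 0.5]

assert plan_before(ROWS, 42) == plan_after(ROWS, 42)
```

With the same seed both plans draw the random number for the same rows in the same order, so the outputs are identical; in Spark the benefit is that the deterministic conjuncts become partition filters instead of running after the scan.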

### Why are the changes needed?

Improve query performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes apache#32405 from wangyum/SPARK-35273.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
wangyum authored and cloud-fan committed May 1, 2021
1 parent 4e8701a commit 72e238a
Showing 2 changed files with 19 additions and 2 deletions.
```diff
@@ -1174,14 +1174,17 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
   val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
     // The query execution/optimization does not guarantee the expressions are evaluated in order.
     // We only can combine them if and only if both are deterministic.
-    case Filter(fc, nf @ Filter(nc, grandChild)) if fc.deterministic && nc.deterministic =>
-      (ExpressionSet(splitConjunctivePredicates(fc)) --
+    case Filter(fc, nf @ Filter(nc, grandChild)) if nc.deterministic =>
+      val (combineCandidates, nonDeterministic) =
+        splitConjunctivePredicates(fc).partition(_.deterministic)
+      val mergedFilter = (ExpressionSet(combineCandidates) --
         ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match {
         case Some(ac) =>
           Filter(And(nc, ac), grandChild)
         case None =>
           nf
       }
+      nonDeterministic.reduceOption(And).map(c => Filter(c, mergedFilter)).getOrElse(mergedFilter)
   }
 }
```
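The shape of the patched branch can be modeled in plain Python (not Spark APIs; predicates here are strings, and a predicate is "deterministic" iff it does not mention `rand` — purely illustrative):

```python
def is_deterministic(p):
    # Toy stand-in for Expression.deterministic
    return "rand" not in p

def combine_filters(fc, nc):
    """Model Filter(fc, Filter(nc, child)) -> resulting Filter stack, top
    first, where fc and nc are lists of conjuncts and nc is fully
    deterministic (the rule's guard)."""
    candidates = [p for p in fc if is_deterministic(p)]      # combineCandidates
    non_det = [p for p in fc if not is_deterministic(p)]     # nonDeterministic
    # ExpressionSet-style dedup: keep only candidates not already in nc,
    # then merge them into the child filter's condition.
    merged = nc + [p for p in candidates if p not in nc]
    # Non-deterministic conjuncts, if any, stay in their own Filter on top.
    return ([non_det] if non_det else []) + [merged]

stack = combine_filters(
    ["dt = '2021-05-01'", "rand() <= 0.01"],
    ["dt NOT IN ('2020-01-01', '2021-01-01')"])
# stack[0] is the non-deterministic Filter kept on top; stack[1] is the
# merged deterministic Filter that can now be pushed further down.
```

This mirrors the example plans above: the deterministic conjunct joins the view's predicate (and can become a partition filter), while `rand() <= 0.01` remains in a separate Filter above it.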
```diff
@@ -160,4 +160,18 @@ class PruneFiltersSuite extends PlanTest {
       comparePlans(optimized, correctAnswer)
     }
   }
+
+  test("SPARK-35273: CombineFilters support non-deterministic expressions") {
+    val x = testRelation.where(!'a.attr.in(1, 3, 5)).subquery('x)
+
+    comparePlans(
+      Optimize.execute(x.where('a.attr === 7 && Rand(10) > 0.1).analyze),
+      testRelation.where(!'a.attr.in(1, 3, 5) && 'a.attr === 7).where(Rand(10) > 0.1).analyze)
+
+    comparePlans(
+      Optimize.execute(
+        x.where('a.attr === 7 && Rand(10) > 0.1 && 'b.attr === 1 && Rand(10) < 1.1).analyze),
+      testRelation.where(!'a.attr.in(1, 3, 5) && 'a.attr === 7 && 'b.attr === 1)
+        .where(Rand(10) > 0.1 && Rand(10) < 1.1).analyze)
+  }
 }
```
