Skip to content

Commit

Permalink
[SPARK-13527][SQL] Prune Filters based on Constraints
Browse files Browse the repository at this point in the history
#### What changes were proposed in this pull request?

Remove all the deterministic conditions in a [[Filter]] that are contained in the Child's Constraints.

For example, the first query can be simplified to the second one.

```scala
    val queryWithUselessFilter = tr1
      .where("tr1.a".attr > 10 || "tr1.c".attr < 10)
      .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr))
      .where(
        ("tr1.a".attr > 10 || "tr1.c".attr < 10) &&
        'd.attr < 100 &&
        "tr2.a".attr === "tr1.a".attr)
```
```scala
    val query = tr1
      .where("tr1.a".attr > 10 || "tr1.c".attr < 10)
      .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr))
```
#### How was this patch tested?

Six test cases are added.

Author: gatorsmile <[email protected]>

Closes apache#11406 from gatorsmile/FilterRemoval.
  • Loading branch information
gatorsmile authored and marmbrus committed Mar 9, 2016
1 parent 3dc9ae2 commit c6aa356
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
BooleanSimplification,
SimplifyConditionals,
RemoveDispensableExpressions,
SimplifyFilters,
PruneFilters,
SimplifyCasts,
SimplifyCaseConversionExpressions,
EliminateSerialization) ::
Expand Down Expand Up @@ -827,18 +827,34 @@ object CombineFilters extends Rule[LogicalPlan] {
}

/**
* Removes filters that can be evaluated trivially. This is done either by eliding the filter for
* cases where it will always evaluate to `true`, or substituting a dummy empty relation when the
* filter will always evaluate to `false`.
* Removes filters that can be evaluated trivially. This can be done through the following ways:
* 1) by eliding the filter for cases where it will always evaluate to `true`.
* 2) by substituting a dummy empty relation when the filter will always evaluate to `false`.
* 3) by eliminating the always-true conditions given the constraints on the child's output.
*/
object SimplifyFilters extends Rule[LogicalPlan] {
object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// If the filter condition always evaluate to true, remove the filter.
case Filter(Literal(true, BooleanType), child) => child
// If the filter condition always evaluate to null or false,
// replace the input with an empty relation.
case Filter(Literal(null, _), child) => LocalRelation(child.output, data = Seq.empty)
case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty)
// If any deterministic condition is guaranteed to be true given the constraints on the child's
// output, remove the condition
case f @ Filter(fc, p: LogicalPlan) =>
val (prunedPredicates, remainingPredicates) =
splitConjunctivePredicates(fc).partition { cond =>
cond.deterministic && p.constraints.contains(cond)
}
if (prunedPredicates.isEmpty) {
f
} else if (remainingPredicates.isEmpty) {
p
} else {
val newCond = remainingPredicates.reduce(And)
Filter(newCond, p)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
NullPropagation,
ConstantFolding,
BooleanSimplification,
SimplifyFilters) :: Nil
PruneFilters) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._

class PruneFiltersSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Filter Pushdown and Pruning", Once,
CombineFilters,
PruneFilters,
PushPredicateThroughProject,
PushPredicateThroughJoin) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

test("Constraints of isNull + LeftOuter") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val query = x.where("x.b".attr.isNull).join(y, LeftOuter)
val queryWithUselessFilter = query.where("x.b".attr.isNull)

val optimized = Optimize.execute(queryWithUselessFilter.analyze)
val correctAnswer = query.analyze

comparePlans(optimized, correctAnswer)
}

test("Constraints of unionall") {
val tr1 = LocalRelation('a.int, 'b.int, 'c.int)
val tr2 = LocalRelation('d.int, 'e.int, 'f.int)
val tr3 = LocalRelation('g.int, 'h.int, 'i.int)

val query =
tr1.where('a.attr > 10)
.unionAll(tr2.where('d.attr > 10)
.unionAll(tr3.where('g.attr > 10)))
val queryWithUselessFilter = query.where('a.attr > 10)

val optimized = Optimize.execute(queryWithUselessFilter.analyze)
val correctAnswer = query.analyze

comparePlans(optimized, correctAnswer)
}

test("Pruning multiple constraints in the same run") {
val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1)
val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2)

val query = tr1
.where("tr1.a".attr > 10 || "tr1.c".attr < 10)
.join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr))
// different order of "tr2.a" and "tr1.a"
val queryWithUselessFilter =
query.where(
("tr1.a".attr > 10 || "tr1.c".attr < 10) &&
'd.attr < 100 &&
"tr2.a".attr === "tr1.a".attr)

val optimized = Optimize.execute(queryWithUselessFilter.analyze)
val correctAnswer = query.analyze

comparePlans(optimized, correctAnswer)
}

test("Partial pruning") {
val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1)
val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2)

// One of the filter condition does not exist in the constraints of its child
// Thus, the filter is not removed
val query = tr1
.where("tr1.a".attr > 10)
.join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.d".attr))
val queryWithExtraFilters =
query.where("tr1.a".attr > 10 && 'd.attr < 100 && "tr1.a".attr === "tr2.a".attr)

val optimized = Optimize.execute(queryWithExtraFilters.analyze)
val correctAnswer = tr1
.where("tr1.a".attr > 10)
.join(tr2.where('d.attr < 100),
Inner,
Some("tr1.a".attr === "tr2.a".attr && "tr1.a".attr === "tr2.d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

test("No predicate is pruned") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val query = x.where("x.b".attr.isNull).join(y, LeftOuter)
val queryWithExtraFilters = query.where("x.b".attr.isNotNull)

val optimized = Optimize.execute(queryWithExtraFilters.analyze)
val correctAnswer =
testRelation.where("b".attr.isNull).where("b".attr.isNotNull)
.join(testRelation, LeftOuter).analyze

comparePlans(optimized, correctAnswer)
}

test("Nondeterministic predicate is not pruned") {
val originalQuery = testRelation.where(Rand(10) > 5).select('a).where(Rand(10) > 5).analyze
val optimized = Optimize.execute(originalQuery)
val correctAnswer = testRelation.where(Rand(10) > 5).where(Rand(10) > 5).select('a).analyze
comparePlans(optimized, correctAnswer)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class SetOperationSuite extends PlanTest {
Batch("Union Pushdown", Once,
CombineUnions,
SetOperationPushDown,
SimplifyFilters) :: Nil
PruneFilters) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private[sql] object ParquetFilters {
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
// which can be casted to `false` implicitly. Please refer to the `eval` method of these
// operators and the `SimplifyFilters` rule for details.
// operators and the `PruneFilters` rule for details.

// Hyukjin:
// I added [[EqualNullSafe]] with [[org.apache.parquet.filter2.predicate.Operators.Eq]].
Expand Down

0 comments on commit c6aa356

Please sign in to comment.