Skip to content

Commit

Permalink
[SPARK-24402][SQL] Optimize In expression when only one element in …
Browse files Browse the repository at this point in the history
…the collection or collection is empty

## What changes were proposed in this pull request?

Two new rules in the logical plan optimizers are added.

1. When there is only one element in the **`Collection`**, the
physical plan will be optimized to **`EqualTo`**, so predicate
pushdown can be used.

```scala
    profileDF.filter( $"profileID".isInCollection(Set(6))).explain(true)
    """
      |== Physical Plan ==
      |*(1) Project [profileID#0]
      |+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
      |   +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
      |     PartitionFilters: [],
      |     PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
      |     ReadSchema: struct<profileID:int>
    """.stripMargin
```

2. When the **`Collection`** is empty, and the input is nullable, the
logical plan will be simplified to

```scala
    profileDF.filter( $"profileID".isInCollection(Set())).explain(true)
    """
      |== Optimized Logical Plan ==
      |Filter if (isnull(profileID#0)) null else false
      |+- Relation[profileID#0] parquet
    """.stripMargin
```

TODO:

1. For multiple conditions with numbers less than certain thresholds,
we should still allow predicate pushdown.
2. Optimize the **`In`** using **`tableswitch`** or **`lookupswitch`**
when the numbers of the categories are low, and they are **`Int`**,
**`Long`**.
3. The default immutable hash trees set is slow for query, and we
should do benchmark for using different set implementation for faster
query.
4. **`filter(if (condition) null else false)`** can be optimized to false.

## How was this patch tested?

Couple new tests are added.

Author: DB Tsai <[email protected]>

Closes apache#21797 from dbtsai/optimize-in.
  • Loading branch information
dbtsai committed Jul 18, 2018
1 parent 2a4dd6f commit 681845f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,24 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
object OptimizeIn extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan => q transformExpressionsDown {
case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
case In(v, list) if list.isEmpty =>
// When v is not nullable, the following expression will be optimized
// to FalseLiteral which is tested in OptimizeInSuite.scala
If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
case expr @ In(v, list) if expr.inSetConvertible =>
val newList = ExpressionSet(list).toSeq
if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) {
if (newList.length == 1
// TODO: `EqualTo` for structural types are not working. Until SPARK-24443 is addressed,
// TODO: we exclude them in this rule.
&& !v.isInstanceOf[CreateNamedStructLike]
&& !newList.head.isInstanceOf[CreateNamedStructLike]) {
EqualTo(v, newList.head)
} else if (newList.length > SQLConf.get.optimizerInSetConversionThreshold) {
val hSet = newList.map(e => e.eval(EmptyRow))
InSet(v, HashSet() ++ hSet)
} else if (newList.size < list.size) {
} else if (newList.length < list.length) {
expr.copy(list = newList)
} else { // newList.length == list.length
} else { // newList.length == list.length && newList.length > 1
expr
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,21 @@ class OptimizeInSuite extends PlanTest {
}
}

test("OptimizedIn test: one element in list gets transformed to EqualTo.") {
val originalQuery =
testRelation
.where(In(UnresolvedAttribute("a"), Seq(Literal(1))))
.analyze

val optimized = Optimize.execute(originalQuery)
val correctAnswer =
testRelation
.where(EqualTo(UnresolvedAttribute("a"), Literal(1)))
.analyze

comparePlans(optimized, correctAnswer)
}

test("OptimizedIn test: In empty list gets transformed to FalseLiteral " +
"when value is not nullable") {
val originalQuery =
Expand All @@ -191,4 +206,21 @@ class OptimizeInSuite extends PlanTest {

comparePlans(optimized, correctAnswer)
}

test("OptimizedIn test: In empty list gets transformed to `If` expression " +
"when value is nullable") {
val originalQuery =
testRelation
.where(In(UnresolvedAttribute("a"), Nil))
.analyze

val optimized = Optimize.execute(originalQuery)
val correctAnswer =
testRelation
.where(If(IsNotNull(UnresolvedAttribute("a")),
Literal(false), Literal.create(null, BooleanType)))
.analyze

comparePlans(optimized, correctAnswer)
}
}

0 comments on commit 681845f

Please sign in to comment.