Skip to content

Commit

Permalink
[SPARK-25559][SQL] Remove the unsupported predicates in Parquet when …
Browse files Browse the repository at this point in the history
…possible

## What changes were proposed in this pull request?

Currently, in `ParquetFilters`, if one of the child predicates is not supported by Parquet, the entire predicate is thrown away. In fact, if the unsupported predicate appears in a top-level `And` condition, or in a nested `And` that is not beneath a `Not` or `Or` condition, it can be safely removed.

## How was this patch tested?

Tests are added.

Closes apache#22574 from dbtsai/removeUnsupportedPredicatesInParquet.

Lead-authored-by: DB Tsai <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: DB Tsai <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
3 people committed Sep 29, 2018
1 parent 9362c5c commit 5d726b8
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,13 @@ private[parquet] class ParquetFilters(
*/
def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
  // Resolve the Parquet schema's name-to-field map once, then delegate to the
  // recursive helper. At the top level it is safe to drop an unsupported side
  // of an `And`, so removal is enabled here.
  createFilterHelper(getFieldMap(schema), predicate, canRemoveOneSideInAnd = true)
}

private def createFilterHelper(
nameToParquetField: Map[String, ParquetField],
predicate: sources.Filter,
canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = {
// For decimal types, the filter value's scale must match the scale in the file schema;
// a mismatch would cause data corruption.
def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
Expand Down Expand Up @@ -488,26 +494,36 @@ private[parquet] class ParquetFilters(
.map(_(nameToParquetField(name).fieldName, value))

case sources.And(lhs, rhs) =>
// At here, it is not safe to just convert one side if we do not understand the
// other side. Here is an example used to explain the reason.
// Here, it is not safe to convert just one side and drop the other
// unless we know what the parent filters are.
//
// Here is an example used to explain the reason.
// Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
// convert b in ('1'). If we only convert a = 2, we will end up with a filter
// NOT(a = 2), which will generate wrong results.
// Pushing one side of AND down is only safe to do at the top level.
// You can see ParquetRelation's initializeLocalJobFunc method as an example.
for {
lhsFilter <- createFilter(schema, lhs)
rhsFilter <- createFilter(schema, rhs)
} yield FilterApi.and(lhsFilter, rhsFilter)
//
// Pushing down only one side of an AND is safe at the top level, or inside
// nested ANDs that are not beneath a NOT or OR condition; in those cases the
// unsupported predicate can be safely removed.
val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)

(lhsFilterOption, rhsFilterOption) match {
case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter))
case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
case _ => None
}

case sources.Or(lhs, rhs) =>
for {
lhsFilter <- createFilter(schema, lhs)
rhsFilter <- createFilter(schema, rhs)
lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
} yield FilterApi.or(lhsFilter, rhsFilter)

case sources.Not(pred) =>
createFilter(schema, pred).map(FilterApi.not)
createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
.map(FilterApi.not)

case sources.In(name, values) if canMakeFilterOn(name, values.head)
&& values.distinct.length <= pushDownInFilterThreshold =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}

test("SPARK-12218 Converting conjunctions into Parquet filter predicates") {
test("SPARK-12218 and SPARK-25559 Converting conjunctions into Parquet filter predicates") {
val schema = StructType(Seq(
StructField("a", IntegerType, nullable = false),
StructField("b", StringType, nullable = true),
Expand All @@ -770,14 +770,95 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.GreaterThan("c", 1.5D)))
}

assertResult(None) {
// Testing when `canRemoveOneSideInAnd == true`
// case sources.And(lhs, rhs) =>
// ...
// case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
assertResult(Some(lt(intColumn("a"), 10: Integer))) {
parquetFilters.createFilter(
parquetSchema,
sources.And(
sources.LessThan("a", 10),
sources.StringContains("b", "prefix")))
}

// Testing when `canRemoveOneSideInAnd == true`
// case sources.And(lhs, rhs) =>
// ...
// case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
assertResult(Some(lt(intColumn("a"), 10: Integer))) {
parquetFilters.createFilter(
parquetSchema,
sources.And(
sources.StringContains("b", "prefix"),
sources.LessThan("a", 10)))
}

// Testing complex And conditions
assertResult(Some(
FilterApi.and(lt(intColumn("a"), 10: Integer), gt(intColumn("a"), 5: Integer)))) {
parquetFilters.createFilter(
parquetSchema,
sources.And(
sources.And(
sources.LessThan("a", 10),
sources.StringContains("b", "prefix")
),
sources.GreaterThan("a", 5)))
}

// Testing complex And conditions
assertResult(Some(
FilterApi.and(gt(intColumn("a"), 5: Integer), lt(intColumn("a"), 10: Integer)))) {
parquetFilters.createFilter(
parquetSchema,
sources.And(
sources.GreaterThan("a", 5),
sources.And(
sources.StringContains("b", "prefix"),
sources.LessThan("a", 10)
)))
}

// Testing
// case sources.Or(lhs, rhs) =>
// ...
// lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
sources.Or(
sources.And(
sources.GreaterThan("a", 1),
sources.StringContains("b", "prefix")),
sources.GreaterThan("a", 2)))
}

// Testing
// case sources.Or(lhs, rhs) =>
// ...
// rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
sources.Or(
sources.GreaterThan("a", 2),
sources.And(
sources.GreaterThan("a", 1),
sources.StringContains("b", "prefix"))))
}

// Testing
// case sources.Not(pred) =>
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
// .map(FilterApi.not)
//
// and
//
// Testing when `canRemoveOneSideInAnd == false`
// case sources.And(lhs, rhs) =>
// ...
// case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
Expand All @@ -786,6 +867,68 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
sources.GreaterThan("a", 1),
sources.StringContains("b", "prefix"))))
}

// Testing
// case sources.Not(pred) =>
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
// .map(FilterApi.not)
//
// and
//
// Testing when `canRemoveOneSideInAnd == false`
// case sources.And(lhs, rhs) =>
// ...
// case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
sources.Not(
sources.And(
sources.StringContains("b", "prefix"),
sources.GreaterThan("a", 1))))
}

// Testing
// case sources.Not(pred) =>
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
// .map(FilterApi.not)
//
// and
//
// Testing passing `canRemoveOneSideInAnd = false` into
// case sources.And(lhs, rhs) =>
// val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
sources.Not(
sources.And(
sources.And(
sources.GreaterThan("a", 1),
sources.StringContains("b", "prefix")),
sources.GreaterThan("a", 2))))
}

// Testing
// case sources.Not(pred) =>
// createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
// .map(FilterApi.not)
//
// and
//
// Testing passing `canRemoveOneSideInAnd = false` into
// case sources.And(lhs, rhs) =>
// val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
assertResult(None) {
parquetFilters.createFilter(
parquetSchema,
sources.Not(
sources.And(
sources.GreaterThan("a", 2),
sources.And(
sources.GreaterThan("a", 1),
sources.StringContains("b", "prefix")))))
}
}

test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {
Expand Down

0 comments on commit 5d726b8

Please sign in to comment.