Skip to content

Commit

Permalink
[SPARK-11661][SQL] Still pushdown filters returned by unhandledFilters.
Browse files Browse the repository at this point in the history
https://issues.apache.org/jira/browse/SPARK-11661

Author: Yin Huai <[email protected]>

Closes apache#9634 from yhuai/unhandledFilters.
  • Loading branch information
yhuai authored and liancheng committed Nov 12, 2015
1 parent e2957bc commit 14cf753
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
*
* @return A pair of `Seq[Expression]` and `Seq[Filter]`. The first element contains all Catalyst
* predicate [[Expression]]s that are either not convertible or cannot be handled by
* `relation`. The second element contains all converted data source [[Filter]]s that can
* be handled by `relation`.
* `relation`. The second element contains all converted data source [[Filter]]s that
* will be pushed down to the data source.
*/
protected[sql] def selectFilters(
relation: BaseRelation,
Expand All @@ -476,7 +476,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Catalyst predicate expressions that cannot be translated to data source filters.
val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)

// Data source filters that cannot be handled by `relation`
// Data source filters that cannot be handled by `relation`. The semantic of a unhandled filter
// at here is that a data source may not be able to apply this filter to every row
// of the underlying dataset.
val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet

val (unhandled, handled) = translated.partition {
Expand All @@ -491,6 +493,11 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Translated data source filters that can be handled by `relation`
val (_, handledFilters) = handled.unzip

(unrecognizedPredicates ++ unhandledPredicates, handledFilters)
// translated contains all filters that have been converted to the public Filter interface.
// We should always push them to the data source no matter whether the data source can apply
// a filter to every row or not.
val (_, translatedFilters) = translated.unzip

(unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,11 @@ abstract class BaseRelation {
def needConversion: Boolean = true

/**
* Given an array of [[Filter]]s, returns an array of [[Filter]]s that this data source relation
* cannot handle. Spark SQL will apply all returned [[Filter]]s against rows returned by this
* data source relation.
* Returns the list of [[Filter]]s that this datasource may not be able to handle.
* These returned [[Filter]]s will be evaluated by Spark SQL after data is output by a scan.
* By default, this function will return all filters, as it is always safe to
* double evaluate a [[Filter]]. However, specific implementations can override this function to
* avoid double filtering when they are capable of processing a filter internally.
*
* @since 1.6.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,4 +336,29 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
}

test("SPARK-11661 Still pushdown filters returned by unhandledFilters") {
import testImplicits._
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/part=1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
val df = sqlContext.read.parquet(path).filter("a = 2")

// This is the source RDD without Spark-side filtering.
val childRDD =
df
.queryExecution
.executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
.child
.execute()

// The result should be single row.
// When a filter is pushed to Parquet, Parquet can apply it to every row.
// So, we can check the number of rows returned from the Parquet
// to make sure our filter pushdown work.
assert(childRDD.count == 1)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,11 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
testPushDown("SELECT * FROM oneToTenFiltered WHERE a IN (1,3,5)", 3, Set("a", "b", "c"))

testPushDown("SELECT * FROM oneToTenFiltered WHERE a = 20", 0, Set("a", "b", "c"))
testPushDown("SELECT * FROM oneToTenFiltered WHERE b = 1", 10, Set("a", "b", "c"))
testPushDown(
"SELECT * FROM oneToTenFiltered WHERE b = 1",
10,
Set("a", "b", "c"),
Set(EqualTo("b", 1)))

testPushDown("SELECT * FROM oneToTenFiltered WHERE a < 5 AND a > 1", 3, Set("a", "b", "c"))
testPushDown("SELECT * FROM oneToTenFiltered WHERE a < 3 OR a > 8", 4, Set("a", "b", "c"))
Expand Down Expand Up @@ -283,12 +287,23 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
| WHERE a + b > 9
| AND b < 16
| AND c IN ('bbbbbBBBBB', 'cccccCCCCC', 'dddddDDDDD', 'foo')
""".stripMargin.split("\n").map(_.trim).mkString(" "), 3, Set("a", "b"))
""".stripMargin.split("\n").map(_.trim).mkString(" "),
3,
Set("a", "b"),
Set(LessThan("b", 16)))

def testPushDown(
sqlString: String,
expectedCount: Int,
requiredColumnNames: Set[String]): Unit = {
sqlString: String,
expectedCount: Int,
requiredColumnNames: Set[String]): Unit = {
testPushDown(sqlString, expectedCount, requiredColumnNames, Set.empty[Filter])
}

def testPushDown(
sqlString: String,
expectedCount: Int,
requiredColumnNames: Set[String],
expectedUnhandledFilters: Set[Filter]): Unit = {
test(s"PushDown Returns $expectedCount: $sqlString") {
val queryExecution = sql(sqlString).queryExecution
val rawPlan = queryExecution.executedPlan.collect {
Expand All @@ -300,15 +315,13 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
val rawCount = rawPlan.execute().count()
assert(ColumnsRequired.set === requiredColumnNames)

assert {
val table = caseInsensitiveContext.table("oneToTenFiltered")
val relation = table.queryExecution.logical.collectFirst {
case LogicalRelation(r, _) => r
}.get
val table = caseInsensitiveContext.table("oneToTenFiltered")
val relation = table.queryExecution.logical.collectFirst {
case LogicalRelation(r, _) => r
}.get

// `relation` should be able to handle all pushed filters
relation.unhandledFilters(FiltersPushed.list.toArray).isEmpty
}
assert(
relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters)

if (rawCount != expectedCount) {
fail(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
projections = Seq('c, 'p),
filter = 'a < 3 && 'p > 0,
requiredColumns = Seq("c", "a"),
pushedFilters = Nil,
pushedFilters = Seq(LessThan("a", 3)),
inconvertibleFilters = Nil,
unhandledFilters = Seq('a < 3),
partitioningFilters = Seq('p > 0)
Expand Down Expand Up @@ -327,7 +327,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
projections = Seq('b, 'p),
filter = 'c > "val_7" && 'b < 18 && 'p > 0,
requiredColumns = Seq("b"),
pushedFilters = Seq(GreaterThan("c", "val_7")),
pushedFilters = Seq(GreaterThan("c", "val_7"), LessThan("b", 18)),
inconvertibleFilters = Nil,
unhandledFilters = Seq('b < 18),
partitioningFilters = Seq('p > 0)
Expand All @@ -344,7 +344,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
projections = Seq('b, 'p),
filter = 'a % 2 === 0 && 'c > "val_7" && 'b < 18 && 'p > 0,
requiredColumns = Seq("b", "a"),
pushedFilters = Seq(GreaterThan("c", "val_7")),
pushedFilters = Seq(GreaterThan("c", "val_7"), LessThan("b", 18)),
inconvertibleFilters = Seq('a % 2 === 0),
unhandledFilters = Seq('b < 18),
partitioningFilters = Seq('p > 0)
Expand All @@ -361,7 +361,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
projections = Seq('b, 'p),
filter = 'a > 7 && 'a < 9,
requiredColumns = Seq("b", "a"),
pushedFilters = Seq(GreaterThan("a", 7)),
pushedFilters = Seq(GreaterThan("a", 7), LessThan("a", 9)),
inconvertibleFilters = Nil,
unhandledFilters = Seq('a < 9),
partitioningFilters = Nil
Expand Down

0 comments on commit 14cf753

Please sign in to comment.