[SPARK-12236][SQL] JDBC filter tests all pass if filters are not really pushed down

https://issues.apache.org/jira/browse/SPARK-12236
Currently, JDBC filters are not tested properly: every test passes even when a filter is not pushed down, because Spark-side filtering re-applies the predicate and masks the failure.

In this PR,
Firstly, I corrected the tests to properly check the pushed-down filters by removing Spark-side filtering.
Also, `!=` was being tested even though it is not actually pushed down, so I removed those assertions.
Lastly, I moved the `stripSparkFilter()` function to `SQLTestUtils`, as this function can be shared by all tests for pushed-down filters. It can also be shared with the ORC datasource, whose filters are likewise not tested properly.
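For illustration, here is a minimal sketch (not part of the commit) of the assertion pattern this change enables; it assumes a suite that mixes in `SQLTestUtils` and has the `foobar` table from `JDBCSuite` registered:

// stripSparkFilter removes Spark's own Filter node from the physical plan,
// so collect() returns exactly the rows the JDBC source produced.
val df = sql("SELECT * FROM foobar WHERE THEID = 1")
assert(stripSparkFilter(df).collect().size === 1) // passes only if the source filtered

// A predicate that is not pushed down, such as THEID != 2, returns every row
// once Spark-side filtering is stripped, so the old count assertions would
// fail here; that is why the != checks were removed.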

Author: hyukjinkwon <[email protected]>

Closes #10221 from HyukjinKwon/SPARK-12236.
HyukjinKwon authored and marmbrus committed Dec 16, 2015
1 parent 86ea64d commit 2811265
Showing 3 changed files with 19 additions and 21 deletions.
@@ -110,21 +110,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
 
-  /**
-   * Strip Spark-side filtering in order to check if a datasource filters rows correctly.
-   */
-  protected def stripSparkFilter(df: DataFrame): DataFrame = {
-    val schema = df.schema
-    val childRDD = df
-      .queryExecution
-      .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
-      .child
-      .execute()
-      .map(row => Row.fromSeq(row.toSeq(schema)))
-
-    sqlContext.createDataFrame(childRDD, schema)
-  }
-
   test("filter pushdown - boolean") {
     withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
@@ -176,12 +176,10 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
   }
 
   test("SELECT * WHERE (simple predicates)") {
-    assert(sql("SELECT * FROM foobar WHERE THEID < 1").collect().size === 0)
-    assert(sql("SELECT * FROM foobar WHERE THEID != 2").collect().size === 2)
-    assert(sql("SELECT * FROM foobar WHERE THEID = 1").collect().size === 1)
-    assert(sql("SELECT * FROM foobar WHERE NAME = 'fred'").collect().size === 1)
-    assert(sql("SELECT * FROM foobar WHERE NAME > 'fred'").collect().size === 2)
-    assert(sql("SELECT * FROM foobar WHERE NAME != 'fred'").collect().size === 2)
+    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size === 0)
+    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size === 1)
+    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size === 1)
+    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size === 2)
   }
 
   test("SELECT * WHERE (quoted strings)") {
@@ -179,6 +179,21 @@ private[sql] trait SQLTestUtils
     try f finally sqlContext.sql(s"USE default")
   }
 
+  /**
+   * Strip Spark-side filtering in order to check if a datasource filters rows correctly.
+   */
+  protected def stripSparkFilter(df: DataFrame): DataFrame = {
+    val schema = df.schema
+    val childRDD = df
+      .queryExecution
+      .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
+      .child
+      .execute()
+      .map(row => Row.fromSeq(row.toSeq(schema)))
+
+    sqlContext.createDataFrame(childRDD, schema)
+  }
+
   /**
    * Turn a logical plan into a [[DataFrame]]. This should be removed once we have an easier
    * way to construct [[DataFrame]] directly out of local data without relying on implicits.
