Skip to content

Commit

Permalink
[SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a corr…
Browse files Browse the repository at this point in the history
…ectness issue in the case of overlapping partition and data columns

### What changes were proposed in this pull request?

This PR fixes a correctness issue in Parquet DSv1 FileFormat when projection does not contain columns referenced in pushed filters. This typically happens when partition columns and data columns overlap.

This could result in empty result when in fact there were records matching predicate as can be seen in the provided fields.

The problem is especially visible with `count()` and `show()` reporting different results, for example, show() would return 1+ records where the count() would return 0.

In Parquet, when the predicate is provided and column index is enabled, we would try to filter row ranges to figure out what the count should be. Unfortunately, there is an issue that if the projection is empty or is not in the set of filter columns, any checks on columns would fail and 0 rows are returned (`RowRanges.EMPTY`) even though there is data matching the filter.

Note that this is rather a mitigation, a quick fix. The actual fix needs to go into Parquet-MR: https://issues.apache.org/jira/browse/PARQUET-2170.

The fix is not required in DSv2 where the overlapping columns are removed in `FileScanBuilder::readDataSchema()`.

### Why are the changes needed?

Fixes a correctness issue when projection columns are not referenced by columns in pushed down filters or the schema is empty in Parquet DSv1.

Downsides: Parquet column filter would be disabled if it had not been explicitly enabled which could affect performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added a unit test that reproduces this behaviour. The test fails without the fix and passes with the fix.

Closes apache#37419 from sadikovi/SPARK-39833.

Authored-by: Ivan Sadikov <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
sadikovi authored and HyukjinKwon committed Aug 21, 2022
1 parent e69b2df commit cde71aa
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ class ParquetFileFormat
SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key,
sparkSession.sessionState.conf.parquetTimestampNTZEnabled)

// See PARQUET-2170.
// Disable column index optimisation when required schema does not have columns that appear in
// pushed filters to avoid getting incorrect results.
hadoopConf.setBooleanIfUnset(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false)

val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,28 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
}
}
}

test("SPARK-39833: pushed filters with count()") {
withTempPath { path =>
val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
Seq(0).toDF("COL").coalesce(1).write.save(p)
val df = spark.read.parquet(path.getCanonicalPath)
checkAnswer(df.filter("col = 0"), Seq(Row(0)))
assert(df.filter("col = 0").count() == 1, "col")
assert(df.filter("COL = 0").count() == 1, "COL")
}
}

test("SPARK-39833: pushed filters with project without filter columns") {
withTempPath { path =>
val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
Seq((0, 1)).toDF("COL", "a").coalesce(1).write.save(p)
val df = spark.read.parquet(path.getCanonicalPath)
checkAnswer(df.filter("col = 0"), Seq(Row(0, 1)))
assert(df.filter("col = 0").select("a").collect().toSeq == Row(1) :: Nil)
assert(df.filter("col = 0 and a = 1").select("a").collect().toSeq == Row(1) :: Nil)
}
}
}

class ParquetV2QuerySuite extends ParquetQuerySuite {
Expand Down

0 comments on commit cde71aa

Please sign in to comment.