[SPARK-27580][SQL] Implement doCanonicalize in BatchScanExec for comparing query plan results

## What changes were proposed in this pull request?

The method `QueryPlan.sameResult` is used for comparing logical plans in order to:
1. Cache data in CacheManager
2. Uncache data in CacheManager
3. Reuse subqueries
4. etc.

Currently the method `sameResult` always returns false for `BatchScanExec`. We should fix it by implementing `doCanonicalize` for the node.
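
For context, here is a minimal, self-contained sketch (plain Scala, not Spark's actual classes; `Attr`, `Scan`, and `freshId` are invented for illustration) of why plans that carry auto-generated expression IDs never compare equal until those IDs are normalized, which is what `doCanonicalize` does for `BatchScanExec`'s output via `QueryPlan.normalizeExprId`:

```scala
object CanonicalizationSketch {
  private var nextId = 0
  private def freshId(): Int = { nextId += 1; nextId }

  // Stand-in for an output attribute carrying a globally unique expression ID.
  case class Attr(name: String, exprId: Int)

  // Stand-in for a scan node; `output` plays the role of BatchScanExec.output.
  case class Scan(table: String, output: Seq[Attr]) {
    // Rewrite unique IDs to position-based ones, mirroring the intent of normalizeExprId.
    def canonicalized: Scan =
      copy(output = output.zipWithIndex.map { case (a, i) => a.copy(exprId = i) })

    def sameResult(other: Scan): Boolean = this.canonicalized == other.canonicalized
  }

  def main(args: Array[String]): Unit = {
    val scan1 = Scan("t", Seq(Attr("a", freshId()), Attr("b", freshId())))
    val scan2 = Scan("t", Seq(Attr("a", freshId()), Attr("b", freshId())))
    assert(scan1 != scan2)          // raw equality fails: exprIds differ (1,2 vs 3,4)
    assert(scan1.sameResult(scan2)) // canonicalized comparison succeeds
  }
}
```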

## How was this patch tested?

Unit test
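
Roughly, the added tests exercise checks of the following shape (a sketch only: the local session, the `/tmp` path, and the column names are placeholders; the V1 reader list is cleared so the ORC source goes through `BatchScanExec`, as in the tests below):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder().master("local[1]").appName("sameResult-demo").getOrCreate()
// Route the ORC source through DataSource V2, mirroring the tests'
// withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "").
spark.conf.set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "")

val dir = "/tmp/sameResultDemo"  // placeholder path
spark.range(10).selectExpr("id as a", "id + 1 as b").write.mode("overwrite").orc(dir)
val df = spark.read.orc(dir)

// The same predicates in a different textual order should canonicalize to the same plan.
val plan1 = df.where("a > 1 AND b < 9").queryExecution.sparkPlan
val plan2 = df.where("b < 9 AND a > 1").queryExecution.sparkPlan
assert(plan1.sameResult(plan2))
```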

Closes apache#24475 from gengliangwang/sameResultForV2.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
gengliangwang authored and cloud-fan committed Apr 29, 2019
1 parent 05b85eb commit 07d07fe
Showing 6 changed files with 104 additions and 14 deletions.
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.sources.v2.reader._

@@ -46,4 +47,8 @@ case class BatchScanExec(
override lazy val inputRDD: RDD[InternalRow] = {
new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
}

override def doCanonicalize(): BatchScanExec = {
this.copy(output = output.map(QueryPlan.normalizeExprId(_, output)))
}
}
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -85,6 +86,11 @@ abstract class FileScan(
override def readSchema(): StructType =
StructType(readDataSchema.fields ++ readPartitionSchema.fields)

// Returns whether the two given arrays of [[Filter]]s are equivalent.
protected def equivalentFilters(a: Array[Filter], b: Array[Filter]): Boolean = {
a.sortBy(_.hashCode()).sameElements(b.sortBy(_.hashCode()))
}

private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis

private def normalizeName(name: String): String = {
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -34,7 +35,8 @@ case class OrcScan(
dataSchema: StructType,
readDataSchema: StructType,
readPartitionSchema: StructType,
- options: CaseInsensitiveStringMap)
+ options: CaseInsensitiveStringMap,
+ pushedFilters: Array[Filter])
extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
override def isSplitable(path: Path): Boolean = true

@@ -46,4 +48,14 @@ case class OrcScan(
OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
dataSchema, readDataSchema, readPartitionSchema)
}

override def equals(obj: Any): Boolean = obj match {
case o: OrcScan =>
fileIndex == o.fileIndex && dataSchema == o.dataSchema &&
readDataSchema == o.readDataSchema && readPartitionSchema == o.readPartitionSchema &&
options == o.options && equivalentFilters(pushedFilters, o.pushedFilters)
case _ => false
}

override def hashCode(): Int = getClass.hashCode()
}
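
As a side note, the order-insensitive filter comparison used by `equals` above can be demonstrated in isolation. The sketch below copies the `equivalentFilters` helper out of `FileScan` purely for illustration and assumes Spark SQL is on the classpath:

```scala
import org.apache.spark.sql.sources.{Filter, GreaterThan, LessThan}

object EquivalentFiltersDemo {
  // Standalone copy of the helper added to FileScan, for illustration only.
  def equivalentFilters(a: Array[Filter], b: Array[Filter]): Boolean =
    a.sortBy(_.hashCode()).sameElements(b.sortBy(_.hashCode()))

  def main(args: Array[String]): Unit = {
    val f1: Array[Filter] = Array(GreaterThan("c", 1), LessThan("d", 9))
    val f2: Array[Filter] = Array(LessThan("d", 9), GreaterThan("c", 1))
    assert(equivalentFilters(f1, f2))                           // order does not matter
    assert(!equivalentFilters(f1, Array(GreaterThan("c", 1))))  // different filter sets differ
  }
}
```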
@@ -45,7 +45,7 @@ case class OrcScanBuilder(

override def build(): Scan = {
OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema,
- readDataSchema(), readPartitionSchema(), options)
+ readDataSchema(), readPartitionSchema(), options, pushedFilters())
}

private var _pushedFilters: Array[Filter] = Array.empty
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.{DataFrame, QueryTest}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.IntegerType

@@ -47,6 +49,65 @@ class SameResultSuite extends QueryTest with SharedSQLContext {
}
}

test("FileScan: different orders of data filters and partition filters") {
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
Seq("orc", "json", "csv").foreach { format =>
withTempPath { path =>
val tmpDir = path.getCanonicalPath
spark.range(10)
.selectExpr("id as a", "id + 1 as b", "id + 2 as c", "id + 3 as d")
.write
.partitionBy("a", "b")
.format(format)
.option("header", true)
.save(tmpDir)
val df = spark.read.format(format).option("header", true).load(tmpDir)
// partition filters: a > 1 AND b < 9
// data filters: c > 1 AND d < 9
val plan1 = df.where("a > 1 AND b < 9 AND c > 1 AND d < 9").queryExecution.sparkPlan
val plan2 = df.where("b < 9 AND a > 1 AND d < 9 AND c > 1").queryExecution.sparkPlan
assert(plan1.sameResult(plan2))
val scan1 = getBatchScanExec(plan1)
val scan2 = getBatchScanExec(plan2)
assert(scan1.sameResult(scan2))
val plan3 = df.where("b < 9 AND a > 1 AND d < 8 AND c > 1").queryExecution.sparkPlan
assert(!plan1.sameResult(plan3))
// The [[FileScan]]s should have different results if they support filter pushdown.
if (format == "orc") {
val scan3 = getBatchScanExec(plan3)
assert(!scan1.sameResult(scan3))
}
}
}
}
}

test("TextScan") {
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
withTempPath { path =>
val tmpDir = path.getCanonicalPath
spark.range(10)
.selectExpr("id as a", "id + 1 as b", "cast(id as string) value")
.write
.partitionBy("a", "b")
.text(tmpDir)
val df = spark.read.text(tmpDir)
// partition filters: a > 1 AND b < 9
// data filters: value == '3'
val plan1 = df.where("a > 1 AND b < 9 AND value == '3'").queryExecution.sparkPlan
val plan2 = df.where("value == '3' AND a > 1 AND b < 9").queryExecution.sparkPlan
assert(plan1.sameResult(plan2))
val scan1 = getBatchScanExec(plan1)
val scan2 = getBatchScanExec(plan2)
assert(scan1.sameResult(scan2))
}
}
}

private def getBatchScanExec(plan: SparkPlan): BatchScanExec = {
plan.find(_.isInstanceOf[BatchScanExec]).get.asInstanceOf[BatchScanExec]
}

private def getFileSourceScanExec(df: DataFrame): FileSourceScanExec = {
df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
.asInstanceOf[FileSourceScanExec]
@@ -414,19 +414,25 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper
}

test("[SPARK-16818] partition pruned file scans implement sameResult correctly") {
- withTempPath { path =>
- val tempDir = path.getCanonicalPath
- spark.range(100)
- .selectExpr("id", "id as b")
- .write
- .partitionBy("id")
- .parquet(tempDir)
- val df = spark.read.parquet(tempDir)
- def getPlan(df: DataFrame): SparkPlan = {
- df.queryExecution.executedPlan
+ Seq("orc", "").foreach { useV1ReaderList =>
+ withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1ReaderList) {
+ withTempPath { path =>
+ val tempDir = path.getCanonicalPath
+ spark.range(100)
+ .selectExpr("id", "id as b")
+ .write
+ .partitionBy("id")
+ .orc(tempDir)
+ val df = spark.read.orc(tempDir)
+
+ def getPlan(df: DataFrame): SparkPlan = {
+ df.queryExecution.executedPlan
+ }
+
+ assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2"))))
+ assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3"))))
+ }
+ }
- assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2"))))
- assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3"))))
}
}
