diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index da71e7873b525..9ad683fbe1df6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
+import org.apache.spark.util.Utils
 
 trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
 
@@ -35,7 +36,9 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
   def readerFactory: PartitionReaderFactory
 
   override def simpleString(maxFields: Int): String = {
-    s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
+    val result =
+      s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
+    Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, result)
   }
 
   override def outputPartitioning: physical.Partitioning = scan match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index d60d4292090d6..bcb10ae5999fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -17,12 +17,14 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.TableProvider
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 /**
  * A base interface for data source v2 implementations of the built-in file-based data sources.
@@ -49,6 +51,13 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
   }
 
   protected def getTableName(paths: Seq[String]): String = {
-    shortName() + ":" + paths.mkString(";")
+    val name = shortName() + " " + paths.map(qualifiedPathName).mkString(",")
+    Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, name)
+  }
+
+  private def qualifiedPathName(path: String): String = {
+    val hdfsPath = new Path(path)
+    val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 84a1274ea4f9c..b2f3c4d256448 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util.{Locale, OptionalLong}
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -29,6 +30,7 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 abstract class FileScan(
     sparkSession: SparkSession,
@@ -42,6 +44,21 @@ abstract class FileScan(
     false
   }
 
+  override def description(): String = {
+    val locationDesc =
+      fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
+    val metadata: Map[String, String] = Map(
+      "ReadSchema" -> readDataSchema.catalogString,
+      "Location" -> locationDesc)
+    val metadataStr = metadata.toSeq.sorted.map {
+      case (key, value) =>
+        val redactedValue =
+          Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
+        key + ": " + StringUtils.abbreviate(redactedValue, 100)
+    }.mkString(", ")
+    s"${this.getClass.getSimpleName} $metadataStr"
+  }
+
   protected def partitions: Seq[FilePartition] = {
     val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
     val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index b129c942ccc53..a4fb03405d162 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -58,4 +58,8 @@ case class OrcScan(
   }
 
   override def hashCode(): Int = getClass.hashCode()
+
+  override def description(): String = {
+    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
index 11a1c9a1f9b9c..ec59459764ce6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
@@ -19,26 +19,35 @@ package org.apache.spark.sql.execution
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
 /**
- * Suite that tests the redaction of DataSourceScanExec
+ * Test suite base for testing the redaction of DataSourceScanExec/BatchScanExec.
  */
-class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
+abstract class DataSourceScanRedactionTest extends QueryTest with SharedSQLContext {
   override protected def sparkConf: SparkConf = super.sparkConf
-    .set("spark.redaction.string.regex", "file:/[\\w_]+")
+    .set("spark.redaction.string.regex", "file:/[\\w-_@/]+")
+
+  final protected def isIncluded(queryExecution: QueryExecution, msg: String): Boolean = {
+    queryExecution.toString.contains(msg) ||
+      queryExecution.simpleString.contains(msg) ||
+      queryExecution.stringWithStats.contains(msg)
+  }
+
+  protected def getRootPath(df: DataFrame): Path
 
   test("treeString is redacted") {
     withTempDir { dir =>
       val basePath = dir.getCanonicalPath
-      spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
-      val df = spark.read.parquet(basePath)
+      spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
+      val df = spark.read.orc(basePath)
 
-      val rootPath = df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
-        .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
+      val rootPath = getRootPath(df)
       assert(rootPath.toString.contains(dir.toURI.getPath.stripSuffix("/")))
 
       assert(!df.queryExecution.sparkPlan.treeString(verbose = true).contains(rootPath.getName))
@@ -53,18 +62,24 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
       assert(df.queryExecution.simpleString.contains(replacement))
     }
   }
+}
 
-  private def isIncluded(queryExecution: QueryExecution, msg: String): Boolean = {
-    queryExecution.toString.contains(msg) ||
-      queryExecution.simpleString.contains(msg) ||
-      queryExecution.stringWithStats.contains(msg)
-  }
+/**
+ * Suite that tests the redaction of DataSourceScanExec
+ */
+class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest {
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "orc")
+
+  override protected def getRootPath(df: DataFrame): Path =
+    df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+      .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
 
   test("explain is redacted using SQLConf") {
     withTempDir { dir =>
       val basePath = dir.getCanonicalPath
-      spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
-      val df = spark.read.parquet(basePath)
+      spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
+      val df = spark.read.orc(basePath)
       val replacement = "*********"
 
       // Respect SparkConf and replace file:/
@@ -86,8 +101,8 @@
   test("FileSourceScanExec metadata") {
     withTempPath { path =>
       val dir = path.getCanonicalPath
-      spark.range(0, 10).write.parquet(dir)
-      val df = spark.read.parquet(dir)
+      spark.range(0, 10).write.orc(dir)
+      val df = spark.read.orc(dir)
 
       assert(isIncluded(df.queryExecution, "Format"))
       assert(isIncluded(df.queryExecution, "ReadSchema"))
@@ -98,5 +113,52 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
       assert(isIncluded(df.queryExecution, "Location"))
     }
   }
+}
+
+/**
+ * Suite that tests the redaction of BatchScanExec.
+ */
+class DataSourceV2ScanExecRedactionSuite extends DataSourceScanRedactionTest {
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "")
+
+  override protected def getRootPath(df: DataFrame): Path =
+    df.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get
+      .asInstanceOf[BatchScanExec].scan.asInstanceOf[OrcScan].fileIndex.rootPaths.head
+
+  test("explain is redacted using SQLConf") {
+    withTempDir { dir =>
+      val basePath = dir.getCanonicalPath
+      spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
+      val df = spark.read.orc(basePath)
+      val replacement = "*********"
+
+      // Respect SparkConf and replace file:/
+      assert(isIncluded(df.queryExecution, replacement))
+      assert(isIncluded(df.queryExecution, "BatchScan"))
+      assert(!isIncluded(df.queryExecution, "file:/"))
+
+      withSQLConf(SQLConf.SQL_STRING_REDACTION_PATTERN.key -> "(?i)BatchScan") {
+        // Respect SQLConf and replace FileScan
+        assert(isIncluded(df.queryExecution, replacement))
+
+        assert(!isIncluded(df.queryExecution, "BatchScan"))
+        assert(isIncluded(df.queryExecution, "file:/"))
+      }
+    }
+  }
+
+  test("FileScan description") {
+    withTempPath { path =>
+      val dir = path.getCanonicalPath
+      spark.range(0, 10).write.orc(dir)
+      val df = spark.read.orc(dir)
+
+      assert(isIncluded(df.queryExecution, "ReadSchema"))
+      assert(isIncluded(df.queryExecution, "BatchScan"))
+      assert(isIncluded(df.queryExecution, "PushedFilters"))
+      assert(isIncluded(df.queryExecution, "Location"))
+    }
+  }
 }
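For reference, the redaction this patch threads through DataSourceV2ScanExecBase, FileDataSourceV2 and FileScan is a plain regex substitution over the generated strings: Utils.redact compiles spark.redaction.string.regex (or spark.sql.redaction.string.regex when going through SQLConf) and replaces every match with a fixed marker. The standalone sketch below is illustrative only: the object name and the sample description are made up, and it inlines the substitution with Regex.replaceAllIn instead of calling the private[spark] Utils.redact, but it shows the shape of the redacted text the new suite asserts on.

import scala.util.matching.Regex

object RedactionSketch {
  // Same pattern the updated test suite sets for spark.redaction.string.regex.
  private val pattern: Regex = "file:/[\\w-_@/]+".r

  // What Utils.redact effectively does: replace every match with Spark's redaction marker.
  private def redact(regex: Option[Regex], text: String): String = regex match {
    case Some(r) => r.replaceAllIn(text, "*********(redacted)")
    case None => text
  }

  def main(args: Array[String]): Unit = {
    // A made-up description in the shape produced by FileScan.description():
    // "<scan class> Location: <file index>[<root paths>], ReadSchema: <schema>"
    val description =
      "OrcScan Location: InMemoryFileIndex[file:/tmp/some-dir/foo], ReadSchema: struct<a:bigint>"
    println(redact(Some(pattern), description))
    // Prints: OrcScan Location: InMemoryFileIndex[*********(redacted)], ReadSchema: struct<a:bigint>
  }
}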