[SPARK-27849][SQL] Redact treeString of FileTable and DataSourceV2ScanExecBase

## What changes were proposed in this pull request?

Following up on apache#17397: the output of FileTable and DataSourceV2ScanExecBase can contain sensitive information (such as Amazon keys), which should not end up in logs or be exposed to non-privileged users.

This PR adds a redaction facility for these outputs to resolve the issue. A user can enable it by setting a regex in the same `spark.redaction.string.regex` configuration that the V1 code paths already use.
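
For illustration only (not part of this patch), here is a minimal sketch of how a user might enable the redaction from Scala. The session setup and temp-path handling are assumptions; the regex simply mirrors the one used in the updated test suite, and the `*********` placeholder is the replacement string the tests assert on.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session; the regex is illustrative and mirrors the test suite.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("redaction-example")
  .config("spark.redaction.string.regex", "file:/[\\w-_@/]+")
  .getOrCreate()

// Write and read a small ORC dataset so the plan contains a local file path.
val path = java.nio.file.Files.createTempDirectory("redaction-example").toString
spark.range(0, 10).write.mode("overwrite").orc(path)

// With this change, the scan's treeString/explain output is redacted as well,
// so the location should show up as "*********" rather than the literal file:/ path.
println(spark.read.orc(path).queryExecution.toString)
```
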
## How was this patch tested?

Unit tests.

Closes apache#24719 from gengliangwang/RedactionSuite.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
gengliangwang authored and dongjoon-hyun committed May 29, 2019
1 parent bfa7f11 commit c1007c2
Showing 5 changed files with 114 additions and 19 deletions.
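
All three changed call sites below funnel their strings through `org.apache.spark.util.Utils.redact`. As a rough, hedged sketch of that helper's behavior (a simplified stand-in, not the actual Spark implementation), every match of the configured pattern is replaced with the fixed placeholder the new tests assert on:

```scala
import scala.util.matching.Regex

// Simplified stand-in for Utils.redact(Option[Regex], String): replace every
// match of the configured pattern with a fixed placeholder.
def redact(pattern: Option[Regex], text: String): String = pattern match {
  case Some(regex) => regex.replaceAllIn(text, "*********")
  case None        => text
}

val pattern = Some("file:/[\\w-_@/]+".r)
redact(pattern, "OrcScan Location: InMemoryFileIndex[file:/tmp/secret-bucket-key]")
// => "OrcScan Location: InMemoryFileIndex[*********]"
```
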
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
import org.apache.spark.util.Utils

trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {

@@ -35,7 +36,9 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
def readerFactory: PartitionReaderFactory

override def simpleString(maxFields: Int): String = {
s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
val result =
s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, result)
}

override def outputPartitioning: physical.Partitioning = scan match {
@@ -17,12 +17,14 @@
package org.apache.spark.sql.execution.datasources.v2

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.TableProvider
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

/**
* A base interface for data source v2 implementations of the built-in file-based data sources.
@@ -49,6 +51,13 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
}

protected def getTableName(paths: Seq[String]): String = {
shortName() + ":" + paths.mkString(";")
val name = shortName() + " " + paths.map(qualifiedPathName).mkString(",")
Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, name)
}

private def qualifiedPathName(path: String): String = {
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
}
}
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2

import java.util.{Locale, OptionalLong}

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -29,6 +30,7 @@ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

abstract class FileScan(
sparkSession: SparkSession,
@@ -42,6 +44,21 @@ abstract class FileScan(
false
}

override def description(): String = {
val locationDesc =
fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
val metadata: Map[String, String] = Map(
"ReadSchema" -> readDataSchema.catalogString,
"Location" -> locationDesc)
val metadataStr = metadata.toSeq.sorted.map {
case (key, value) =>
val redactedValue =
Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
key + ": " + StringUtils.abbreviate(redactedValue, 100)
}.mkString(", ")
s"${this.getClass.getSimpleName} $metadataStr"
}

protected def partitions: Seq[FilePartition] = {
val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
@@ -58,4 +58,8 @@ case class OrcScan(
}

override def hashCode(): Int = getClass.hashCode()

override def description(): String = {
super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
}
}
@@ -19,26 +19,35 @@ package org.apache.spark.sql.execution
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkConf
- import org.apache.spark.sql.QueryTest
+ import org.apache.spark.sql.{DataFrame, QueryTest}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

/**
- * Suite that tests the redaction of DataSourceScanExec
+ * Test suite base for testing the redaction of DataSourceScanExec/BatchScanExec.
*/
- class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
+ abstract class DataSourceScanRedactionTest extends QueryTest with SharedSQLContext {

override protected def sparkConf: SparkConf = super.sparkConf
.set("spark.redaction.string.regex", "file:/[\\w_]+")
.set("spark.redaction.string.regex", "file:/[\\w-_@/]+")

final protected def isIncluded(queryExecution: QueryExecution, msg: String): Boolean = {
queryExecution.toString.contains(msg) ||
queryExecution.simpleString.contains(msg) ||
queryExecution.stringWithStats.contains(msg)
}

protected def getRootPath(df: DataFrame): Path

test("treeString is redacted") {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
val df = spark.read.parquet(basePath)
spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
val df = spark.read.orc(basePath)

- val rootPath = df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
- .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
+ val rootPath = getRootPath(df)
assert(rootPath.toString.contains(dir.toURI.getPath.stripSuffix("/")))

assert(!df.queryExecution.sparkPlan.treeString(verbose = true).contains(rootPath.getName))
@@ -53,18 +62,24 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
assert(df.queryExecution.simpleString.contains(replacement))
}
}
}

- private def isIncluded(queryExecution: QueryExecution, msg: String): Boolean = {
- queryExecution.toString.contains(msg) ||
- queryExecution.simpleString.contains(msg) ||
- queryExecution.stringWithStats.contains(msg)
- }
/**
* Suite that tests the redaction of DataSourceScanExec
*/
class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest {
override protected def sparkConf: SparkConf = super.sparkConf
.set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "orc")

override protected def getRootPath(df: DataFrame): Path =
df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
.asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head

test("explain is redacted using SQLConf") {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
val df = spark.read.parquet(basePath)
spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
val df = spark.read.orc(basePath)
val replacement = "*********"

// Respect SparkConf and replace file:/
@@ -86,8 +101,8 @@
test("FileSourceScanExec metadata") {
withTempPath { path =>
val dir = path.getCanonicalPath
- spark.range(0, 10).write.parquet(dir)
- val df = spark.read.parquet(dir)
+ spark.range(0, 10).write.orc(dir)
+ val df = spark.read.orc(dir)

assert(isIncluded(df.queryExecution, "Format"))
assert(isIncluded(df.queryExecution, "ReadSchema"))
@@ -98,5 +113,52 @@
assert(isIncluded(df.queryExecution, "Location"))
}
}
}

/**
* Suite that tests the redaction of BatchScanExec.
*/
class DataSourceV2ScanExecRedactionSuite extends DataSourceScanRedactionTest {

override protected def sparkConf: SparkConf = super.sparkConf
.set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "")

override protected def getRootPath(df: DataFrame): Path =
df.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get
.asInstanceOf[BatchScanExec].scan.asInstanceOf[OrcScan].fileIndex.rootPaths.head

test("explain is redacted using SQLConf") {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
val df = spark.read.orc(basePath)
val replacement = "*********"

// Respect SparkConf and replace file:/
assert(isIncluded(df.queryExecution, replacement))
assert(isIncluded(df.queryExecution, "BatchScan"))
assert(!isIncluded(df.queryExecution, "file:/"))

withSQLConf(SQLConf.SQL_STRING_REDACTION_PATTERN.key -> "(?i)BatchScan") {
// Respect SQLConf and replace FileScan
assert(isIncluded(df.queryExecution, replacement))

assert(!isIncluded(df.queryExecution, "BatchScan"))
assert(isIncluded(df.queryExecution, "file:/"))
}
}
}

test("FileScan description") {
withTempPath { path =>
val dir = path.getCanonicalPath
spark.range(0, 10).write.orc(dir)
val df = spark.read.orc(dir)

assert(isIncluded(df.queryExecution, "ReadSchema"))
assert(isIncluded(df.queryExecution, "BatchScan"))
assert(isIncluded(df.queryExecution, "PushedFilters"))
assert(isIncluded(df.queryExecution, "Location"))
}
}
}
