[SPARK-15103][SQL] Refactored FileCatalog class to allow StreamFileCatalog to infer partitioning

## What changes were proposed in this pull request?

File Stream Sink writes the list of written files in a metadata log, and StreamFileCatalog reads that list of files for processing. However, unlike HDFSFileCatalog, StreamFileCatalog does not infer partitioning.

This PR enables that by refactoring HDFSFileCatalog to create an abstract class, PartitioningAwareFileCatalog, which contains all the functionality needed to infer partitions from a list of leaf files (see the condensed sketch after this list).
- HDFSFileCatalog has been renamed to ListingFileCatalog; it extends PartitioningAwareFileCatalog and provides the list of leaf files by recursively scanning directories.
- StreamFileCatalog has been renamed to MetadataLogFileCatalog; it extends PartitioningAwareFileCatalog and provides the list of leaf files from the metadata log.
- The two classes above have been moved into their own files, as they are not interfaces that belong in fileSourceInterfaces.scala.
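A condensed sketch of the resulting hierarchy, with constructor signatures taken from the diff below and all bodies elided. The MetadataLogFileCatalog constructor is inferred from its call site in DataSource, and the arguments it passes to the parent constructor are an assumption, since that file is not excerpted here:

```scala
abstract class PartitioningAwareFileCatalog(   // shared partition inference and pruning
    sparkSession: SparkSession,
    parameters: Map[String, String],
    partitionSchema: Option[StructType])
  extends FileCatalog with Logging {
  // Subclasses only need to supply the leaf files.
  protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
  protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
}

// Batch path (formerly HDFSFileCatalog): leaf files come from recursively listing `paths`.
class ListingFileCatalog(
    sparkSession: SparkSession,
    override val paths: Seq[Path],
    parameters: Map[String, String],
    partitionSchema: Option[StructType])
  extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema)

// Streaming path (formerly StreamFileCatalog): leaf files come from the file sink's metadata log.
// Parent-constructor arguments assumed; not shown in this excerpt.
class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path)
  extends PartitioningAwareFileCatalog(sparkSession, Map.empty[String, String], None)
```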

## How was this patch tested?
- FileStreamSinkSuite was updated to verify that partitioning gets inferred, and that on read the partitions are pruned correctly based on the query.
- Other unit tests are unchanged and pass as expected.

Author: Tathagata Das <[email protected]>

Closes apache#12879 from tdas/SPARK-15103.
tdas committed May 4, 2016
1 parent 6274a52 commit 0fd3a47
Showing 8 changed files with 410 additions and 285 deletions.
@@ -136,7 +136,7 @@ case class DataSource(
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray
val fileCatalog: FileCatalog = new HDFSFileCatalog(sparkSession, options, globbedPaths, None)
val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
@@ -258,7 +258,7 @@ case class DataSource(
case (format: FileFormat, _)
if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new StreamFileCatalog(sparkSession, basePath)
val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sparkSession,
@@ -310,8 +310,8 @@ case class DataSource(
})
}

val fileCatalog: FileCatalog =
new HDFSFileCatalog(sparkSession, options, globbedPaths, partitionSchema)
val fileCatalog =
new ListingFileCatalog(sparkSession, globbedPaths, options, partitionSchema)

val dataSchema = userSpecifiedSchema.map { schema =>
val equality =
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import scala.collection.mutable
import scala.util.Try

import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType


/**
* A [[FileCatalog]] that generates the list of files to process by recursively listing all the
* files present in `paths`.
*
* @param parameters a set of options to control discovery
* @param paths a list of paths to scan
* @param partitionSchema an optional partition schema that will be used to provide types for the
* discovered partitions
*/
class ListingFileCatalog(
sparkSession: SparkSession,
override val paths: Seq[Path],
parameters: Map[String, String],
partitionSchema: Option[StructType])
extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {

@volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
@volatile private var cachedPartitionSpec: PartitionSpec = _

refresh()

override def partitionSpec(): PartitionSpec = {
if (cachedPartitionSpec == null) {
cachedPartitionSpec = inferPartitioning()
}
cachedPartitionSpec
}

override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
cachedLeafFiles
}

override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
cachedLeafDirToChildrenFiles
}

override def refresh(): Unit = {
val files = listLeafFiles(paths)
cachedLeafFiles =
new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
cachedPartitionSpec = null
}

protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext)
} else {
val statuses: Seq[FileStatus] = paths.flatMap { path =>
val fs = path.getFileSystem(hadoopConf)
logInfo(s"Listing $path on driver")
// Dummy jobconf to get to the pathFilter defined in configuration
val jobConf = new JobConf(hadoopConf, this.getClass)
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)

val statuses = {
val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
}

statuses.map {
case f: LocatedFileStatus => f

// NOTE:
//
// - Although S3/S3A/S3N file systems can be quite slow for remote file metadata
//   operations, calling `getFileBlockLocations` does no harm here since these file system
//   implementations don't actually issue RPC for this method.
//
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
//   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
//   exceeds the threshold.
case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
}
}.filterNot { status =>
val name = status.getPath.getName
HadoopFsRelation.shouldFilterOut(name)
}

val (dirs, files) = statuses.partition(_.isDirectory)

// It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
if (dirs.isEmpty) {
mutable.LinkedHashSet(files: _*)
} else {
mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
}
}
}

override def equals(other: Any): Boolean = other match {
case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
case _ => false
}

override def hashCode(): Int = paths.toSet.hashCode()
}
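For orientation, a hedged usage sketch (not part of this commit's diff) showing how a ListingFileCatalog over a hypothetical directory would be constructed and queried, using only members defined above or in the abstract base class:

// Assumes an active `sparkSession` and a hypothetical input directory.
val catalog = new ListingFileCatalog(
  sparkSession,
  paths = Seq(new Path("/data/events")),
  parameters = Map.empty[String, String],
  partitionSchema = None)

val files: Seq[FileStatus] = catalog.allFiles()    // every leaf file found by the recursive scan
val spec: PartitionSpec = catalog.partitionSpec()  // partitions inferred from the leaf directories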
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, StructType}


/**
* An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables.
* It provides the necessary methods to parse partition data based on a set of files.
*
* @param parameters a set of options to control partition discovery
* @param partitionSchema an optional partition schema that will be used to provide types for the
* discovered partitions
*/
abstract class PartitioningAwareFileCatalog(
sparkSession: SparkSession,
parameters: Map[String, String],
partitionSchema: Option[StructType])
extends FileCatalog with Logging {

protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)

protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]

protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]

override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
if (partitionSpec().partitionColumns.isEmpty) {
Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
} else {
prunePartitions(filters, partitionSpec()).map {
case PartitionDirectory(values, path) =>
Partition(
values,
leafDirToChildrenFiles(path).filterNot(_.getPath.getName startsWith "_"))
}
}
}

override def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq

protected def inferPartitioning(): PartitionSpec = {
// We use leaf dirs containing data files to discover the schema.
val leafDirs = leafDirToChildrenFiles.keys.toSeq
partitionSchema match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
leafDirs,
PartitioningUtils.DEFAULT_PARTITION_NAME,
typeInference = false,
basePaths = basePaths)

// Without auto inference, all values in the `row` should be null or of StringType, so
// we need to cast them into the data types that the user specified.
def castPartitionValuesToUserSchema(row: InternalRow) = {
InternalRow((0 until row.numFields).map { i =>
Cast(
Literal.create(row.getUTF8String(i), StringType),
userProvidedSchema.fields(i).dataType).eval()
}: _*)
}

PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
part.copy(values = castPartitionValuesToUserSchema(part.values))
})
case _ =>
PartitioningUtils.parsePartitions(
leafDirs,
PartitioningUtils.DEFAULT_PARTITION_NAME,
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(),
basePaths = basePaths)
}
}

private def prunePartitions(
predicates: Seq[Expression],
partitionSpec: PartitionSpec): Seq[PartitionDirectory] = {
val PartitionSpec(partitionColumns, partitions) = partitionSpec
val partitionColumnNames = partitionColumns.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}

if (partitionPruningPredicates.nonEmpty) {
val predicate = partitionPruningPredicates.reduce(expressions.And)

val boundPredicate = InterpretedPredicate.create(predicate.transform {
case a: AttributeReference =>
val index = partitionColumns.indexWhere(a.name == _.name)
BoundReference(index, partitionColumns(index).dataType, nullable = true)
})

val selected = partitions.filter {
case PartitionDirectory(values, _) => boundPredicate(values)
}
logInfo {
val total = partitions.length
val selectedSize = selected.length
val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions."
}

selected
} else {
partitions
}
}

/**
* Contains a set of paths that are considered as the base dirs of the input datasets.
* The partitioning discovery logic will make sure it will stop when it reaches any
* base path. By default, the paths of the dataset provided by users will be base paths.
* For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
* will be `/path/something=true/`, and the returned DataFrame will not contain a column of
* `something`. If users want to override the basePath, they can set `basePath` in the options
* to pass the new base path to the data source.
* For the above example, if the user-provided base path is `/path/`, the returned
* DataFrame will have the column of `something`.
*/
private def basePaths: Set[Path] = {
val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
userDefinedBasePath.getOrElse {
// If the user does not provide basePath, we will just use paths.
paths.toSet
}.map { hdfsPath =>
// Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
val fs = hdfsPath.getFileSystem(hadoopConf)
hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
}
}
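To illustrate the basePath behavior documented above, a hedged example using the reader API referenced in the Scaladoc (paths are hypothetical):

// Default: the base path is the path given to the reader, so the partition directory
// `something=true` is consumed and no `something` column appears in the result.
val withoutColumn = sqlContext.read.parquet("/path/something=true/")

// Overriding the base path makes partition discovery stop at `/path/`, so the returned
// DataFrame keeps `something` as a partition column.
val withColumn = sqlContext.read
  .option("basePath", "/path/")
  .parquet("/path/something=true/")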