SPARK-4687. Add a recursive option to the addFile API
This adds a recursive option to the addFile API to satisfy Hive's needs. It only allows specifying HDFS dirs that will be copied down on every executor.
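
As a quick illustration of the intended usage, here is a rough sketch assuming an active SparkContext sc; the hdfs:// path and directory name are invented for the example:

    import org.apache.spark.SparkFiles

    // Ship an HDFS directory to every executor; recursive = true is required for directories.
    sc.addFile("hdfs:///tmp/hive-aux", recursive = true)

    // On each executor the directory is materialized under the SparkFiles root,
    // keyed by its last path component.
    sc.parallelize(1 to 2).foreach { _ =>
      val dir = new java.io.File(SparkFiles.get("hive-aux"))
      require(dir.isDirectory, s"expected a directory at $dir")
    }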

There are a couple of outstanding questions.
* Should we allow specifying local dirs as well? The best way to do this would probably be to archive them. The drawback is that it would require a fair bit of code for which I don't know of any current use cases.
* The addFiles implementation has a caching component that I don't entirely understand. What events are we caching between? AFAICT it's users calling addFile on the same file in the same app at different times. Do we want/need to add something similar for addDirectory?
* The addFiles implementation will check whether an added file already exists and has the same contents. I imagine we want the same behavior, so I plan to add this unless people think otherwise.

I plan to add some tests if people are OK with the approach.

Author: Sandy Ryza <[email protected]>

Closes apache#3670 from sryza/sandy-spark-4687 and squashes the following commits:

f9fc77f [Sandy Ryza] Josh's comments
70cd24d [Sandy Ryza] Add another test
13da824 [Sandy Ryza] Revert executor changes
38bf94d [Sandy Ryza] Marcelo's comments
ca83849 [Sandy Ryza] Add addFile test
1941be3 [Sandy Ryza] Fix test and avoid HTTP server in local mode
31f15a9 [Sandy Ryza] Use cache recursively and fix some compile errors
0239c3d [Sandy Ryza] Change addDirectory to addFile with recursive
46fe70a [Sandy Ryza] SPARK-4687. Add a addDirectory API
sryza authored and JoshRosen committed Feb 5, 2015
1 parent 6580929 commit c4b1108
Showing 4 changed files with 237 additions and 27 deletions.
68 changes: 56 additions & 12 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -25,29 +25,37 @@ import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger
import java.util.UUID.randomUUID

import scala.collection.{Map, Set}
import scala.collection.JavaConversions._
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}

import akka.actor.Props

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
TextInputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}

import org.apache.mesos.MesosNativeLibrary
import akka.actor.Props

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.TriggerThreadDump
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
FixedLengthBinaryInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
SparkDeploySchedulerBackend, SimrSchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
@@ -1016,12 +1024,48 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(fileName)` to find its download location.
*/
def addFile(path: String) {
def addFile(path: String): Unit = {
addFile(path, false)
}

/**
* Add a file to be downloaded with this Spark job on every node.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(fileName)` to find its download location.
*
* A directory can be given if the recursive option is set to true. Currently directories are only
* supported for Hadoop-supported filesystems.
*/
def addFile(path: String, recursive: Boolean): Unit = {
val uri = new URI(path)
val key = uri.getScheme match {
case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
case "local" => "file:" + uri.getPath
case _ => path
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => "file:" + uri.getPath
case _ => path
}

val hadoopPath = new Path(schemeCorrectedPath)
val scheme = new URI(schemeCorrectedPath).getScheme
if (!Array("http", "https", "ftp").contains(scheme)) {
val fs = hadoopPath.getFileSystem(hadoopConfiguration)
if (!fs.exists(hadoopPath)) {
throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
}
val isDir = fs.isDirectory(hadoopPath)
if (!isLocal && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running " +
"local mode.")
}
if (!recursive && isDir) {
throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
"turned on.")
}
}

val key = if (!isLocal && scheme == "file") {
env.httpFileServer.addFile(new File(uri.getPath))
} else {
schemeCorrectedPath
}
val timestamp = System.currentTimeMillis
addedFiles(key) = timestamp
@@ -1633,8 +1677,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val schedulingMode = getSchedulingMode.toString
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
val environmentDetails =
SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
addedFilePaths)
val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
listenerBus.post(environmentUpdate)
}
88 changes: 73 additions & 15 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -386,8 +386,10 @@ private[spark] object Utils extends Logging {
}

/**
* Download a file to target directory. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
* Download a file or directory to target directory. Supports fetching the file in a variety of
* ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
* on the URL parameter. Fetching directories is only supported from Hadoop-compatible
* filesystems.
*
* If `useCache` is true, first attempts to fetch the file to a local cache that's shared
* across executors running the same application. `useCache` is used mainly for
@@ -456,17 +458,18 @@ private[spark] object Utils extends Logging {
*
* @param url URL that `sourceFile` originated from, for logging purposes.
* @param in InputStream to download.
* @param tempFile File path to download `in` to.
* @param destFile File path to move `tempFile` to.
* @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
* `sourceFile`
*/
private def downloadFile(
url: String,
in: InputStream,
tempFile: File,
destFile: File,
fileOverwrite: Boolean): Unit = {
val tempFile = File.createTempFile("fetchFileTemp", null,
new File(destFile.getParentFile.getAbsolutePath))
logInfo(s"Fetching $url to $tempFile")

try {
val out = new FileOutputStream(tempFile)
@@ -505,7 +508,7 @@ private[spark] object Utils extends Logging {
removeSourceFile: Boolean = false): Unit = {

if (destFile.exists) {
if (!Files.equal(sourceFile, destFile)) {
if (!filesEqualRecursive(sourceFile, destFile)) {
if (fileOverwrite) {
logInfo(
s"File $destFile exists and does not match contents of $url, replacing it with $url"
@@ -540,13 +543,44 @@ private[spark] object Utils extends Logging {
Files.move(sourceFile, destFile)
} else {
logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
Files.copy(sourceFile, destFile)
copyRecursive(sourceFile, destFile)
}
}

private def filesEqualRecursive(file1: File, file2: File): Boolean = {
if (file1.isDirectory && file2.isDirectory) {
val subfiles1 = file1.listFiles()
val subfiles2 = file2.listFiles()
if (subfiles1.size != subfiles2.size) {
return false
}
subfiles1.sortBy(_.getName).zip(subfiles2.sortBy(_.getName)).forall {
case (f1, f2) => filesEqualRecursive(f1, f2)
}
} else if (file1.isFile && file2.isFile) {
Files.equal(file1, file2)
} else {
false
}
}

private def copyRecursive(source: File, dest: File): Unit = {
if (source.isDirectory) {
if (!dest.mkdir()) {
throw new IOException(s"Failed to create directory ${dest.getPath}")
}
val subfiles = source.listFiles()
subfiles.foreach(f => copyRecursive(f, new File(dest, f.getName)))
} else {
Files.copy(source, dest)
}
}

/**
* Download a file to target directory. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
* Download a file or directory to target directory. Supports fetching the file in a variety of
* ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
* on the URL parameter. Fetching directories is only supported from Hadoop-compatible
* filesystems.
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
@@ -558,14 +592,11 @@ private[spark] object Utils extends Logging {
conf: SparkConf,
securityMgr: SecurityManager,
hadoopConf: Configuration) {
val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
Option(uri.getScheme).getOrElse("file") match {
case "http" | "https" | "ftp" =>
logInfo("Fetching " + url + " to " + tempFile)

var uc: URLConnection = null
if (securityMgr.isAuthenticationEnabled()) {
logDebug("fetchFile with security enabled")
@@ -583,17 +614,44 @@ private[spark] object Utils extends Logging {
uc.setReadTimeout(timeout)
uc.connect()
val in = uc.getInputStream()
downloadFile(url, in, tempFile, targetFile, fileOverwrite)
downloadFile(url, in, targetFile, fileOverwrite)
case "file" =>
// In the case of a local file, copy the local file to the target directory.
// Note the difference between uri vs url.
val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
copyFile(url, sourceFile, targetFile, fileOverwrite)
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
val fs = getHadoopFileSystem(uri, hadoopConf)
val in = fs.open(new Path(uri))
downloadFile(url, in, tempFile, targetFile, fileOverwrite)
val path = new Path(uri)
fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
}
}

/**
* Fetch a file or directory from a Hadoop-compatible filesystem.
*
* Visible for testing
*/
private[spark] def fetchHcfsFile(
path: Path,
targetDir: File,
fs: FileSystem,
conf: SparkConf,
hadoopConf: Configuration,
fileOverwrite: Boolean): Unit = {
if (!targetDir.mkdir()) {
throw new IOException(s"Failed to create directory ${targetDir.getPath}")
}
fs.listStatus(path).foreach { fileStatus =>
val innerPath = fileStatus.getPath
if (fileStatus.isDir) {
fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
fileOverwrite)
} else {
val in = fs.open(innerPath)
val targetFile = new File(targetDir, innerPath.getName)
downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
}
}
}

77 changes: 77 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -17,10 +17,17 @@

package org.apache.spark

import java.io.File

import com.google.common.base.Charsets._
import com.google.common.io.Files

import org.scalatest.FunSuite

import org.apache.hadoop.io.BytesWritable

import org.apache.spark.util.Utils

class SparkContextSuite extends FunSuite with LocalSparkContext {

test("Only one SparkContext may be active at a time") {
@@ -72,4 +79,74 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
val byteArray2 = converter.convert(bytesWritable)
assert(byteArray2.length === 0)
}

test("addFile works") {
val file = File.createTempFile("someprefix", "somesuffix")
val absolutePath = file.getAbsolutePath
try {
Files.write("somewords", file, UTF_8)
val length = file.length()
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
sc.addFile(file.getAbsolutePath)
sc.parallelize(Array(1), 1).map(x => {
val gotten = new File(SparkFiles.get(file.getName))
if (!gotten.exists()) {
throw new SparkException("file doesn't exist")
}
if (length != gotten.length()) {
throw new SparkException(
s"file has different length $length than added file ${gotten.length()}")
}
if (absolutePath == gotten.getAbsolutePath) {
throw new SparkException("file should have been copied")
}
x
}).count()
} finally {
sc.stop()
}
}

test("addFile recursive works") {
val pluto = Utils.createTempDir()
val neptune = Utils.createTempDir(pluto.getAbsolutePath)
val saturn = Utils.createTempDir(neptune.getAbsolutePath)
val alien1 = File.createTempFile("alien", "1", neptune)
val alien2 = File.createTempFile("alien", "2", saturn)

try {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
sc.addFile(neptune.getAbsolutePath, true)
sc.parallelize(Array(1), 1).map(x => {
val sep = File.separator
if (!new File(SparkFiles.get(neptune.getName + sep + alien1.getName)).exists()) {
throw new SparkException("can't access file under root added directory")
}
if (!new File(SparkFiles.get(neptune.getName + sep + saturn.getName + sep + alien2.getName))
.exists()) {
throw new SparkException("can't access file in nested directory")
}
if (new File(SparkFiles.get(pluto.getName + sep + neptune.getName + sep + alien1.getName))
.exists()) {
throw new SparkException("file exists that shouldn't")
}
x
}).count()
} finally {
sc.stop()
}
}

test("addFile recursive can't add directories by default") {
val dir = Utils.createTempDir()

try {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
intercept[SparkException] {
sc.addFile(dir.getAbsolutePath)
}
} finally {
sc.stop()
}
}
}
31 changes: 31 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -29,6 +29,9 @@ import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.scalatest.FunSuite

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkConf

class UtilsSuite extends FunSuite with ResetSystemProperties {
@@ -381,4 +384,32 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
require(cnt === 2, "prepare should be called twice")
require(time < 500, "preparation time should not count")
}

test("fetch hcfs dir") {
val tempDir = Utils.createTempDir()
val innerTempDir = Utils.createTempDir(tempDir.getPath)
val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
val targetDir = new File("target-dir")
Files.write("some text", tempFile, UTF_8)

try {
val path = new Path("file://" + tempDir.getAbsolutePath)
val conf = new Configuration()
val fs = Utils.getHadoopFileSystem(path.toString, conf)
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
assert(targetDir.exists())
assert(targetDir.isDirectory())
val newInnerDir = new File(targetDir, innerTempDir.getName)
println("inner temp dir: " + innerTempDir.getName)
targetDir.listFiles().map(_.getName).foreach(println)
assert(newInnerDir.exists())
assert(newInnerDir.isDirectory())
val newInnerFile = new File(newInnerDir, tempFile.getName)
assert(newInnerFile.exists())
assert(newInnerFile.isFile())
} finally {
Utils.deleteRecursively(tempDir)
Utils.deleteRecursively(targetDir)
}
}
}
