SPARK-1677: allow user to disable output dir existence checking
https://issues.apache.org/jira/browse/SPARK-1677

For compatibility with older versions of Spark, it would be nice to have an option `spark.hadoop.validateOutputSpecs` (default: true) that lets the user disable the output directory existence checking.
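For illustration, a minimal sketch of the intended usage (the app name, local master, and output path below are placeholders, not part of this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("overwrite-example") // placeholder
  .setMaster("local")              // placeholder
  .set("spark.hadoop.validateOutputSpecs", "false") // opt out of the existence check

val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq((1, "a"), (2, "b")))

rdd.saveAsTextFile("/tmp/output") // first save creates the directory
rdd.saveAsTextFile("/tmp/output") // with the check disabled, this no longer throws
sc.stop()
```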

Author: CodingCat <[email protected]>

Closes apache#947 from CodingCat/SPARK-1677 and squashes the following commits:

7930f83 [CodingCat] miao
c0c0e03 [CodingCat] bug fix and doc update
5318562 [CodingCat] bug fix
13219b5 [CodingCat] allow user to disable output dir existence checking
CodingCat authored and pwendell committed Jun 5, 2014
1 parent 7c16029 commit 89cdbb0
Showing 3 changed files with 34 additions and 2 deletions.
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala (4 additions, 2 deletions)

@@ -737,7 +737,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance

-    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+        jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -803,7 +804,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")

-    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+        outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(conf)
       conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
core/src/test/scala/org/apache/spark/FileSuite.scala (22 additions, 0 deletions)
@@ -230,6 +230,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }

+  test ("allow user to disable the output directory existence checking (old Hadoop API)") {
+    val sf = new SparkConf()
+    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+  }
+
   test ("prevent user from overwriting the empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
@@ -248,6 +259,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }

+  test ("allow user to disable the output directory existence checking (new Hadoop API)") {
+    val sf = new SparkConf()
+    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+  }
+
   test ("save Hadoop Dataset through old Hadoop API") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
docs/configuration.md (8 additions, 0 deletions)
@@ -487,6 +487,14 @@ Apart from these, the following properties are also available, and may be useful
   this duration will be cleared as well.
   </td>
 </tr>
+<tr>
+  <td>spark.hadoop.validateOutputSpecs</td>
+  <td>true</td>
+  <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
+  used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions caused by pre-existing
+  output directories. We recommend that users leave this enabled unless they need compatibility with previous
+  versions of Spark; to overwrite output, delete the directory by hand with Hadoop's FileSystem API instead.</td>
+</tr>
 </table>
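To complement the table entry above, a hedged sketch of the default behavior (assuming the old Hadoop API save path, where checkOutputSpecs raises org.apache.hadoop.mapred.FileAlreadyExistsException; names and paths are illustrative):

```scala
import org.apache.hadoop.mapred.FileAlreadyExistsException
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("validate-example").setMaster("local"))
val rdd = sc.parallelize(Seq(("key", "value")))

rdd.saveAsTextFile("/tmp/validated-output")
try {
  // With spark.hadoop.validateOutputSpecs left at its default (true), the output
  // specification is validated before any tasks run, so the job fails fast here.
  rdd.saveAsTextFile("/tmp/validated-output")
} catch {
  case e: FileAlreadyExistsException =>
    println("output directory already exists: " + e.getMessage)
}
sc.stop()
```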

#### Networking
