SPARK-6338 [CORE] Use standard temp dir mechanisms in tests to avoid orphaned temp files

Use `Utils.createTempDir()` to replace other temp-file mechanisms used in some tests, to further ensure they are cleaned up, and to simplify the code.

Author: Sean Owen <[email protected]>

Closes #5029 from srowen/SPARK-6338 and squashes the following commits:

27b740a [Sean Owen] Fix hive-thriftserver tests that don't expect an existing dir
4a212fa [Sean Owen] Standardize a bit more temp dir management
9004081 [Sean Owen] Revert some added recursive-delete calls
57609e4 [Sean Owen] Use Utils.createTempDir() to replace other temp file mechanisms used in some tests, to further ensure they are cleaned up, and simplify
srowen committed Mar 20, 2015
1 parent d08e3eb commit 6f80c3e
Showing 33 changed files with 116 additions and 153 deletions.
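The recurring pattern in the diffs below is the same everywhere: instead of `File.createTempFile(...)` plus `deleteOnExit()` (or Guava's `Files.createTempDir()`), tests anchor their files in a directory obtained from `Utils.createTempDir()`, which Spark registers for recursive deletion at JVM shutdown. A minimal sketch of the idea, assuming test code in an `org.apache.spark` package where the `private[spark]` `Utils` object is visible:

```scala
import java.io.File

import org.apache.spark.util.Utils

// Before: the temp file lives directly under java.io.tmpdir and relies on
// deleteOnExit(), which leaks the file if the JVM is killed or the hook never runs.
val oldStyle = File.createTempFile("someprefix", "somesuffix")
oldStyle.deleteOnExit()

// After: the file is created inside a Spark-managed temp directory, so it is
// removed recursively along with that directory when the JVM exits.
val dir: File = Utils.createTempDir()
val newStyle = File.createTempFile("someprefix", "somesuffix", dir)
```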
@@ -33,6 +33,7 @@ import org.json4s.jackson.JsonMethods

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
import org.apache.spark.util.Utils

/**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
@@ -405,8 +406,7 @@ private object SparkDocker {

private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
val ipPromise = promise[String]()
val outFile = File.createTempFile("fault-tolerance-test", "")
outFile.deleteOnExit()
val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
val outStream: FileWriter = new FileWriter(outFile)
def findIpAndLog(line: String): Unit = {
if (line.startsWith("CONTAINER_IP=")) {
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -288,7 +288,7 @@ private[spark] object Utils extends Logging {
} catch { case e: SecurityException => dir = null; }
}

dir
dir.getCanonicalFile
}

/**
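The only non-test change is the `dir.getCanonicalFile` line above. One plausible motivation (not stated in the commit message) is that canonicalizing the returned directory resolves symlinked temp roots, such as `/var` pointing at `/private/var` on OS X, so path comparisons against it stay stable in tests. A small illustration, not part of the patch:

```scala
import java.io.File

// Illustration only: getCanonicalFile resolves symlinks and relative segments,
// so two File objects naming the same physical directory compare equal.
val raw = new File(System.getProperty("java.io.tmpdir") + "/./spark-demo")
val canonical = raw.getCanonicalFile
println(raw.getPath)        // may contain "." segments or a symlinked prefix
println(canonical.getPath)  // fully resolved physical path
```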
3 changes: 1 addition & 2 deletions core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -33,8 +33,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {

override def beforeEach() {
super.beforeEach()
checkpointDir = File.createTempFile("temp", "")
checkpointDir.deleteOnExit()
checkpointDir = File.createTempFile("temp", "", Utils.createTempDir())
checkpointDir.delete()
sc = new SparkContext("local", "test")
sc.setCheckpointDir(checkpointDir.toString)
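The `CheckpointSuite` change keeps an existing quirk worth noting: the test wants a path name that does not exist yet (so the checkpoint directory can be created fresh), which is why the file is created and then immediately deleted. A hedged sketch of that pattern:

```scala
import java.io.File

import org.apache.spark.util.Utils

// Create a uniquely named file under a managed temp dir, then delete it so only
// the fresh path name remains; nothing is orphaned because the parent directory
// is cleaned up recursively by Spark on shutdown.
val checkpointDir = File.createTempFile("temp", "", Utils.createTempDir())
checkpointDir.delete()
assert(!checkpointDir.exists())
// sc.setCheckpointDir(checkpointDir.toString) would then recreate it as a directory.
```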
@@ -21,6 +21,8 @@ import java.io.File

import org.scalatest.FunSuite

import org.apache.spark.util.Utils

class SecurityManagerSuite extends FunSuite {

test("set security with conf") {
@@ -160,8 +162,7 @@ class SecurityManagerSuite extends FunSuite {
}

test("ssl off setup") {
val file = File.createTempFile("SSLOptionsSuite", "conf")
file.deleteOnExit()
val file = File.createTempFile("SSLOptionsSuite", "conf", Utils.createTempDir())

System.setProperty("spark.ssl.configFile", file.getAbsolutePath)
val conf = new SparkConf()
11 changes: 6 additions & 5 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -79,13 +79,14 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
val byteArray2 = converter.convert(bytesWritable)
assert(byteArray2.length === 0)
}

test("addFile works") {
val file1 = File.createTempFile("someprefix1", "somesuffix1")
val dir = Utils.createTempDir()

val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
val absolutePath1 = file1.getAbsolutePath

val pluto = Utils.createTempDir()
val file2 = File.createTempFile("someprefix2", "somesuffix2", pluto)
val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
val absolutePath2 = file2.getAbsolutePath

@@ -129,7 +130,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
sc.stop()
}
}

test("addFile recursive works") {
val pluto = Utils.createTempDir()
val neptune = Utils.createTempDir(pluto.getAbsolutePath)
@@ -402,8 +402,10 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles

val tmpDir = Utils.createTempDir()

// Test jars and files
val f1 = File.createTempFile("test-submit-jars-files", "")
val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
val writer1 = new PrintWriter(f1)
writer1.println("spark.jars " + jars)
writer1.println("spark.files " + files)
@@ -420,7 +422,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
sysProps("spark.files") should be(Utils.resolveURIs(files))

// Test files and archives (Yarn)
val f2 = File.createTempFile("test-submit-files-archives", "")
val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
val writer2 = new PrintWriter(f2)
writer2.println("spark.yarn.dist.files " + files)
writer2.println("spark.yarn.dist.archives " + archives)
@@ -437,7 +439,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))

// Test python files
val f3 = File.createTempFile("test-submit-python-files", "")
val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
val writer3 = new PrintWriter(f3)
writer3.println("spark.submit.pyFiles " + pyFiles)
writer3.close()
6 changes: 4 additions & 2 deletions core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -22,14 +22,16 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, JobConf, TextInputFormat}
import org.apache.spark._
import org.scalatest.FunSuite

import scala.collection.Map
import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try

import org.apache.spark._
import org.apache.spark.util.Utils

class PipedRDDSuite extends FunSuite with SharedSparkContext {

test("basic pipe") {
@@ -141,7 +143,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
// make sure symlinks were created
assert(pipedLs.length > 0)
// clean up top level tasks directory
new File("tasks").delete()
Utils.deleteRecursively(new File("tasks"))
} else {
assert(true)
}
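The `PipedRDDSuite` change swaps `File.delete()` for `Utils.deleteRecursively` because a plain `delete()` returns false (and does nothing) on a non-empty directory, which is exactly how a leftover `tasks` directory used to survive a test run. A small sketch, assuming the same `Utils` helper:

```scala
import java.io.File

import org.apache.spark.util.Utils

val tasks = new File(Utils.createTempDir(), "tasks")
new File(tasks, "child").mkdirs()   // makes "tasks" non-empty

assert(!tasks.delete())             // plain delete() refuses a non-empty directory
Utils.deleteRecursively(tasks)      // removes the whole tree
assert(!tasks.exists())
```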
@@ -16,16 +16,18 @@
*/
package org.apache.spark.storage

import org.scalatest.FunSuite
import java.io.File

import org.scalatest.FunSuite

import org.apache.spark.SparkConf
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

class BlockObjectWriterSuite extends FunSuite {
test("verify write metrics") {
val file = new File("somefile")
file.deleteOnExit()
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
@@ -47,8 +49,7 @@ class BlockObjectWriterSuite {
}

test("verify write metrics on revert") {
val file = new File("somefile")
file.deleteOnExit()
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
@@ -71,8 +72,7 @@ class BlockObjectWriterSuite {
}

test("Reopening a closed block writer") {
val file = new File("somefile")
file.deleteOnExit()
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
@@ -32,7 +32,7 @@ import org.apache.spark.util.logging.{RollingFileAppender, SizeBasedRollingPolic

class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {

val testFile = new File("FileAppenderSuite-test-" + System.currentTimeMillis).getAbsoluteFile
val testFile = new File(Utils.createTempDir(), "FileAppenderSuite-test").getAbsoluteFile

before {
cleanup()
7 changes: 3 additions & 4 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -122,7 +122,6 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {

test("reading offset bytes of a file") {
val tmpDir2 = Utils.createTempDir()
tmpDir2.deleteOnExit()
val f1Path = tmpDir2 + "/f1"
val f1 = new FileOutputStream(f1Path)
f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(UTF_8))
@@ -151,7 +150,6 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {

test("reading offset bytes across multiple files") {
val tmpDir = Utils.createTempDir()
tmpDir.deleteOnExit()
val files = (1 to 3).map(i => new File(tmpDir, i.toString))
Files.write("0123456789", files(0), UTF_8)
Files.write("abcdefghij", files(1), UTF_8)
@@ -357,7 +355,8 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
}

test("loading properties from file") {
val outFile = File.createTempFile("test-load-spark-properties", "test")
val tmpDir = Utils.createTempDir()
val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
try {
System.setProperty("spark.test.fileNameLoadB", "2")
Files.write("spark.test.fileNameLoadA true\n" +
Expand All @@ -370,7 +369,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
} finally {
outFile.delete()
Utils.deleteRecursively(tmpDir)
}
}

@@ -25,16 +25,15 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random

import com.google.common.io.Files
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.util.Utils

class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {

@@ -60,18 +59,15 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
)

ssc = new StreamingContext(sparkConf, Milliseconds(500))
tempDirectory = Files.createTempDir()
tempDirectory = Utils.createTempDir()
ssc.checkpoint(tempDirectory.getAbsolutePath)
}

after {
if (ssc != null) {
ssc.stop()
}
if (tempDirectory != null && tempDirectory.exists()) {
FileUtils.deleteDirectory(tempDirectory)
tempDirectory = null
}
Utils.deleteRecursively(tempDirectory)
tearDownKafka()
}

@@ -19,13 +19,12 @@ package org.apache.spark.graphx

import org.scalatest.FunSuite

import com.google.common.io.Files

import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph._
import org.apache.spark.graphx.PartitionStrategy._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class GraphSuite extends FunSuite with LocalSparkContext {

@@ -369,8 +368,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}

test("checkpoint") {
val checkpointDir = Files.createTempDir()
checkpointDir.deleteOnExit()
val checkpointDir = Utils.createTempDir()
withSpark { sc =>
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
@@ -21,11 +21,9 @@ import java.io._
import java.net.URLClassLoader

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.tools.nsc.interpreter.SparkILoop

import com.google.common.io.Files
import org.scalatest.FunSuite
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.SparkContext
@@ -196,8 +194,7 @@ class ReplSuite extends FunSuite {
}

test("interacting with files") {
val tempDir = Files.createTempDir()
tempDir.deleteOnExit()
val tempDir = Utils.createTempDir()
val out = new FileWriter(tempDir + "/input")
out.write("Hello world!\n")
out.write("What's up?\n")
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.rules
import org.apache.spark.sql.catalyst.util
import org.apache.spark.util.Utils

/**
* A collection of generators that build custom bytecode at runtime for performing the evaluation
@@ -52,7 +52,7 @@ package object codegen {
@DeveloperApi
object DumpByteCode {
import scala.sys.process._
val dumpDirectory = util.getTempFilePath("sparkSqlByteCode")
val dumpDirectory = Utils.createTempDir()
dumpDirectory.mkdir()

def apply(obj: Any): Unit = {
@@ -19,20 +19,9 @@ package org.apache.spark.sql.catalyst

import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File}

import org.apache.spark.util.{Utils => SparkUtils}
import org.apache.spark.util.Utils

package object util {
/**
* Returns a path to a temporary file that probably does not exist.
* Note, there is always the race condition that someone created this
* file since the last time we checked. Thus, this shouldn't be used
* for anything security conscious.
*/
def getTempFilePath(prefix: String, suffix: String = ""): File = {
val tempFile = File.createTempFile(prefix, suffix)
tempFile.delete()
tempFile
}

def fileToString(file: File, encoding: String = "UTF-8") = {
val inStream = new FileInputStream(file)
@@ -56,7 +45,7 @@ package object util {
def resourceToString(
resource:String,
encoding: String = "UTF-8",
classLoader: ClassLoader = SparkUtils.getSparkClassLoader) = {
classLoader: ClassLoader = Utils.getSparkClassLoader) = {
val inStream = classLoader.getResourceAsStream(resource)
val outStream = new ByteArrayOutputStream
try {
@@ -23,7 +23,6 @@ import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.util.Utils

@@ -67,8 +66,9 @@ private[sql] trait ParquetTest {
* @todo Probably this method should be moved to a more general place
*/
protected def withTempPath(f: File => Unit): Unit = {
val file = util.getTempFilePath("parquetTest").getCanonicalFile
try f(file) finally if (file.exists()) Utils.deleteRecursively(file)
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}

/**
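For context on the `ParquetTest` hunk above: `withTempPath` now derives its path from `Utils.createTempDir()`, deletes it so the caller gets a name that does not yet exist, and always cleans up recursively. A hedged usage sketch; the helper body mirrors the hunk, and the write inside the block is illustrative only:

```scala
import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.spark.util.Utils

// Same shape as the helper in the hunk above.
def withTempPath(f: File => Unit): Unit = {
  val path = Utils.createTempDir()
  path.delete()
  try f(path) finally Utils.deleteRecursively(path)
}

// Hypothetical use: the block may create files or a directory tree at `path`;
// everything under it is removed when the block finishes, pass or fail.
withTempPath { path =>
  path.mkdirs()
  Files.write("some test output", new File(path, "part-00000"), Charsets.UTF_8)
  assert(new File(path, "part-00000").exists())
}
```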
(diff truncated; remaining changed files not shown)
