Skip to content

Commit

Permalink
[SPARK-9472] [STREAMING] consistent hadoop configuration, streaming only
Browse files Browse the repository at this point in the history
Author: cody koeninger <[email protected]>

Closes #7772 from koeninger/streaming-hadoop-config and squashes the following commits:

5267284 [cody koeninger] [SPARK-4229][Streaming] consistent hadoop configuration, streaming only
  • Loading branch information
koeninger authored and tdas committed Jul 31, 2015
1 parent 3c66ff7 commit 9307f56
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkException, SparkConf, Logging}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{MetadataCleaner, Utils}
import org.apache.spark.streaming.scheduler.JobGenerator
Expand Down Expand Up @@ -100,7 +101,7 @@ object Checkpoint extends Logging {
}

val path = new Path(checkpointDir)
val fs = fsOption.getOrElse(path.getFileSystem(new Configuration()))
val fs = fsOption.getOrElse(path.getFileSystem(SparkHadoopUtil.get.conf))
if (fs.exists(path)) {
val statuses = fs.listStatus(path)
if (statuses != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.serializer.SerializationDebugger
Expand Down Expand Up @@ -110,7 +111,7 @@ class StreamingContext private[streaming] (
* Recreate a StreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
def this(path: String) = this(path, new Configuration)
def this(path: String) = this(path, SparkHadoopUtil.get.conf)

/**
* Recreate a StreamingContext from a checkpoint file using an existing SparkContext.
Expand Down Expand Up @@ -803,7 +804,7 @@ object StreamingContext extends Logging {
def getActiveOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = new Configuration(),
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
ACTIVATION_LOCK.synchronized {
Expand All @@ -828,7 +829,7 @@ object StreamingContext extends Logging {
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = new Configuration(),
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
val checkpointOption = CheckpointReader.read(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[F],
conf: Configuration = new Configuration) {
conf: Configuration = dstream.context.sparkContext.hadoopConfiguration) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.api.java.function.{Function0 => JFunction0}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
Expand Down Expand Up @@ -136,7 +137,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Recreate a JavaStreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
def this(path: String) = this(new StreamingContext(path, new Configuration))
def this(path: String) = this(new StreamingContext(path, SparkHadoopUtil.get.conf))

/**
* Re-creates a JavaStreamingContext from a checkpoint file.
Expand Down

0 comments on commit 9307f56

Please sign in to comment.