spark-544, introducing SparkConf and related configuration overhaul.
ScrapCodes committed Dec 24, 2013
1 parent 0bc57c5 commit 2573add
Showing 96 changed files with 612 additions and 478 deletions.
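
The change this commit makes across the tree is largely mechanical: reads that used to go through JVM-global System.getProperty now go through a SparkConf instance that is threaded into SparkContext, SparkEnv, MapOutputTracker, the schedulers, and AkkaUtils. A minimal before/after sketch of that pattern, written as Spark-internal code since SparkConf is still private[spark] in this commit (the key and the default "8" are illustrative):

    // Before: configuration lives in global JVM system properties.
    val parallelismBefore = System.getProperty("spark.default.parallelism", "8").toInt

    // After: configuration lives in a SparkConf passed to each component.
    val conf = new org.apache.spark.SparkConf()
    val parallelismAfter = conf.getOrElse("spark.default.parallelism", "8").toInt
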
core/src/main/scala/org/apache/spark/MapOutputTracker.scala (7 changes: 4 additions & 3 deletions)
@@ -50,9 +50,9 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
}
}

private[spark] class MapOutputTracker extends Logging {
private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {

private val timeout = AkkaUtils.askTimeout
private val timeout = AkkaUtils.askTimeout(conf)

// Set to the MapOutputTrackerActor living on the driver
var trackerActor: Either[ActorRef, ActorSelection] = _
@@ -192,7 +192,8 @@ private[spark] class MapOutputTracker extends Logging {
}
}

private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
private[spark] class MapOutputTrackerMaster(conf: SparkConf)
extends MapOutputTracker(conf) {

// Cache a serialized version of the output statuses for each shuffle to send them out faster
private var cacheEpoch = epoch
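
Both tracker classes now receive the SparkConf through their constructors, and the Akka ask timeout is resolved from that conf instead of a global property. A sketch of what an AkkaUtils.askTimeout(conf) overload can look like; the key name spark.akka.askTimeout and the 10-second default are assumptions, not taken from this diff:

    import scala.concurrent.duration._
    import org.apache.spark.SparkConf

    object AkkaUtilsSketch {
      // Hypothetical helper mirroring the new signature: resolve the ask timeout
      // from the passed-in conf rather than from System.getProperty.
      def askTimeout(conf: SparkConf): FiniteDuration =
        conf.getOrElse("spark.akka.askTimeout", "10").toLong.seconds
    }
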
core/src/main/scala/org/apache/spark/Partitioner.scala (4 changes: 3 additions & 1 deletion)
@@ -32,6 +32,8 @@ abstract class Partitioner extends Serializable {
}

object Partitioner {

import SparkContext.{globalConf => conf}
/**
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
*
@@ -52,7 +54,7 @@ object Partitioner {
for (r <- bySize if r.partitioner != None) {
return r.partitioner.get
}
if (System.getProperty("spark.default.parallelism") != null) {
if (conf.getOrElse("spark.default.parallelism", null) != null) {
return new HashPartitioner(rdd.context.defaultParallelism)
} else {
return new HashPartitioner(bySize.head.partitions.size)
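
defaultPartitioner keeps its decision order; only the spark.default.parallelism lookup moves from System.getProperty to the global conf imported at the top of the object. A simplified sketch of that order (it reads the parallelism straight from the conf, whereas the real code asks rdd.context.defaultParallelism):

    import org.apache.spark.{HashPartitioner, Partitioner, SparkConf}
    import org.apache.spark.rdd.RDD

    def pickPartitioner(conf: SparkConf, rdds: Seq[RDD[_]]): Partitioner = {
      // Largest RDD first, matching the bySize ordering in defaultPartitioner.
      val bySize = rdds.sortBy(_.partitions.size).reverse
      bySize.flatMap(_.partitioner).headOption.getOrElse {
        if (conf.getOrElse("spark.default.parallelism", null) != null)
          new HashPartitioner(conf.get("spark.default.parallelism").toInt)
        else
          new HashPartitioner(bySize.head.partitions.size)
      }
    }
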
core/src/main/scala/org/apache/spark/SparkConf.scala (71 changes: 71 additions & 0 deletions)
@@ -0,0 +1,71 @@
package org.apache.spark

import scala.collection.JavaConversions._
import scala.collection.concurrent.TrieMap

import com.typesafe.config.ConfigFactory

private[spark] class SparkConf(loadClasspathRes: Boolean = true) extends Serializable {
@transient lazy val config = ConfigFactory.systemProperties()
.withFallback(ConfigFactory.parseResources("spark.conf"))
// TODO this should actually be synchronized
private val configMap = TrieMap[String, String]()

if (loadClasspathRes && !config.entrySet().isEmpty) {
for (e <- config.entrySet()) {
configMap += ((e.getKey, e.getValue.unwrapped().toString))
}
}

def setMasterUrl(master: String) = {
if (master != null)
configMap += (("spark.master", master))
this
}

def setAppName(name: String) = {
if (name != null)
configMap += (("spark.appName", name))
this
}

def setJars(jars: Seq[String]) = {
if (!jars.isEmpty)
configMap += (("spark.jars", jars.mkString(",")))
this
}

def set(k: String, value: String) = {
configMap += ((k, value))
this
}

def setSparkHome(home: String) = {
if (home != null)
configMap += (("spark.home", home))
this
}

def set(map: Seq[(String, String)]) = {
if (map != null && !map.isEmpty)
configMap ++= map
this
}

def get(k: String): String = {
configMap(k)
}

def getAllConfiguration = configMap.clone.entrySet().iterator

def getOrElse(k: String, defaultValue: String): String = {
configMap.getOrElse(k, defaultValue)
}

override def clone: SparkConf = {
val conf = new SparkConf(false)
conf.set(configMap.toSeq)
conf
}

}
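
A note on how this class resolves settings: at construction time (unless loadClasspathRes is false, as in clone), every entry from JVM system properties, falling back to a spark.conf resource on the classpath, is copied into the internal TrieMap; the setters then overwrite individual keys. A hedged usage sketch, assuming code inside the org.apache.spark package since the class is still private[spark] here (paths, URLs, and the fallback key are placeholders):

    import org.apache.spark.SparkConf

    // -Dspark.cores.max=4 on the JVM command line, or the same key in a spark.conf
    // classpath resource, is picked up automatically through the Typesafe Config
    // fallback chain above.
    val conf = new SparkConf()
      .setMasterUrl("spark://master:7077")
      .setAppName("WordCount")
      .setJars(Seq("/tmp/app.jar"))
      .setSparkHome("/opt/spark")

    conf.get("spark.appName")                        // "WordCount"; throws if the key is missing
    conf.getOrElse("spark.shuffle.compress", "true") // falls back when the key is unset

    // clone copies the current map into a fresh conf without re-reading the
    // classpath or system properties (loadClasspathRes = false).
    val copied = conf.clone
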
core/src/main/scala/org/apache/spark/SparkContext.scala (140 changes: 77 additions & 63 deletions)
@@ -22,91 +22,99 @@ import java.net.URI
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.Map
import scala.collection.{Map, immutable}
import scala.collection.JavaConversions._
import scala.collection.generic.Growable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.reflect.{ClassTag, classTag}
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.ArrayWritable
import org.apache.hadoop.io.BooleanWritable
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.DoubleWritable
import org.apache.hadoop.io.FloatWritable
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.SequenceFileInputFormat
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
TextInputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}

import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend,
SimrSchedulerBackend, SparkDeploySchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend,
MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
TimeStampedHashMap, Utils}
import org.apache.spark.util._

/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI.
* @param sparkHome Location where Spark is installed on cluster nodes.
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param conf a Spark Config object describing the context configuration. Any settings in this
* config overrides the default configs as well as system properties.
*
* @param environment Environment variables to set on worker nodes.
*/
class SparkContext(
val master: String,
val appName: String,
val sparkHome: String = null,
val jars: Seq[String] = Nil,
val conf: SparkConf,
val environment: Map[String, String] = Map(),
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
// too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
// of data-local splits on host
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
scala.collection.immutable.Map())
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = immutable.Map())
extends Logging {

// Ensure logging is initialized before we spawn any threads
initLogging()
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI.
* @param sparkHome Location where Spark is installed on cluster nodes.
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes.
*/
def this(master: String, appName: String, sparkHome: String = null,
jars: Seq[String] = Nil, environment: Map[String, String] = Map(),
preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
immutable.Map()) =
this(new SparkConf(false).setAppName(appName).setMasterUrl(master)
.setJars(jars).set(environment.toSeq).setSparkHome(sparkHome),
environment, preferredNodeLocationData)

// Set Spark driver host and port system properties
if (System.getProperty("spark.driver.host") == null) {
System.setProperty("spark.driver.host", Utils.localHostName())
}
if (System.getProperty("spark.driver.port") == null) {
System.setProperty("spark.driver.port", "0")
}
Try(conf.get("spark.driver.host"))
.getOrElse(conf.set("spark.driver.host", Utils.localHostName()))

Try(conf.get("spark.driver.port"))
.getOrElse(conf.set("spark.driver.port", "0"))

val jars: Seq[String] = if (conf.getOrElse("spark.jars", null) != null) {
conf.get("spark.jars").split(",")
} else null

val master = conf.get("spark.master")
val appName = conf.get("spark.appName")

val isLocal = (master == "local" || master.startsWith("local["))

// Ensure logging is initialized before we spawn any threads
initLogging()

// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
"<driver>",
System.getProperty("spark.driver.host"),
System.getProperty("spark.driver.port").toInt,
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
conf,
true,
isLocal)
SparkEnv.set(env)
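
The primary constructor now takes the SparkConf, and the previous (master, appName, sparkHome, jars, environment) signature survives as the auxiliary constructor above, which assembles a conf from its arguments. A sketch of the two equivalent construction styles after this change (master URL and application name are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    // New style: describe the application in a SparkConf and hand it over.
    val conf = new SparkConf()
      .setMasterUrl("local[2]")
      .setAppName("ConfStyle")
    val sc = new SparkContext(conf)

    // Old call sites keep compiling: the auxiliary constructor builds the
    // equivalent conf internally before delegating to the primary one.
    val legacySc = new SparkContext("local[2]", "LegacyStyle")
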
@@ -165,24 +173,24 @@ class SparkContext(
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val env = SparkEnv.get
val conf = SparkHadoopUtil.get.newConfiguration()
val hadoopConf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
Utils.getSystemProperties.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
conf.set(key.substring("spark.hadoop.".length), value)
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
val bufferSize = System.getProperty("spark.buffer.size", "65536")
conf.set("io.file.buffer.size", bufferSize)
conf
val bufferSize = conf.getOrElse("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
hadoopConf
}

private[spark] var checkpointDir: Option[String] = None
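
Inside hadoopConfiguration the name conf now refers to the SparkContext's SparkConf, so the Hadoop object being built is renamed hadoopConf; spark.hadoop.* system properties are still forwarded, and the buffer size now comes from the SparkConf. A hedged sketch of that forwarding (the Hadoop key fs.defaultFS and the URI are just examples):

    import org.apache.spark.{SparkConf, SparkContext}

    // Any JVM property "spark.hadoop.foo" is exposed to Hadoop code as "foo";
    // set it before the context is created, since the copy happens at
    // hadoopConfiguration initialization.
    System.setProperty("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")

    val sc = new SparkContext(
      new SparkConf().setMasterUrl("local").setAppName("HadoopConfDemo"))

    sc.hadoopConfiguration.get("fs.defaultFS")        // "hdfs://namenode:8020"
    sc.hadoopConfiguration.get("io.file.buffer.size") // spark.buffer.size, default "65536"
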
@@ -695,10 +703,8 @@ class SparkContext(
* (in that order of preference). If neither of these is set, return None.
*/
private[spark] def getSparkHome(): Option[String] = {
if (sparkHome != null) {
Some(sparkHome)
} else if (System.getProperty("spark.home") != null) {
Some(System.getProperty("spark.home"))
if (conf.getOrElse("spark.home", null) != null) {
Some(conf.get("spark.home"))
} else if (System.getenv("SPARK_HOME") != null) {
Some(System.getenv("SPARK_HOME"))
} else {
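
getSparkHome no longer consults a sparkHome constructor field; it checks spark.home in the conf (which setSparkHome populates) and then the SPARK_HOME environment variable. A small equivalent sketch of the new precedence:

    import org.apache.spark.SparkConf

    // 1. spark.home from the conf, 2. the SPARK_HOME environment variable, 3. None.
    def resolveSparkHome(conf: SparkConf): Option[String] =
      Option(conf.getOrElse("spark.home", null))
        .orElse(Option(System.getenv("SPARK_HOME")))
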
@@ -909,6 +915,14 @@ object SparkContext {

private[spark] val SPARK_UNKNOWN_USER = "<unknown>"

private lazy val conf = new SparkConf()

private[spark] def globalConf = {
if (SparkEnv.get != null) {
SparkEnv.get.conf
} else conf
}

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
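
The new globalConf accessor lets code that has no SparkContext in scope (Partitioner above, the helpers further down in this object) read configuration from one place: the conf of the running SparkEnv when one has been set, otherwise the lazily created standalone conf. A sketch of reading through it; the key spark.akka.frameSize and its default are illustrative, and globalConf is private[spark], so this only works from inside the org.apache.spark package:

    import scala.util.Try
    import org.apache.spark.SparkContext

    // Works both inside a running application (SparkEnv present) and in tests or
    // tooling where no SparkEnv has been set yet.
    val frameSizeMb: Int =
      Try(SparkContext.globalConf.get("spark.akka.frameSize")).toOption
        .map(_.toInt)
        .getOrElse(10)
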
@@ -1020,7 +1034,7 @@ object SparkContext {
/** Get the amount of memory per executor requested through system properties or SPARK_MEM */
private[spark] val executorMemoryRequested = {
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
Option(System.getProperty("spark.executor.memory"))
Try(globalConf.get("spark.executor.memory")).toOption
.orElse(Option(System.getenv("SPARK_MEM")))
.map(Utils.memoryStringToMb)
.getOrElse(512)
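
Executor memory is now resolved from the global conf first, then the SPARK_MEM environment variable, then a 512 MB default. A worked example of that precedence, mirrored with explicit inputs instead of globalConf and the environment (Utils is Spark-internal, so this is a sketch for code inside org.apache.spark; the memory strings are illustrative):

    import org.apache.spark.util.Utils

    def requestedMemoryMb(confValue: Option[String], sparkMemEnv: Option[String]): Int =
      confValue.orElse(sparkMemEnv).map(Utils.memoryStringToMb).getOrElse(512)

    requestedMemoryMb(Some("2g"), None)  // 2048
    requestedMemoryMb(None, Some("1g"))  // 1024
    requestedMemoryMb(None, None)        // 512
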
@@ -1123,7 +1137,7 @@ object SparkContext {
case mesosUrl @ MESOS_REGEX(_) =>
MesosNativeLibrary.load()
val scheduler = new ClusterScheduler(sc)
val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
val coarseGrained = globalConf.getOrElse("spark.mesos.coarse", "false").toBoolean
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
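
Scheduler selection for mesos:// masters now reads spark.mesos.coarse from the global conf rather than from a system property. A brief sketch of the toggle (the Mesos master URL is a placeholder):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMasterUrl("mesos://zk://zkhost:2181/mesos")
      .setAppName("MesosDemo")
      .set("spark.mesos.coarse", "true") // true -> CoarseMesosSchedulerBackend
                                         // false or unset -> fine-grained MesosSchedulerBackend
    val sc = new SparkContext(conf)
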