Merge branch 'mesos'
haitaoyao committed Jan 30, 2013
2 parents 4670c99 + ccb67ff commit dd27f8e
Showing 56 changed files with 442 additions and 408 deletions.
2 changes: 1 addition & 1 deletion bagel/src/test/scala/bagel/BagelSuite.scala
@@ -23,7 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
sc = null
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
System.clearProperty("spark.driver.port")
}

test("halting by voting") {
6 changes: 3 additions & 3 deletions core/src/main/scala/spark/CacheManager.scala
@@ -10,9 +10,9 @@ import spark.storage.{BlockManager, StorageLevel}
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
private val loading = new HashSet[String]

/** Gets or computes an RDD split. Used by RDD.iterator() when a RDD is cached. */
/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Split, context: TaskContext, storageLevel: StorageLevel)
: Iterator[T] = {
: Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index)
logInfo("Cache key is " + key)
blockManager.get(key) match {
@@ -50,7 +50,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// If we got here, we have to load the split
val elements = new ArrayBuffer[Any]
logInfo("Computing partition " + split)
elements ++= rdd.compute(split, context)
elements ++= rdd.computeOrReadCheckpoint(split, context)
// Try to put this block in the blockManager
blockManager.put(key, elements, storageLevel, true)
return elements.iterator.asInstanceOf[Iterator[T]]
10 changes: 5 additions & 5 deletions core/src/main/scala/spark/MapOutputTracker.scala
@@ -38,10 +38,7 @@ private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Ac
}
}

private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
val ip: String = System.getProperty("spark.master.host", "localhost")
val port: Int = System.getProperty("spark.master.port", "7077").toInt
val actorName: String = "MapOutputTracker"
private[spark] class MapOutputTracker(actorSystem: ActorSystem, isDriver: Boolean) extends Logging {

val timeout = 10.seconds

@@ -56,11 +53,14 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
var cacheGeneration = generation
val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]

var trackerActor: ActorRef = if (isMaster) {
val actorName: String = "MapOutputTracker"
var trackerActor: ActorRef = if (isDriver) {
val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName)
logInfo("Registered MapOutputTrackerActor actor")
actor
} else {
val ip = System.getProperty("spark.driver.host", "localhost")
val port = System.getProperty("spark.driver.port", "7077").toInt
val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
actorSystem.actorFor(url)
}
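
The executor-side branch above now reads the renamed spark.driver.host and spark.driver.port properties and builds an Akka URL for the driver-side actor. A minimal sketch of that lookup, reusing the property names, defaults, and URL format visible in the hunk; the helper object and its name are illustrative, not part of the commit:

// Illustrative helper (not in the commit): builds the Akka URL an executor-side
// component would use to reach a driver-side actor after the
// spark.master.* -> spark.driver.* rename.
object DriverActorUrl {
  def apply(actorName: String): String = {
    val host = System.getProperty("spark.driver.host", "localhost")   // was spark.master.host
    val port = System.getProperty("spark.driver.port", "7077").toInt  // was spark.master.port
    "akka://spark@%s:%s/user/%s".format(host, port, actorName)
  }
}

// DriverActorUrl("MapOutputTracker") gives "akka://spark@localhost:7077/user/MapOutputTracker"
// when neither property is set.
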
4 changes: 1 addition & 3 deletions core/src/main/scala/spark/PairRDDFunctions.scala
@@ -649,9 +649,7 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
}

private[spark]
class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U)
extends RDD[(K, U)](prev) {

class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev) {
override def getSplits = firstParent[(K, V)].splits
override val partitioner = firstParent[(K, V)].partitioner
override def compute(split: Split, context: TaskContext) =
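
The reformatted MappedValuesRDD above keeps override val partitioner = firstParent[(K, V)].partitioner, which is what lets mapValues preserve a parent's partitioning while a plain map cannot. A small hedged usage sketch of that property; the SparkContext setup and sample data are illustrative assumptions:

import spark.{SparkContext, HashPartitioner}
import spark.SparkContext._   // brings the pair-RDD functions into scope

// Illustrative only: mapValues carries the parent's partitioner forward,
// while map drops it because it may change the keys.
val sc = new SparkContext("local", "mapvalues-partitioner-sketch")
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .partitionBy(new HashPartitioner(4))

val doubled = pairs.mapValues(_ * 2)
println(doubled.partitioner == pairs.partitioner)   // true: MappedValuesRDD reuses the parent's partitioner

val remapped = pairs.map { case (k, v) => (k, v * 2) }
println(remapped.partitioner)                       // None: map cannot assume the keys are unchanged
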
146 changes: 89 additions & 57 deletions core/src/main/scala/spark/RDD.scala
@@ -1,27 +1,17 @@
package spark

import java.io.{ObjectOutputStream, IOException, EOFException, ObjectInputStream}
import java.net.URL
import java.util.{Date, Random}
import java.util.{HashMap => JHashMap}
import java.util.concurrent.atomic.AtomicLong

import scala.collection.Map
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.mapred.HadoopWriter
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputCommitter
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.hadoop.mapred.TextOutputFormat

import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
@@ -30,7 +20,6 @@ import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
import spark.partial.PartialResult
import spark.rdd.BlockRDD
import spark.rdd.CartesianRDD
import spark.rdd.FilteredRDD
import spark.rdd.FlatMappedRDD
@@ -73,51 +62,56 @@ import SparkContext._
* on RDD internals.
*/
abstract class RDD[T: ClassManifest](
@transient var sc: SparkContext,
var dependencies_ : List[Dependency[_]]
@transient private var sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {


/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context , List(new OneToOneDependency(oneParent)))

// =======================================================================
// Methods that should be implemented by subclasses of RDD
// =======================================================================

/** Function for computing a given partition. */
/** Implemented by subclasses to compute a given partition. */
def compute(split: Split, context: TaskContext): Iterator[T]

/** Set of partitions in this RDD. */
protected def getSplits(): Array[Split]
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getSplits: Array[Split]

/** How this RDD depends on any parent RDDs. */
protected def getDependencies(): List[Dependency[_]] = dependencies_
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps

/** A friendly name for this RDD */
var name: String = null

/** Optionally overridden by subclasses to specify placement preferences. */
protected def getPreferredLocations(split: Split): Seq[String] = Nil

/** Optionally overridden by subclasses to specify how they are partitioned. */
val partitioner: Option[Partitioner] = None


// =======================================================================
// Methods and fields available on all RDDs
// =======================================================================

/** A unique ID for this RDD (within its SparkContext). */
val id = sc.newRddId()

/** A friendly name for this RDD */
var name: String = null

/** Assign a name to this RDD */
def setName(_name: String) = {
name = _name
this
}

/**
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
*/
@@ -142,15 +136,24 @@ abstract class RDD[T: ClassManifest](
/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel = storageLevel

// Our dependencies and splits will be gotten by calling subclass's methods below, and will
// be overwritten when we're checkpointed
private var dependencies_ : Seq[Dependency[_]] = null
@transient private var splits_ : Array[Split] = null

/** An Option holding our checkpoint RDD, if we are checkpointed */
private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)

/**
* Get the preferred location of a split, taking into account whether the
* Get the list of dependencies of this RDD, taking into account whether the
* RDD is checkpointed or not.
*/
final def preferredLocations(split: Split): Seq[String] = {
if (isCheckpointed) {
checkpointData.get.getPreferredLocations(split)
} else {
getPreferredLocations(split)
final def dependencies: Seq[Dependency[_]] = {
checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
if (dependencies_ == null) {
dependencies_ = getDependencies
}
dependencies_
}
}

@@ -159,22 +162,21 @@
* RDD is checkpointed or not.
*/
final def splits: Array[Split] = {
if (isCheckpointed) {
checkpointData.get.getSplits
} else {
getSplits
checkpointRDD.map(_.splits).getOrElse {
if (splits_ == null) {
splits_ = getSplits
}
splits_
}
}

/**
* Get the list of dependencies of this RDD, taking into account whether the
* Get the preferred location of a split, taking into account whether the
* RDD is checkpointed or not.
*/
final def dependencies: List[Dependency[_]] = {
if (isCheckpointed) {
dependencies_
} else {
getDependencies
final def preferredLocations(split: Split): Seq[String] = {
checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
getPreferredLocations(split)
}
}

@@ -184,10 +186,19 @@
* subclasses of RDD.
*/
final def iterator(split: Split, context: TaskContext): Iterator[T] = {
if (isCheckpointed) {
checkpointData.get.iterator(split, context)
} else if (storageLevel != StorageLevel.NONE) {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}

/**
* Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
*/
private[spark] def computeOrReadCheckpoint(split: Split, context: TaskContext): Iterator[T] = {
if (isCheckpointed) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
@@ -578,15 +589,15 @@
/**
* Return whether this RDD has been checkpointed or not
*/
def isCheckpointed(): Boolean = {
if (checkpointData.isDefined) checkpointData.get.isCheckpointed() else false
def isCheckpointed: Boolean = {
checkpointData.map(_.isCheckpointed).getOrElse(false)
}

/**
* Gets the name of the file to which this RDD was checkpointed
*/
def getCheckpointFile(): Option[String] = {
if (checkpointData.isDefined) checkpointData.get.getCheckpointFile() else None
def getCheckpointFile: Option[String] = {
checkpointData.flatMap(_.getCheckpointFile)
}

// =======================================================================
@@ -611,31 +622,52 @@
def context = sc

/**
* Performs the checkpointing of this RDD by saving this . It is called by the DAGScheduler
* Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler
* after a job using this RDD has completed (therefore the RDD has been materialized and
* potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
*/
protected[spark] def doCheckpoint() {
if (checkpointData.isDefined) checkpointData.get.doCheckpoint()
dependencies.foreach(_.rdd.doCheckpoint())
private[spark] def doCheckpoint() {
if (checkpointData.isDefined) {
checkpointData.get.doCheckpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
}
}

/**
* Changes the dependencies of this RDD from its original parents to the new RDD
* (`newRDD`) created from the checkpoint file.
* Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
* created from the checkpoint file, and forget its old dependencies and splits.
*/
protected[spark] def changeDependencies(newRDD: RDD[_]) {
private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
clearDependencies()
dependencies_ = List(new OneToOneDependency(newRDD))
dependencies_ = null
splits_ = null
deps = null // Forget the constructor argument for dependencies too
}

/**
* Clears the dependencies of this RDD. This method must ensure that all references
* to the original parent RDDs is removed to enable the parent RDDs to be garbage
* collected. Subclasses of RDD may override this method for implementing their own cleaning
* logic. See [[spark.rdd.UnionRDD]] and [[spark.rdd.ShuffledRDD]] to get a better idea.
* logic. See [[spark.rdd.UnionRDD]] for an example.
*/
protected[spark] def clearDependencies() {
protected def clearDependencies() {
dependencies_ = null
}

/** A description of this RDD and its recursive dependencies for debugging. */
def toDebugString(): String = {
def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
Seq(prefix + rdd + " (" + rdd.splits.size + " splits)") ++
rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + " "))
}
debugString(this).mkString("\n")
}

override def toString(): String = "%s%s[%d] at %s".format(
Option(name).map(_ + " ").getOrElse(""),
getClass.getSimpleName,
id,
origin)

}
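
The new toDebugString above, together with the reworked checkpoint bookkeeping (markCheckpointed, computeOrReadCheckpoint, and the lazily cached splits_ and dependencies_), is easiest to see from a small driver program. A hedged usage sketch; the local master, checkpoint directory, and sample data are assumptions for illustration, not taken from this commit:

import spark.SparkContext
import spark.SparkContext._

// Illustrative driver program exercising the RDD APIs touched in this diff.
val sc = new SparkContext("local", "rdd-checkpoint-sketch")
sc.setCheckpointDir("/tmp/spark-checkpoints")        // assumed directory

val counts = sc.parallelize(Seq("a b", "b c", "a c"))
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .setName("word counts")                            // setName returns the RDD, so it chains

counts.checkpoint()                                  // mark for checkpointing before the first job
println(counts.toDebugString())                      // the RDD and its recursive dependencies, one per line
counts.collect()                                     // materializes the RDD; the DAGScheduler then runs doCheckpoint()
println(counts.isCheckpointed)                       // true once the checkpoint has been written
println(counts.getCheckpointFile)                    // Some(<path under the checkpoint directory>)
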
