Revert "[SPARK-6752][Streaming] Allow StreamingContext to be recreate…
Browse files Browse the repository at this point in the history
…d from checkpoint and existing SparkContext"

This reverts commit 534f2a4.
pwendell committed Apr 25, 2015
1 parent cca9905 commit a61d65f
Showing 7 changed files with 61 additions and 503 deletions.
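
For context, the change being reverted (534f2a4) let applications rebuild a StreamingContext from checkpoint data while reusing an already-running SparkContext; in that mode the SparkConf stored in the checkpoint was not restored. A minimal sketch of the removed constructor, with a made-up checkpoint path (see the StreamingContext.scala hunks below for the actual signatures):

```scala
// Sketch of the API this commit removes again; not valid after the revert.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.StreamingContext

val sc = new SparkContext(new SparkConf().setAppName("sketch"))

// Constructor removed by this revert: recreate from checkpoint data,
// reusing the live SparkContext instead of building a new one.
// val ssc = new StreamingContext("/tmp/ckpt", sc)
```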

This file was deleted.

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -77,16 +77,14 @@ object Checkpoint extends Logging {
   }

   /** Get checkpoint files present in the given directory, ordered by oldest-first */
-  def getCheckpointFiles(checkpointDir: String, fsOption: Option[FileSystem] = None): Seq[Path] = {
-
+  def getCheckpointFiles(checkpointDir: String, fs: FileSystem): Seq[Path] = {
     def sortFunc(path1: Path, path2: Path): Boolean = {
       val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
       val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
       (time1 < time2) || (time1 == time2 && bk1)
     }

     val path = new Path(checkpointDir)
-    val fs = fsOption.getOrElse(path.getFileSystem(new Configuration()))
     if (fs.exists(path)) {
       val statuses = fs.listStatus(path)
       if (statuses != null) {
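
The ordering above sorts checkpoint files oldest-first, with a backup file (non-empty suffix in the second REGEX group) winning ties against its primary. A self-contained sketch of that comparator; the regex below is an assumption modeled on Spark's "checkpoint-<time>[.bk]" naming, not the actual REGEX from Checkpoint.scala:

```scala
// Standalone sketch of the oldest-first sort used by getCheckpointFiles.
object CheckpointOrderingSketch {
  // Assumed file-name pattern: "checkpoint-<millis>" plus an optional suffix.
  private val REGEX = """checkpoint-(\d+)([.\w]*)""".r

  def sortFunc(name1: String, name2: String): Boolean = {
    val (time1, bk1) = name1 match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
    val (time2, _)   = name2 match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
    // Older timestamp first; on a tie, the backup (suffixed) file sorts first.
    (time1 < time2) || (time1 == time2 && bk1)
  }

  def main(args: Array[String]): Unit = {
    val files = Seq("checkpoint-3000", "checkpoint-1000.bk", "checkpoint-2000", "checkpoint-1000")
    println(files.sortWith(sortFunc))
    // List(checkpoint-1000.bk, checkpoint-1000, checkpoint-2000, checkpoint-3000)
  }
}
```
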
@@ -162,7 +160,7 @@ class CheckpointWriter(
         }

         // Delete old checkpoint files
-        val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
+        val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs)
         if (allCheckpointFiles.size > 10) {
           allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
             logInfo("Deleting " + file)
@@ -236,24 +234,15 @@
 private[streaming]
 object CheckpointReader extends Logging {

-  /**
-   * Read checkpoint files present in the given checkpoint directory. If there are no checkpoint
-   * files, then return None, else try to return the latest valid checkpoint object. If no
-   * checkpoint files could be read correctly, then return None (if ignoreReadError = true),
-   * or throw exception (if ignoreReadError = false).
-   */
-  def read(
-      checkpointDir: String,
-      conf: SparkConf,
-      hadoopConf: Configuration,
-      ignoreReadError: Boolean = false): Option[Checkpoint] = {
+  def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] =
+  {
     val checkpointPath = new Path(checkpointDir)

     // TODO(rxin): Why is this a def?!
     def fs: FileSystem = checkpointPath.getFileSystem(hadoopConf)

     // Try to find the checkpoint files
-    val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).reverse
+    val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
     if (checkpointFiles.isEmpty) {
       return None
     }
@@ -293,10 +282,7 @@ object CheckpointReader extends Logging {
     })

     // If none of checkpoint files could be read, then throw exception
-    if (!ignoreReadError) {
-      throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath")
-    }
-    None
+    throw new SparkException("Failed to read checkpoint from directory " + checkpointPath)
   }
 }
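
Behaviorally, the restored read() always throws once every checkpoint file fails to load, whereas the reverted variant could return None when ignoreReadError was set. A hypothetical caller-side sketch of the restored contract; CheckpointReader is private[streaming], so this is illustrative only and the path is made up:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkException}

// Restored behavior: unreadable checkpoint data always surfaces as an
// exception, so "create on error" policies move into the callers
// (exactly what StreamingContext.getOrCreate does in the next file).
val checkpoint =
  try {
    CheckpointReader.read("/tmp/ckpt", new SparkConf(), new Configuration())
  } catch {
    case _: SparkException => None
  }
```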

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -107,19 +107,6 @@ class StreamingContext private[streaming] (
    */
   def this(path: String) = this(path, new Configuration)

-  /**
-   * Recreate a StreamingContext from a checkpoint file using an existing SparkContext.
-   * @param path Path to the directory that was specified as the checkpoint directory
-   * @param sparkContext Existing SparkContext
-   */
-  def this(path: String, sparkContext: SparkContext) = {
-    this(
-      sparkContext,
-      CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).get,
-      null)
-  }
-
-
   if (sc_ == null && cp_ == null) {
     throw new Exception("Spark Streaming cannot be initialized with " +
       "both SparkContext and checkpoint as null")
@@ -128,12 +115,10 @@ class StreamingContext private[streaming] (
   private[streaming] val isCheckpointPresent = (cp_ != null)

   private[streaming] val sc: SparkContext = {
-    if (sc_ != null) {
-      sc_
-    } else if (isCheckpointPresent) {
+    if (isCheckpointPresent) {
       new SparkContext(cp_.createSparkConf())
     } else {
-      throw new SparkException("Cannot create StreamingContext without a SparkContext")
+      sc_
     }
   }

@@ -144,7 +129,7 @@ class StreamingContext private[streaming] (

   private[streaming] val conf = sc.conf

-  private[streaming] val env = sc.env
+  private[streaming] val env = SparkEnv.get

   private[streaming] val graph: DStreamGraph = {
     if (isCheckpointPresent) {
@@ -189,9 +174,7 @@ class StreamingContext private[streaming] (

   /** Register streaming source to metrics system */
   private val streamingSource = new StreamingSource(this)
-  assert(env != null)
-  assert(env.metricsSystem != null)
-  env.metricsSystem.registerSource(streamingSource)
+  SparkEnv.get.metricsSystem.registerSource(streamingSource)

   /** Enumeration to identify current state of the StreamingContext */
   private[streaming] object StreamingContextState extends Enumeration {
@@ -638,59 +621,19 @@ object StreamingContext extends Logging {
       hadoopConf: Configuration = new Configuration(),
       createOnError: Boolean = false
     ): StreamingContext = {
-    val checkpointOption = CheckpointReader.read(
-      checkpointPath, new SparkConf(), hadoopConf, createOnError)
+    val checkpointOption = try {
+      CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf)
+    } catch {
+      case e: Exception =>
+        if (createOnError) {
+          None
+        } else {
+          throw e
+        }
+    }
     checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
   }
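
The public behavior of this getOrCreate is unchanged by the revert: a failed checkpoint read either falls back to creatingFunc (createOnError = true) or rethrows; only the plumbing moves from a CheckpointReader parameter back to a local try/catch. A usage sketch with a made-up path and batch interval:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def creatingFunc(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("sketch"), Seconds(1))
  ssc.checkpoint("/tmp/ckpt")  // write checkpoints for the next restart
  ssc
}

// Recover from /tmp/ckpt if possible; on unreadable checkpoint data,
// fall back to creatingFunc instead of failing.
val ssc = StreamingContext.getOrCreate("/tmp/ckpt", creatingFunc _, createOnError = true)
```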


-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by calling the provided `creatingFunc` on the provided `sparkContext`. Note
-   * that the SparkConf configuration in the checkpoint data will not be restored as the
-   * SparkContext has already been created.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc Function to create a new StreamingContext using the given SparkContext
-   * @param sparkContext SparkContext using which the StreamingContext will be created
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: SparkContext => StreamingContext,
-      sparkContext: SparkContext
-    ): StreamingContext = {
-    getOrCreate(checkpointPath, creatingFunc, sparkContext, createOnError = false)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by calling the provided `creatingFunc` on the provided `sparkContext`. Note
-   * that the SparkConf configuration in the checkpoint data will not be restored as the
-   * SparkContext has already been created.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc Function to create a new StreamingContext using the given SparkContext
-   * @param sparkContext SparkContext using which the StreamingContext will be created
-   * @param createOnError Whether to create a new StreamingContext if there is an
-   *                      error in reading checkpoint data. By default, an exception will be
-   *                      thrown on error.
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: SparkContext => StreamingContext,
-      sparkContext: SparkContext,
-      createOnError: Boolean
-    ): StreamingContext = {
-    val checkpointOption = CheckpointReader.read(
-      checkpointPath, sparkContext.conf, sparkContext.hadoopConfiguration, createOnError)
-    checkpointOption.map(new StreamingContext(sparkContext, _, null))
-      .getOrElse(creatingFunc(sparkContext))
-  }
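
For reference, a sketch of how the removed SparkContext-based overload was intended to be used; after this revert the call no longer compiles, which is why it is shown commented out. The body of creatingFunc is a made-up example:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext(new SparkConf().setAppName("sketch"))

def creatingFunc(sc: SparkContext): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(1))
  ssc.checkpoint("/tmp/ckpt")
  ssc
}

// Removed overload: recreate from checkpoint data, reusing sc; the
// checkpointed SparkConf was not restored in this mode.
// val ssc = StreamingContext.getOrCreate("/tmp/ckpt", creatingFunc _, sc)
```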

   /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to StreamingContext.
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -32,14 +32,13 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.function.{Function0 => JFunction0}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.scheduler.StreamingListener
-import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.receiver.Receiver
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.dstream.{PluggableInputDStream, ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.receiver.Receiver

 /**
  * A Java-friendly version of [[org.apache.spark.streaming.StreamingContext]] which is the main
@@ -656,7 +655,6 @@ object JavaStreamingContext {
   * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
   * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
   */
-  @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
   def getOrCreate(
       checkpointPath: String,
       factory: JavaStreamingContextFactory
@@ -678,7 +676,6 @@
   * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
   *                   file system
   */
-  @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
   def getOrCreate(
       checkpointPath: String,
       hadoopConf: Configuration,
@@ -703,7 +700,6 @@
   * @param createOnError Whether to create a new JavaStreamingContext if there is an
   *                      error in reading checkpoint data.
   */
-  @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0")
   def getOrCreate(
       checkpointPath: String,
       hadoopConf: Configuration,
@@ -716,117 +712,6 @@
     new JavaStreamingContext(ssc)
   }
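
With the @deprecated annotations gone, the factory-based Java API is again the supported recovery path. A hedged usage sketch, modeled on the Spark 1.3 streaming guide and written in Scala for consistency with this page; the path and batch interval are made up:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaStreamingContextFactory}

val factory = new JavaStreamingContextFactory {
  override def create(): JavaStreamingContext = {
    val jssc = new JavaStreamingContext(new SparkConf().setAppName("sketch"), Durations.seconds(1))
    jssc.checkpoint("/tmp/ckpt")
    jssc
  }
}

// Recover from the checkpoint if present, otherwise call factory.create().
val jssc = JavaStreamingContext.getOrCreate("/tmp/ckpt", factory)
```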

-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided
-   * `creatingFunc` will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
-   * @param creatingFunc Function to create a new JavaStreamingContext
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: JFunction0[JavaStreamingContext]
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
-      creatingFunc.call().ssc
-    })
-    new JavaStreamingContext(ssc)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided
-   * `creatingFunc` will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc Function to create a new JavaStreamingContext
-   * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
-   *                   file system
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: JFunction0[JavaStreamingContext],
-      hadoopConf: Configuration
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
-      creatingFunc.call().ssc
-    }, hadoopConf)
-    new JavaStreamingContext(ssc)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided
-   * `creatingFunc` will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc Function to create a new JavaStreamingContext
-   * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible
-   *                   file system
-   * @param createOnError Whether to create a new JavaStreamingContext if there is an
-   *                      error in reading checkpoint data.
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: JFunction0[JavaStreamingContext],
-      hadoopConf: Configuration,
-      createOnError: Boolean
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
-      creatingFunc.call().ssc
-    }, hadoopConf, createOnError)
-    new JavaStreamingContext(ssc)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided
-   * `creatingFunc` will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc Function to create a new JavaStreamingContext
-   * @param sparkContext SparkContext using which the StreamingContext will be created
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
-      sparkContext: JavaSparkContext
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
-      creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
-    }, sparkContext.sc)
-    new JavaStreamingContext(ssc)
-  }
-
-  /**
-   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
-   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the provided
-   * `creatingFunc` will be used to create a JavaStreamingContext.
-   *
-   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
-   * @param creatingFunc Function to create a new JavaStreamingContext
-   * @param sparkContext SparkContext using which the StreamingContext will be created
-   * @param createOnError Whether to create a new JavaStreamingContext if there is an
-   *                      error in reading checkpoint data.
-   */
-  def getOrCreate(
-      checkpointPath: String,
-      creatingFunc: JFunction[JavaSparkContext, JavaStreamingContext],
-      sparkContext: JavaSparkContext,
-      createOnError: Boolean
-    ): JavaStreamingContext = {
-    val ssc = StreamingContext.getOrCreate(checkpointPath, (sparkContext: SparkContext) => {
-      creatingFunc.call(new JavaSparkContext(sparkContext)).ssc
-    }, sparkContext.sc, createOnError)
-    new JavaStreamingContext(ssc)
-  }
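
The five function-based overloads deleted above relied on the Java Function0/Function interfaces. For reference, a sketch of the calling pattern they enabled, exercised from Scala; it does not compile against this tree after the revert, which is why the call is commented out:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.api.java.function.{Function0 => JFunction0}
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.api.java.JavaStreamingContext

val creatingFunc = new JFunction0[JavaStreamingContext] {
  override def call(): JavaStreamingContext = {
    val jssc = new JavaStreamingContext(new SparkConf().setAppName("sketch"), Durations.seconds(1))
    jssc.checkpoint("/tmp/ckpt")
    jssc
  }
}

// Removed overload (Function0 itself was introduced alongside it):
// val jssc = JavaStreamingContext.getOrCreate("/tmp/ckpt", creatingFunc)
```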

   /**
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to StreamingContext.
(4 of the 7 changed files are shown above; the remaining diffs were not loaded on this page.)
