Skip to content

Commit

Permalink
Update RecoverableNetworkWordCount.scala
Browse files Browse the repository at this point in the history
Trying this example, I missed the moment when the checkpoint was iniciated

Author: comcmipi <[email protected]>

Closes apache#2735 from comcmipi/patch-1 and squashes the following commits:

b6d8001 [comcmipi] Update RecoverableNetworkWordCount.scala
96fe274 [comcmipi] Update RecoverableNetworkWordCount.scala
  • Loading branch information
comcmipi authored and tdas committed Nov 10, 2014
1 parent 3a02d41 commit 0340c56
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import org.apache.spark.util.IntParam
*/
object RecoverableNetworkWordCount {

def createContext(ip: String, port: Int, outputPath: String) = {
def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = {

// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
Expand All @@ -66,6 +66,7 @@ object RecoverableNetworkWordCount {
val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(checkpointDirectory)

// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
Expand Down Expand Up @@ -101,7 +102,7 @@ object RecoverableNetworkWordCount {
val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => {
createContext(ip, port, outputPath)
createContext(ip, port, outputPath, checkpointDirectory)
})
ssc.start()
ssc.awaitTermination()
Expand Down

0 comments on commit 0340c56

Please sign in to comment.