Spark Streaming extends the core API to allow high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources such as Akka, Kafka, Flume, Twitter, ZeroMQ, TCP sockets, etc. Results can be stored in Geode.
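The snippets below assume a `sparkConf` that carries the connector's locator setting. A minimal sketch, assuming a local locator at `localhost[10334]` and the connector's `spark.gemfire.locators` property:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal configuration for the examples below. The app name and locator
// host/port are placeholders; spark.gemfire.locators is the connector
// property that tells Spark where to find the Geode locator.
val sparkConf = new SparkConf()
  .setAppName("GeodeStreamingWordCount")
  .set("spark.gemfire.locators", "localhost[10334]")
```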
Create a `StreamingContext` with a `SparkConf` configuration:

```scala
val ssc = new StreamingContext(sparkConf, Seconds(1))
```
Create a DStream that connects to a netcat server at `host:port`:

```scala
val lines = ssc.socketTextStream(host, port)
```
Count each word in each batch:

```scala
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
```
Use `updateStateByKey` to maintain a running count of each word seen in a text data stream. Here the running count is the state, and it is an integer. We define the update function as:

```scala
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  // Sum the counts from this batch and add them to the previous state.
  val currentCount = values.foldLeft(0)(_ + _)
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}
```
This is applied to the pairs DStream from the earlier example (the `wordCounts` DStream of `(word, count)` pairs):

```scala
val runningCounts = wordCounts.updateStateByKey[Int](updateFunc)
```
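Note that `updateStateByKey` requires checkpointing to be enabled on the context before the stream starts; the directory below is a placeholder:

```scala
// Stateful transformations need a checkpoint directory; any
// HDFS-compatible path works (a local path is used here for illustration).
ssc.checkpoint("/tmp/streaming-checkpoint")
```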
Print a few of the counts to the console. Start the computation:

```scala
runningCounts.print()
ssc.start()
ssc.awaitTermination() // Wait for the computation to terminate
```
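To try this locally, you can feed the socket with a netcat server, for example `nc -lk 9999` (assuming `host` and `port` above point at that listener), and type words into the terminal.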
Now let's save the running word count to the Geode region `str_int_region`; simply replace `print()` with `saveToGemfire()`:
```scala
import io.pivotal.gemfire.spark.connector.streaming._
runningCounts.saveToGemfire("str_int_region")
```
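To verify what was written, the connector's core API can expose the region as an RDD; a sketch, assuming the `gemfireRegion` method brought in by `io.pivotal.gemfire.spark.connector._` and `sc` as the underlying SparkContext:

```scala
import io.pivotal.gemfire.spark.connector._

// Read the region back as an RDD of (word, runningCount) pairs
// and print a few entries.
val savedCounts = sc.gemfireRegion[String, Int]("str_int_region")
savedCounts.take(10).foreach(println)
```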
You can also use the version of `saveToGemfire` that takes a `GemFireConnectionConf` parameter:

```scala
runningCounts.saveToGemfire("str_int_region", connConf)
```
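Here `connConf` is assumed to have been built beforehand, for example from the same `sparkConf` used above; a sketch:

```scala
import io.pivotal.gemfire.spark.connector.GemFireConnectionConf

// Derive the connection configuration from the SparkConf
// (it picks up the spark.gemfire.locators setting).
val connConf = GemFireConnectionConf(sparkConf)
```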
See the [Spark Streaming Programming Guide](http://spark.apache.org/docs/latest/streaming-programming-guide.html) for more details about Spark Streaming programming.
Next: Geode OQL