Skip to content

Commit

Permalink
upgrade to spark 3
Browse files (browse the repository at this point in the history)
  • Loading branch information
Pranab Ghosh committed Nov 14, 2019
1 parent e37cd74 commit 1e04758
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 26 deletions.
21 changes: 9 additions & 12 deletions spark/build.sbt
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
import sbtassembly.{Plugin},Plugin.AssemblyKeys._
import sbtassembly.Plugin._

name := "ruscello-spark"

organization := "org.ruscello"

version := "1.0"

scalaVersion := "2.10.4"
scalaVersion := "2.12.0"

isSnapshot := true

libraryDependencies ++=Seq(
"org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.6.1",
"org.apache.spark" %% "spark-streaming-kafka" % "1.6.1",
"org.apache.spark" %% "spark-core" % "3.0.0-preview" % "provided",
"org.apache.spark" %% "spark-streaming" % "3.0.0-preview" % "provided",
//"org.apache.spark" %% "spark-streaming-kafka" % "1.6.1",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.0.0-preview",
"org.apache.commons" % "commons-lang3" % "3.0",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3",
"com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3",
"com.fasterxml.jackson.module" % "jackson-module-scala_2.12" % "2.9.4",
"junit" % "junit" % "4.7" % "test",
"org.scalatest" % "scalatest_2.10" % "2.0" % "test",
"com.typesafe" % "config" % "1.2.1",
"mawazo" %% "hoidla" % "1.0",
"org.chombo" %% "chombo-spark" % "1.0",
"mawazo" %% "chombo" % "1.0",
"gov.nist.math" % "jama" % "1.0.3"
)

net.virtualvoid.sbt.graph.Plugin.graphSettings

assemblySettings
2 changes: 1 addition & 1 deletion spark/project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.0
sbt.version=1.3.3
8 changes: 4 additions & 4 deletions spark/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
logLevel := Level.Warn

addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")


resolvers ++= Seq(
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
"Akka Repository" at "http://repo.akka.io/releases/",
"Akka Repository" at "https://repo.akka.io/releases/",
"scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
"Github Maven Repository" at "https://github.com/pranab/hoidla/tree/master/target"
)
Expand Down
8 changes: 4 additions & 4 deletions spark/src/main/scala/org/ruscello/event/EventCluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ object EventCluster extends JobConfiguration {
})

if (false) {
tsStrm.foreach(rdd => {
tsStrm.foreachRDD(rdd => {
val count = rdd.count
println("*** num of records: " + count)
rdd.foreach(r => {
Expand All @@ -125,7 +125,7 @@ object EventCluster extends JobConfiguration {

if (false) {
println("*** state stream")
mappedStatefulStream.foreach(rdd => {
mappedStatefulStream.foreachRDD(rdd => {
rdd.foreach(r => {
println("*** key: " + r._1 + " timestamp: " + r._2 + "alarm: " + r._3)
})
Expand All @@ -139,7 +139,7 @@ object EventCluster extends JobConfiguration {

if (false) {
println("*** alarms" )
alarmStream.foreach(rdd => {
alarmStream.foreachRDD(rdd => {
rdd.foreach(r => {
println(r)
})
Expand All @@ -155,7 +155,7 @@ object EventCluster extends JobConfiguration {

// Wait and then exit. To run forever call without a timeout
if (duration > 0) {
strContxt.awaitTermination(duration * 1000)
strContxt.awaitTermination()
} else {
strContxt.awaitTermination()
}
Expand Down
18 changes: 13 additions & 5 deletions spark/src/main/scala/org/ruscello/similarity/LevelSimilarity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@ import org.hoidla.window.SizeBoundWindow
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
//import org.apache.spark.streaming.kafka.KafkaUtils
//import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


case class WindowConfig(windowSize : Int, windowStep : Int, levelThrehold : Int,
levelThresholdMargin : Int, levelCrossingCountThreshold : Double, checkingStrategy : String,
Expand Down Expand Up @@ -123,7 +129,7 @@ object LevelSimilarity {
val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER_2)
val stateStream = getStateStream(lines, idOrdinal, updateFunc)

stateStream.foreach(ssrdd => {
stateStream.foreachRDD(ssrdd => {
ssrdd.foreach(ss => {
val res = ss._2
println("device:" + ss._1 + " num violations:" + res.numViolations)
Expand All @@ -132,6 +138,7 @@ object LevelSimilarity {

}

/*
case "kafka" => {
//kafka as stream source
val zooKeeperServerLList = config.getString("zookeeper.connect")
Expand Down Expand Up @@ -179,6 +186,7 @@ object LevelSimilarity {
}
}
}
*/

case _ => {
throw new IllegalArgumentException("unsupported input stream source")
Expand Down Expand Up @@ -217,7 +225,7 @@ object LevelSimilarity {
}

private def printInput(lines : DStream[String]) {
lines.foreach(lr => {
lines.foreachRDD(lr => {
lr.foreach(l => {
println(l)
}
Expand All @@ -226,7 +234,7 @@ object LevelSimilarity {
}

private def printKeyedInput(keyedLines : DStream[(String, String)]) {
keyedLines.foreach(kls => {
keyedLines.foreachRDD(kls => {
kls.foreach(kl => {
println(kl._1)
})
Expand Down

0 comments on commit 1e04758

Please sign in to comment.