feat: migrate to fabric8
nikkatalnikov committed May 13, 2019
1 parent f89cff6 commit 143a0f1
Showing 6 changed files with 38 additions and 132 deletions.
8 changes: 1 addition & 7 deletions build.sbt
@@ -18,8 +18,6 @@ libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" %% "spark-hive" % sparkVersion % "provided",

"io.reactivex" %% "rxscala" % "0.26.5",

"com.facebook.presto" % "presto-jdbc" % "0.151" % Test,
"com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2",
"com.microsoft.sqlserver" % "mssql-jdbc" % "7.2.0.jre8",
@@ -28,18 +26,14 @@ libraryDependencies ++= Seq(

"com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
"com.typesafe" % "config" % "1.0.2",
"io.kubernetes" % "client-java" % "4.0.0",
"org.json" % "json" % "20180813",

"org.scalatest" %% "scalatest" % "3.2.0-SNAP10" % Test,
"org.scalacheck" %% "scalacheck" % "1.14.0" % Test,

"org.apache.logging.log4j" % "log4j-core" % "2.7",
"org.apache.logging.log4j" % "log4j-api" % "2.7",

"io.circe" %% "circe-core" % circeVersion,
"io.circe" %% "circe-generic" % circeVersion,
"io.circe" %% "circe-parser" % circeVersion,
"io.fabric8" % "kubernetes-client" % "4.2.2" % Test,

"org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion exclude ("org.apache.kafka","kafka"),
"org.apache.kafka" %% "kafka" % "2.1.0",
2 changes: 1 addition & 1 deletion deployment/spark-prometheus.yaml
@@ -9,7 +9,7 @@ spec:
image: "gcr.io/spark-operator/spark:v2.4.0-gcs-prometheus"
imagePullPolicy: Always
mainClass: consumer.KafkaConsumer
mainApplicationFile: "http://192.168.0.104:8080/bigkube-assembly-0.1.jar" # check IP
mainApplicationFile: "http://172.29.224.36:8080/bigkube-assembly-0.1.jar" # check IP
sparkVersion: "2.4.0"
restartPolicy:
type: Never
2 changes: 1 addition & 1 deletion src/it/resources/it.conf
@@ -1,4 +1,4 @@
minikube_url = 192.168.64.11
minikube_url = 192.168.64.24

prestodb = {
db {
4 changes: 2 additions & 2 deletions src/it/scala/it_tests/KafkaTest.scala
@@ -54,7 +54,7 @@ class KafkaTest extends FunSuite with BeforeAndAfterAll with Matchers {
producer.close()

// let Spark Streaming fetch latest from Kafka msg and store it
Thread.sleep(60000)
Thread.sleep(90000)
}
//
// test("it reads data from Kafka topics and stores it to hdfs") {
@@ -64,7 +64,7 @@ class KafkaTest extends FunSuite with BeforeAndAfterAll with Matchers {
// count should be > 0
// }

test("it reads data from Kafka topics and stores it to mssql") {
test("it reads data from Kafka topics and stores it to mssql") {
val usr = MSSQLServiceInstance.getRandomUser

println(s"usr: $usr")
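In KafkaTest the fixed wait for Spark Streaming to land the messages grows from 60 to 90 seconds. As an alternative not used in this commit, the wait could poll the sink with ScalaTest's Eventually instead of sleeping; a rough sketch, where sinkHasRows is a hypothetical check that would be backed by a query against the MSSQL sink:

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Seconds, Span}

object PollInsteadOfSleepSketch {
  // Hypothetical predicate: back it with a count query against the MSSQL sink.
  def sinkHasRows: Boolean = ???

  def awaitSparkSink(): Unit =
    // Polls every 5 seconds for up to 90 seconds; fails if the data never shows up.
    eventually(timeout(Span(90, Seconds)), interval(Span(5, Seconds))) {
      assert(sinkHasRows)
    }
}
```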
133 changes: 23 additions & 110 deletions src/it/scala/it_tests/utils/SparkController.scala
@@ -1,128 +1,41 @@
package it_tests.utils

import org.json.JSONObject
import org.yaml.snakeyaml.Yaml
import java.io.{File, FileInputStream}
import java.util

import com.google.gson.internal.LinkedTreeMap
import io.kubernetes.client.apis.CustomObjectsApi
import io.kubernetes.client.util.Config
import io.kubernetes.client.Configuration
import io.kubernetes.client.models._
import rx.lang.scala.Observable
import io.circe.generic.auto._
import io.circe.parser._

import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.concurrent.duration._
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}

class SparkController(crdNamespace: String, resourceName: String) {
import scala.concurrent.ExecutionContext.Implicits.global

val client: KubernetesClient = new DefaultKubernetesClient()
private val crdInstanceName = "spark-pi"
private val crdGroup = "sparkoperator.k8s.io"
private val crdVersion = "v1beta1"
private val crdPlural = "sparkapplications"
private val crdInstanceName = "spark-pi"

private val apiClient = Config.defaultClient
// apiClient.setDebugging(true)
Configuration.setDefaultApiClient(apiClient)
private val apiInstance = new CustomObjectsApi
private val customObjectBody = convertYamlToJson(resourceName)

private val crdStateStream = Observable
.interval(Duration(1000, MILLISECONDS))
.map(_ => {
val crdResponse = apiInstance.getNamespacedCustomObject(
crdGroup,
crdVersion,
crdNamespace,
crdPlural,
crdInstanceName)

val JSONResp = new JSONObject(crdResponse.asInstanceOf[LinkedTreeMap[String, Object]])
val decoded = decode[CustomObject](JSONResp.toString)

decoded match {
case Right(x) => x
case Left(ex) => throw new RuntimeException(ex)
}
})
.distinctUntilChanged(x => x.status)
.share
private val crdScope = "Namespaced"
private val crdPodName = "spark-pi-driver"

private val resource = new FileInputStream(new File(resourceName))
val crdContext: CustomResourceDefinitionContext = new CustomResourceDefinitionContext.Builder()
.withGroup(crdGroup)
.withName(crdInstanceName)
.withScope(crdScope)
.withPlural(crdPlural)
.withVersion(crdVersion)
.build()

def launchSparkTestDeployment(): Unit = {
val apiCall =
Future(apiInstance
.createNamespacedCustomObject(
crdGroup,
crdVersion,
crdNamespace,
crdPlural,
customObjectBody.toMap,
"false"))
val sparkCustomResource = client.customResource(crdContext)

apiCall onComplete {
case Failure(ex) =>
throw new RuntimeException(ex)
case Success(x) =>
println(s"Startup finished successfully: $x")
}
sparkCustomResource.create(crdNamespace, resource)
Thread.sleep(30000)

Await.result(apiCall, Duration.Inf)

crdStateStream
.takeWhile(x => {
x.status match {
case None => true
case Some(status) => status.applicationState.state != SparkOperatorStatus.RunningState
}
})
.toBlocking
.subscribe(x => println(s"Waiting for status ${SparkOperatorStatus.RunningState}: $x"))

crdStateStream
.takeWhile(x => {
x.status match {
case None => false
case Some(status) => status.applicationState.state != SparkOperatorStatus.CompletedState
}
})
.onErrorResumeNext(_ => Observable.empty)
.subscribe(x => println(s"Monitoring CRD status while running: $x"))
client.pods()
.inNamespace(crdNamespace)
.withName(crdPodName)
.watchLog(System.out)
}

def cleanUpSparkTestDeployment(): Unit = {
val apiCall =
Future(apiInstance
.deleteNamespacedCustomObject(
crdGroup,
crdVersion,
crdNamespace,
crdPlural,
crdInstanceName,
new V1DeleteOptions,
0,
false,
"Foreground"))

apiCall onComplete {
case Failure(ex) =>
println(s"Cleanup failed with error: $ex")
case Success(x) =>
println(s"Cleanup finished successfully: $x")
}

Await.result(apiCall, Duration.Inf)
}

private def convertYamlToJson(resourceName: String): JSONObject = {
val input = new FileInputStream(new File(resourceName))
val yaml: Yaml = new Yaml
val source: util.LinkedHashMap[String, Object] = yaml.load(input)

new JSONObject(source)
client.customResource(crdContext).delete(crdNamespace, crdInstanceName)
}
}
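The SparkController rewrite above replaces the io.kubernetes CustomObjectsApi, the snakeyaml/JSONObject conversion, circe decoding, and the RxScala status stream with fabric8: the CRD is addressed through a CustomResourceDefinitionContext, the SparkApplication is created directly from the YAML manifest, and progress is followed by tailing the driver pod log. A condensed sketch of that flow, assuming a hypothetical namespace and manifest path (the real values are passed into the class by the test harness):

```scala
import java.io.{File, FileInputStream}

import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}

object SparkAppLifecycleSketch {
  // Picks up the active kubeconfig context (e.g. the minikube cluster the tests target).
  private val client: KubernetesClient = new DefaultKubernetesClient()

  // CRD coordinates for the spark-operator's SparkApplication resource, as in the diff.
  private val crdContext: CustomResourceDefinitionContext =
    new CustomResourceDefinitionContext.Builder()
      .withGroup("sparkoperator.k8s.io")
      .withVersion("v1beta1")
      .withPlural("sparkapplications")
      .withName("spark-pi")
      .withScope("Namespaced")
      .build()

  private val namespace = "default" // assumption: the real namespace arrives as crdNamespace
  private val manifestPath = "deployment/spark-prometheus.yaml" // assumption: manifest location

  def launch(): Unit = {
    // Create the SparkApplication directly from the YAML stream -- no snakeyaml/JSONObject round trip.
    val manifest = new FileInputStream(new File(manifestPath))
    client.customResource(crdContext).create(namespace, manifest)

    // Follow progress by tailing the driver pod log instead of polling the CRD status with RxScala.
    client.pods()
      .inNamespace(namespace)
      .withName("spark-pi-driver")
      .watchLog(System.out)
  }

  def cleanUp(): Unit = {
    // Tear down by deleting the custom resource by name, then release the client.
    client.customResource(crdContext).delete(namespace, "spark-pi")
    client.close()
  }
}
```

Usage would mirror the integration suite: launch() before the tests run and cleanUp() once they finish.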
21 changes: 10 additions & 11 deletions src/main/scala/consumer/KafkaConsumer.scala
@@ -18,7 +18,6 @@ object KafkaConsumer extends LazyLogging {
private val config = ConfigFactory.load()
private val localKafkaUrl = config.getString("minikube.kafka.url")
private val groupId = config.getString("minikube.kafka.groupId")
private val tableName = config.getString("minikube.hive.tableName")

private val mssqlConfigMap = Map(
"url" -> config.getString("minikube.sqlserver.db.urlForSpark"),
@@ -43,7 +42,7 @@ object KafkaConsumer extends LazyLogging {
"enable.auto.commit" -> (false: java.lang.Boolean)
)

val sparkConf: SparkConf = new SparkConf().setAppName("KafkaAvroToHDFSWriter")
val sparkConf: SparkConf = new SparkConf().setAppName("KafkaAvroToMSSQLSink")
val spark: SparkSession = SparkSession
.builder()
.config(sparkConf)
@@ -73,26 +72,26 @@ object KafkaConsumer extends LazyLogging {
.map(x => deserializeMsg(x.value))
.map(_.normalize())

normalizedData
.map(_._1)
.toDF
.write
.mode(SaveMode.Append)
.format("parquet")
.saveAsTable(tableName)
// normalizedData
// .map(_._1)
// .toDF
// .write
// .mode(SaveMode.Append)
// .format("parquet")
// .saveAsTable(tableName)

normalizedData
.map(_._1)
.toDF
.write
.mode(SaveMode.Append)
.mode(SaveMode.Overwrite)
.sqlDB(msgSinkConf)

normalizedData
.map(_._2)
.toDF
.write
.mode(SaveMode.Append)
.mode(SaveMode.Overwrite)
.sqlDB(userSinkConf)
}

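In KafkaConsumer the Hive parquet sink is commented out and both normalized frames now go to SQL Server via the azure-sqldb-spark connector, with SaveMode switched from Append to Overwrite. A minimal sketch of that sink, assuming the connector's documented Config builder and connect._ implicits; the connection values are placeholders for what the app reads from minikube.sqlserver.* through Typesafe Config:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._ // adds .sqlDB(...) to DataFrameWriter

object MssqlSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KafkaAvroToMSSQLSink")
      .master("local[*]") // assumption: local run; the real job is submitted by the spark-operator
      .getOrCreate()
    import spark.implicits._

    // Placeholder connection settings; the real ones come from minikube.sqlserver.db.* in the config.
    val sinkConf = Config(Map(
      "url"          -> "mssql.default.svc.cluster.local",
      "databaseName" -> "bigkube",
      "dbTable"      -> "dbo.messages",
      "user"         -> "sa",
      "password"     -> "<password>"
    ))

    val df = Seq((1L, "first"), (2L, "second")).toDF("id", "payload")

    // Overwrite, matching the updated sinks in KafkaConsumer.
    df.write
      .mode(SaveMode.Overwrite)
      .sqlDB(sinkConf)

    spark.stop()
  }
}
```

With Overwrite each write replaces the existing table contents rather than appending to them, which is the behavioral change the diff introduces for both sinks.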
