support spark runtime in spark 2.0
allwefantasy committed Mar 30, 2017
1 parent c9c7605 commit 8d9c030
Showing 11 changed files with 263 additions and 46 deletions.
@@ -0,0 +1,18 @@
package org.apache.spark.sql.execution.streaming

import net.csdn.modules.http.RestResponse
import org.apache.spark.internal.Logging
import org.apache.spark.sql._

/**
 * Created by allwefantasy on 28/3/2017.
 */
class ForeachHttpSink(writer: RestResponse) extends Sink with Logging {

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    // Materialize the micro-batch on the driver, rebuild it as a batch DataFrame,
    // and write the rows back to the HTTP response as a JSON string.
    val result = data.sparkSession.createDataFrame(
      data.sparkSession.sparkContext.parallelize(data.collect()), data.schema).toJSON.collect().mkString("")
    writer.write(result)
  }
}
@@ -0,0 +1,24 @@
package org.apache.spark.sql.execution.streaming

import net.csdn.modules.http.RestResponse
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener}

/**
 * Created by allwefantasy on 28/3/2017.
 */
class SQLExecute(spark: SparkSession, restResponse: RestResponse) {
  def query(sql: String) = {
    val ds = spark.sql(sql)
    val sink = new ForeachHttpSink(restResponse)
    val df = ds.toDF()
    // Start a streaming query that pushes every micro-batch to the HTTP response,
    // then give it up to five seconds to produce output before returning.
    val streamQuery = spark.sessionState.streamingQueryManager.startQuery(
      None,
      None,
      df,
      sink,
      OutputMode.Append(),
      useTempCheckpointLocation = true
    )
    streamQuery.awaitTermination(5000)
  }
}
@@ -0,0 +1,28 @@
package streaming.core

/**
 * Created by allwefantasy on 30/3/2017.
 */
object LocalSparkServiceApp {
  def main(args: Array[String]): Unit = {
    StreamingApp.main(Array(
      "-streaming.master", "local[2]",
      "-streaming.name", "god",
      "-streaming.rest", "true",
      "-streaming.thrift", "true",
      "-streaming.platform", "spark",
      "-streaming.job.file.path", "classpath:///test/empty.json",
      "-streaming.enableHiveSupport", "true",
      "-streaming.spark.service", "true",
      "-streaming.enableCarbonDataSupport", "true",
      "-spark.sql.hive.thriftServer.singleSession", "true"
      //"-streaming.sql.out.path","file:///tmp/test/pdate=20160809"

      //"-streaming.jobs","idf-compute"
      //"-streaming.sql.source.path","hdfs://m2:8020/data/raw/live-hls-formated/20160725/19/cdn148-16-52_2016072519.1469444764341"
      //"-streaming.driver.port", "9005"
      //"-streaming.zk.servers", "127.0.0.1",
      //"-streaming.zk.conf_root_dir", "/streamingpro/jack"
    ))
  }
}
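Because this launcher passes "-streaming.thrift", "true" and "-streaming.spark.service", "true", the SPARK runtime is expected to start HiveThriftServer2 and keep the driver alive. A rough JDBC smoke test, assuming the Hive JDBC driver is on the classpath and the thrift server listens on HiveServer2's default port 10000 (both assumptions, not shown in this commit):

import java.sql.DriverManager

object ThriftSmokeTestSketch {
  def main(args: Array[String]): Unit = {
    // Older hive-jdbc versions may need the driver registered explicitly.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "")
    val rs = conn.createStatement().executeQuery("select 1")
    while (rs.next()) println(rs.getInt(1))
    conn.close()
  }
}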
@@ -10,8 +10,8 @@ object LocalStreamingApp {
"-streaming.duration", "10",
"-spark.sql.shuffle.partitions","1",
"-streaming.name", "god",
"-streaming.rest", "false"
,"-streaming.driver.port","9902",
"-streaming.rest", "true"
,"-streaming.driver.port","9003",
"-streaming.platform", "spark_structured_streaming",
"-streaming.job.file.path", "classpath:///test/ss-test.json"
))
@@ -80,7 +80,6 @@ class MultiSQLOutputCompositor[T] extends Compositor[T] with CompositorHelper wi
}
spark.streams.awaitAnyTermination()

params.remove("sql")
new util.ArrayList[T]()
}

@@ -0,0 +1,123 @@
package streaming.core.strategy.platform

import java.util.concurrent.atomic.AtomicReference
import java.util.{Map => JMap}

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

import scala.collection.JavaConversions._

/**
 * Created by allwefantasy on 30/3/2017.
 */
class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with PlatformManagerListener {

  def name = "SPARK"

  var sparkSession: SparkSession = createRuntime

  def operator = null

  def createRuntime = {
    val conf = new SparkConf()
    // Pass every "spark."-prefixed parameter straight through to the SparkConf.
    params.filter(f => f._1.toString.startsWith("spark.")).foreach { f =>
      conf.set(f._1.toString, f._2.toString)
    }

    if (params.containsKey("streaming.master")) {
      conf.setMaster(params.get("streaming.master").toString)
    }

    conf.setAppName(params.get("streaming.name").toString)

    val sparkSession = SparkSession.builder().config(conf)
    if (params.containsKey("streaming.enableHiveSupport") &&
      params.get("streaming.enableHiveSupport").toString.toBoolean) {
      sparkSession.enableHiveSupport()
    }
    sparkSession.getOrCreate()
  }

  params.put("_session_", sparkSession)

  override def startRuntime: StreamingRuntime = {
    this
  }

  override def awaitTermination: Unit = {
    // In service mode, block the main thread so the driver JVM stays alive
    // and the REST/thrift endpoints keep serving.
    if (params.getOrElse("streaming.spark.service", false).toString.toBoolean) {
      Thread.currentThread().join()
    }
  }

  override def streamingRuntimeInfo: StreamingRuntimeInfo = null

  override def destroyRuntime(stopGraceful: Boolean, stopContext: Boolean): Boolean = {
    sparkSession.stop()
    SparkRuntime.clearLastInstantiatedContext()
    true
  }

  override def configureStreamingRuntimeInfo(streamingRuntimeInfo: StreamingRuntimeInfo): Unit = {}

  override def resetRuntimeOperator(runtimeOperator: RuntimeOperator): Unit = {}

  override def params: JMap[Any, Any] = _params

  override def processEvent(event: Event): Unit = {}

  SparkRuntime.setLastInstantiatedContext(this)

  override def startThriftServer: Unit = {
    HiveThriftServer2.startWithContext(sparkSession.sqlContext)
  }

  override def startHttpServer: Unit = {}

}

object SparkRuntime {

  private val INSTANTIATION_LOCK = new Object()

  /**
   * Reference to the last created SparkRuntime.
   */
  @transient private val lastInstantiatedContext = new AtomicReference[SparkRuntime]()

  /**
   * Get the singleton SparkRuntime if it exists, or create a new one from the given params.
   * This function can be used to create a singleton SparkRuntime object that can be shared
   * across the JVM.
   */
  def getOrCreate(params: JMap[Any, Any]): SparkRuntime = {
    INSTANTIATION_LOCK.synchronized {
      if (lastInstantiatedContext.get() == null) {
        // The constructor registers the new instance via setLastInstantiatedContext,
        // so the reference read below is populated.
        new SparkRuntime(params)
      }
    }
    PlatformManager.getOrCreate.register(lastInstantiatedContext.get())
    lastInstantiatedContext.get()
  }

  private[platform] def clearLastInstantiatedContext(): Unit = {
    INSTANTIATION_LOCK.synchronized {
      PlatformManager.getOrCreate.unRegister(lastInstantiatedContext.get())
      lastInstantiatedContext.set(null)
    }
  }

  private[platform] def setLastInstantiatedContext(sparkRuntime: SparkRuntime): Unit = {
    INSTANTIATION_LOCK.synchronized {
      lastInstantiatedContext.set(sparkRuntime)
    }
  }
}
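A minimal usage sketch for the new runtime, assuming only that the params map carries the same "streaming.*" keys the launchers above pass (the values here are illustrative):

import java.util.{HashMap => JHashMap, Map => JMap}

import streaming.core.strategy.platform.SparkRuntime

object SparkRuntimeUsageSketch {
  def main(args: Array[String]): Unit = {
    val params: JMap[Any, Any] = new JHashMap[Any, Any]()
    params.put("streaming.master", "local[2]")
    params.put("streaming.name", "god")
    // getOrCreate registers the runtime with PlatformManager and reuses it on later calls.
    val runtime = SparkRuntime.getOrCreate(params)
    val spark = runtime.sparkSession
    spark.sql("select 1 as id").show()
    runtime.destroyRuntime(stopGraceful = true, stopContext = true)
  }
}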
@@ -0,0 +1,27 @@
package streaming.rest

import net.csdn.annotation.rest.At
import net.csdn.common.exception.RenderFinish
import net.csdn.modules.http.ApplicationController
import net.csdn.modules.http.RestRequest.Method._
import org.apache.spark.sql.execution.streaming.{ForeachHttpSink, ForeachSink, SQLExecute}
import streaming.core.strategy.platform.{PlatformManager, SparkStructuredStreamingRuntime}

/**
 * Created by allwefantasy on 28/3/2017.
 */
class RestController extends ApplicationController {
  @At(path = Array("/run/sql"), types = Array(GET, POST))
  def ss = {
    if (!runtime.isInstanceOf[SparkStructuredStreamingRuntime]) {
      render(400, "runtime should be spark_structured_streaming")
    }

    val spark = runtime.asInstanceOf[SparkStructuredStreamingRuntime].sparkSession

    new SQLExecute(spark, restResponse).query(param("sql"))
    throw new RenderFinish()
  }

  def runtime = PlatformManager.getRuntime
}
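A hypothetical client call against this endpoint, assuming the REST server listens on the driver port configured in the structured-streaming launcher above (the port 9003 and the table name test2 are assumptions taken from elsewhere in this commit):

import java.net.{URL, URLEncoder}

import scala.io.Source

object RunSqlClientSketch {
  def main(args: Array[String]): Unit = {
    val sql = URLEncoder.encode("select * from test2", "UTF-8")
    // The controller hands the SQL to SQLExecute, which streams each micro-batch back as JSON.
    val url = new URL("http://127.0.0.1:9003/run/sql?sql=" + sql)
    val body = Source.fromInputStream(url.openStream()).mkString
    println(body)
  }
}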
@@ -15,12 +15,9 @@ object LocalStreamingApp {
"-streaming.duration", "10",
"-spark.sql.shuffle.partitions","1",
"-streaming.name", "god",
"-streaming.rest", "false"
,"-streaming.driver.port","9902",
"-streaming.platform", "spark_streaming",
"-streaming.sql.out.jack.user","root",
"-streaming.sql.out.jack.password","csdn.net",
"-streaming.sql.out.jack.url","jdbc:mysql://127.0.0.1/alarm_test?characterEncoding=utf8"
"-streaming.rest", "true"
,"-streaming.driver.port","9003",
"-streaming.platform", "spark_streaming"
//"-streaming.enableCarbonDataSupport", "true",
//"-streaming.carbondata.store", "/tmp/carbondata/store"
//"-streaming.carbondata.meta", "/tmp/carbondata/meta"
@@ -7,6 +7,8 @@ import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import serviceframework.dispatcher.{Compositor, Processor, Strategy}
import streaming.common.SQLContextHolder
@@ -40,7 +42,11 @@ class MultiKafkaStreamingCompositor[T] extends Compositor[T] with CompositorHelp
}

private def isStreaming(params: util.Map[Any, Any]) = {
if (params.containsKey("topics")) true else false
if (params.containsKey("topics") ||
params.get("format") == "kafka" ||
params.get("format") == "socket"
) true
else false
}

private def getZk(params: util.Map[Any, Any]) = {
@@ -60,16 +66,28 @@
}
val p = streamings.head

val kafkaStream = if (zkEnable(p)) {
val zk = getZk(p)
val groupId = getKafkaParams(p).get("groupId").get
val topics = getTopics(p).map(f => (f, 1)).toMap
KafkaUtils.createStream(ssc, zk, groupId, topics)
} else {
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
getKafkaParams(p),
getTopics(p))
def createKafkaStream = {
if (zkEnable(p)) {
val zk = getZk(p)
val groupId = getKafkaParams(p).get("groupId").get
val topics = getTopics(p).map(f => (f, 1)).toMap
KafkaUtils.createStream(ssc, zk, groupId, topics)
} else {
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
getKafkaParams(p),
getTopics(p))
}
}

val stream = p.getOrElse("format", "") match {
case "kafka" =>
createKafkaStream

case "socket" =>
ssc.socketTextStream(p("hostname").toString, p("port").toString.toInt, StorageLevel.MEMORY_AND_DISK).map(f => (null, f))
case _ =>
createKafkaStream
}

params.put("mainStream", (rdd: RDD[(String, String)]) => {
@@ -82,7 +100,7 @@ class MultiKafkaStreamingCompositor[T] extends Compositor[T] with CompositorHelp
_configParams.filterNot(p => isStreaming(p)).foreach { sourceConfig =>
val sqlContext = sqlContextHolder(params)

val name = sourceConfig.getOrElse("name","").toString
val name = sourceConfig.getOrElse("name", "").toString
val _cfg = sourceConfig.map(f => (f._1.toString, f._2.toString)).map { f =>
(f._1, params.getOrElse(s"streaming.sql.source.${name}.${f._1}", f._2).toString)
}.toMap
@@ -94,7 +112,7 @@ class MultiKafkaStreamingCompositor[T] extends Compositor[T] with CompositorHelp
df.registerTempTable(_cfg("outputTable"))
}

List(kafkaStream.asInstanceOf[T])
List(stream.asInstanceOf[T])
}

}
@@ -56,7 +56,6 @@ class RestController extends ApplicationController with CSVRender {

@At(path = Array("/run/sql"), types = Array(GET, POST))
def ddlSql = {
if (!runtime.isInstanceOf[SparkRuntime]) render(400, "only support spark application")
val sqlContext = SQLContextHolder.getOrCreate.getOrCreate()
val res = sqlContext.sql(param("sql")).toJSON.collect().mkString(",")
render("[" + res + "]")
30 changes: 7 additions & 23 deletions streamingpro-spark/src/main/resources-local/strategy.v2.json
@@ -6,39 +6,23 @@
"ref": [],
"compositor": [
{
"name": "batch.sources",
"name": "stream.sources.kafka",
"params": [
{
"path": "file:///tmp/sample.csv",
"format": "com.databricks.spark.csv",
"outputTable": "doctorIndex",
"header": "true"
},
{
"path": "file:///tmp/sample.csv",
"format": "com.databricks.spark.csv",
"format": "socket",
"outputTable": "test2",
"header": "true"
}
]
},
{
"name": "batch.row.index",
"params": [
{
"inputTableName": "doctorIndex",
"outputTableName": "doctorIndex2",
"rankField": "rank"
"hostname": "127.0.0.1",
"port": "9999"
}
]
},
{
"name": "batch.outputs",
"name": "stream.outputs",
"params": [
{
"format": "console",
"inputTableName": "doctorIndex2",
"path":"-"
"inputTableName": "test2",
"path": "-"
}
]
}
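To exercise this socket source locally, something must be listening on 127.0.0.1:9999 and emitting lines to feed the stream; a throwaway Scala feeder (names and line contents are illustrative) could be:

import java.io.PrintWriter
import java.net.ServerSocket

object SocketFeederSketch {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    val client = server.accept() // socketTextStream connects to us as a client
    val out = new PrintWriter(client.getOutputStream, true)
    (1 to 100).foreach { i =>
      out.println(s"line-$i") // each line becomes one row in the test2 table
      Thread.sleep(500)
    }
    out.close(); client.close(); server.close()
  }
}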
