adding structured streaming support
allwefantasy committed Nov 21, 2016
1 parent 2d1d8a3 commit 25c3da6
Showing 11 changed files with 403 additions and 21 deletions.
46 changes: 33 additions & 13 deletions pom.xml
@@ -4,7 +4,7 @@

<groupId>streaming.king</groupId>
<artifactId>streamingpro</artifactId>
<version>0.4.1-SNAPSHOT-${mode}-${spark.version}</version>
<version>0.4.6-SNAPSHOT-${mode}-${spark.version}</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -26,30 +26,23 @@

<profiles>

<profile>
<id>spark-1.5.1</id>
<properties>
<spark.version>1.5.1</spark.version>
<spark.streaming.kafka.artifactId>spark-streaming-kafka_2.10</spark.streaming.kafka.artifactId>
<serviceframework.version>1.3.0-SNAPSHOT</serviceframework.version>
</properties>
</profile>

<profile>
<id>spark-1.6.1</id>
<!-- default -->
<properties>
<spark.version>1.6.1</spark.version>
<spark.streaming.kafka.artifactId>spark-streaming-kafka_2.10</spark.streaming.kafka.artifactId>
<spark.sql.kafka.artifactId>spark-streaming-kafka_2.10</spark.sql.kafka.artifactId>
<serviceframework.version>1.3.0-SNAPSHOT</serviceframework.version>
</properties>
</profile>

<profile>
<id>spark-2.0.0</id>
<id>spark-2.0.2</id>
<properties>
<spark.version>2.0.0</spark.version>
<spark.version>2.0.2</spark.version>
<spark.streaming.kafka.artifactId>spark-streaming-kafka-0-8_2.10</spark.streaming.kafka.artifactId>
<spark.sql.kafka.artifactId>spark-sql-kafka-0-10_2.10</spark.sql.kafka.artifactId>
<serviceframework.version>1.3.2-SNAPSHOT-9.2.16</serviceframework.version>
</properties>
</profile>
@@ -83,6 +76,13 @@
<artifactId>${spark.streaming.kafka.artifactId}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>${spark.sql.kafka.artifactId}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
@@ -138,6 +138,11 @@
<artifactId>${spark.streaming.kafka.artifactId}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>${spark.sql.kafka.artifactId}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
@@ -168,6 +173,11 @@
<artifactId>${spark.streaming.kafka.artifactId}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>${spark.sql.kafka.artifactId}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
@@ -587,5 +597,15 @@
</plugins>
</build>


<distributionManagement>
<repository>
<id>releases</id>
<name>releases</name>
<url>http://mvn.letv.com/nexus/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<url>http://mvn.letv.com/nexus/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
</project>
38 changes: 38 additions & 0 deletions src/main/java/streaming/common/DefaultShortNameMapping.scala
@@ -0,0 +1,38 @@
package streaming.common

import serviceframework.dispatcher.ShortNameMapping

/**
* 11/21/16 WilliamZhu([email protected])
*/

class DefaultShortNameMapping extends ShortNameMapping {
private val compositorNameMap: Map[String, String] = Map[String, String](
"spark" -> "streaming.core.strategy.SparkStreamingStrategy",

"batch.source" -> "streaming.core.compositor.spark.source.SQLSourceCompositor",
"batch.sql" -> "streaming.core.compositor.spark.transformation.SQLCompositor",
"batch.table" -> "streaming.core.compositor.spark.transformation.JSONTableCompositor",
"batch.refTable" -> "streaming.core.compositor.spark.transformation.JSONRefTableCompositor",

"stream.source.kafka" -> "streaming.core.compositor.spark.streaming.source.KafkaStreamingCompositor",
"stream.sql" -> "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
"stream.table" -> "streaming.core.compositor.spark.streaming.transformation.JSONTableCompositor",
"stream.refTable" -> "streaming.core.compositor.spark.streaming.transformation.JSONRefTableCompositor",

"ss.source" -> "streaming.core.compositor.spark.ss.source.SQLSourceCompositor",
"ss.source.mock" -> "streaming.core.compositor.spark.ss.source.MockSQLSourceCompositor",
"ss.sql" -> "streaming.core.compositor.spark.transformation.SQLCompositor",
"ss.table" -> "streaming.core.compositor.spark.transformation.JSONTableCompositor",
"ss.refTable" -> "streaming.core.compositor.spark.transformation.JSONRefTableCompositor",
"ss.output" -> "streaming.core.compositor.spark.ss.output.SQLOutputCompositor"
)

override def forName(shortName: String): String = {
if (compositorNameMap.contains(shortName)) {
compositorNameMap(shortName)
} else {
shortName
}
}
}
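
For reference, forName resolves registered short names (including the new ss.* entries) to fully qualified compositor class names and passes anything else through unchanged, so fully qualified names in a job description keep working. A minimal usage sketch, not part of the commit (the unregistered class name below is made up for illustration):

import streaming.common.DefaultShortNameMapping

val mapping = new DefaultShortNameMapping()

// A registered short name resolves to its compositor implementation class.
assert(mapping.forName("ss.source") ==
  "streaming.core.compositor.spark.ss.source.SQLSourceCompositor")

// An unregistered name falls through unchanged.
assert(mapping.forName("com.example.MyCompositor") == "com.example.MyCompositor")
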
9 changes: 6 additions & 3 deletions src/main/java/streaming/core/Dispatcher.scala
@@ -3,7 +3,8 @@ package streaming.core
import java.util.{Map => JMap}

import serviceframework.dispatcher.StrategyDispatcher
import streaming.core.strategy.platform.{PlatformManager, SparkRuntime, SparkStreamingRuntime}
import streaming.common.DefaultShortNameMapping
import streaming.core.strategy.platform.{SparkStructuredStreamingRuntime, PlatformManager, SparkRuntime, SparkStreamingRuntime}

import scala.collection.JavaConversions._

@@ -12,12 +13,14 @@ import scala.collection.JavaConversions._
*/
object Dispatcher {
def dispatcher(contextParams: JMap[Any, Any]): StrategyDispatcher[Any] = {
val defaultShortNameMapping = new DefaultShortNameMapping()
if (contextParams!=null && contextParams.containsKey("streaming.job.file.path")) {
val runtime = contextParams.get("_runtime_")

val sparkContext = runtime match {
case s: SparkStreamingRuntime => s.streamingContext.sparkContext
case s2: SparkRuntime => s2.sparkContext
case s3: SparkStructuredStreamingRuntime => s3.sparkSession.sparkContext
}

val jobFilePath = contextParams.get("streaming.job.file.path").toString
@@ -34,9 +37,9 @@ object Dispatcher {
textFile(jobFilePath).collect().mkString("\n")
}

StrategyDispatcher.getOrCreate(jobConfigStr)
StrategyDispatcher.getOrCreate(jobConfigStr,defaultShortNameMapping)
} else {
StrategyDispatcher.getOrCreate(null)
StrategyDispatcher.getOrCreate(null,defaultShortNameMapping)
}

}
6 changes: 4 additions & 2 deletions src/main/java/streaming/core/LocalStreamingApp.scala
@@ -15,8 +15,10 @@ object LocalStreamingApp {
"-streaming.duration", "20",
"-streaming.name", "god",
"-streaming.rest", "false"
// ,"-streaming.job.file.path","hdfs://cdn237:8020/tmp/bb.yml"
,"-streaming.driver.port","9902"
,"-streaming.job.file.path","classpath:///test/ss-test.json"
,"-streaming.driver.port","9902",
"-streaming.platform", "ss",
"-streaming.checkpoint","file:///tmp/ss"
// ,"-streaming.testinputstream.offsetPath", "hdfs://cdn237:8020/tmp/localstreampingapp"
// ,"-streaming.spark.hadoop.fs.defaultFS","hdfs://cdn237:8020"
))
84 changes: 84 additions & 0 deletions src/main/java/streaming/core/compositor/spark/ss/output/SQLOutputCompositor.scala
@@ -0,0 +1,84 @@
package streaming.core.compositor.spark.ss.output

import java.util
import java.util.concurrent.TimeUnit

import org.apache.log4j.Logger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.ProcessingTime
import serviceframework.dispatcher.{Compositor, Processor, Strategy}
import streaming.core.compositor.spark.streaming.CompositorHelper
import streaming.core.strategy.ParamsValidator

import scala.collection.JavaConversions._

/**
* 5/11/16 WilliamZhu([email protected])
*/
class SQLOutputCompositor[T] extends Compositor[T] with CompositorHelper with ParamsValidator {

private var _configParams: util.List[util.Map[Any, Any]] = _
val logger = Logger.getLogger(classOf[SQLOutputCompositor[T]].getName)


override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
this._configParams = configParams
}

def path = {
config[String]("path", _configParams)
}

def format = {
config[String]("format", _configParams)
}

def mode = {
config[String]("mode", _configParams)
}

def cfg = {
val _cfg = _configParams(0).map(f => (f._1.asInstanceOf[String], f._2.asInstanceOf[String])).toMap
_cfg - "path" - "mode" - "format"
}

override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
val oldDf = middleResult.get(0).asInstanceOf[DataFrame]
val func = params.get("_func_").asInstanceOf[(DataFrame) => DataFrame]
val _resource = if (params.containsKey("streaming.sql.out.path")) Some(params("streaming.sql.out.path").toString) else path

val duration = params.getOrElse("streaming.duration", "10").toString.toInt
val _mode = if (mode.isDefined) mode.get else "append"


val _cfg = cfg
val _format = format.get

try {
val df = func(oldDf)
val query = df.writeStream
if (params.containsKey("streaming.checkpoint")) {
val checkpointDir = params.get("streaming.checkpoint").toString
query.option("checkpointLocation", checkpointDir)
}

_resource match {
case Some(p) => query.option("path", p)
case None =>
}

query.outputMode(_mode)

val rt = query.trigger(ProcessingTime(duration, TimeUnit.SECONDS)).options(_cfg).format(_format).start()
rt.awaitTermination()
} catch {
case e: Exception => e.printStackTrace()
}
params.remove("sql")
new util.ArrayList[T]()
}

override def valid(params: util.Map[Any, Any]): (Boolean, String) = {
(true, "")
}
}
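
Behind its configuration keys, the compositor above drives the standard Spark 2.0 structured streaming write path. A minimal sketch, not part of the commit, assuming a streaming DataFrame df and illustrative paths; the comments note which configuration key would supply each value:

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.ProcessingTime

def startOutput(df: DataFrame): Unit = {
  // df is assumed to be a streaming DataFrame, e.g. produced by an ss.source compositor.
  val query = df.writeStream
    .format("parquet")                              // "format"
    .option("checkpointLocation", "file:///tmp/ss") // "streaming.checkpoint"
    .option("path", "file:///tmp/ss-out")           // "path" or "streaming.sql.out.path"
    .outputMode("append")                           // "mode", defaulting to append
    .trigger(ProcessingTime(10, TimeUnit.SECONDS))  // "streaming.duration" in seconds
    .start()
  query.awaitTermination()                          // block until the query stops, as the compositor does
}
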
35 changes: 35 additions & 0 deletions src/main/java/streaming/core/compositor/spark/ss/source/MockSQLSourceCompositor.scala
@@ -0,0 +1,35 @@
package streaming.core.compositor.spark.ss.source

import java.util

import org.apache.log4j.Logger
import org.apache.spark.sql.execution.streaming.MemoryStream
import serviceframework.dispatcher.{Compositor, Processor, Strategy}
import streaming.core.compositor.spark.streaming.CompositorHelper
import streaming.core.strategy.platform.SparkStructuredStreamingRuntime

import scala.collection.JavaConversions._

/**
* 11/21/16 WilliamZhu([email protected])
*/
class MockSQLSourceCompositor[T] extends Compositor[T] with CompositorHelper {
private var _configParams: util.List[util.Map[Any, Any]] = _

val logger = Logger.getLogger(classOf[MockSQLSourceCompositor[T]].getName)

override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
this._configParams = configParams
}

override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
val sparkSSRt = sparkStructuredStreamingRuntime(params)
val sparkSession = sparkSSRt.sparkSession
import sparkSession.implicits._
implicit val sqlContext = sparkSession.sqlContext
val inputData = MemoryStream[Int]
inputData.addData(Seq(1, 2, 3, 4))
val df = inputData.toDS()
List(df.asInstanceOf[T])
}
}
31 changes: 31 additions & 0 deletions src/main/java/streaming/core/compositor/spark/ss/source/SQLSourceCompositor.scala
@@ -0,0 +1,31 @@
package streaming.core.compositor.spark.ss.source

import java.util

import org.apache.log4j.Logger
import serviceframework.dispatcher.{Compositor, Processor, Strategy}
import streaming.core.compositor.spark.streaming.CompositorHelper
import streaming.core.strategy.platform.SparkStructuredStreamingRuntime

import scala.collection.JavaConversions._

/**
* 11/21/16 WilliamZhu([email protected])
*/
class SQLSourceCompositor[T] extends Compositor[T] with CompositorHelper {
private var _configParams: util.List[util.Map[Any, Any]] = _

val logger = Logger.getLogger(classOf[SQLSourceCompositor[T]].getName)

override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
this._configParams = configParams
}

override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
val sparkSSRt = sparkRuntime(params).asInstanceOf[SparkStructuredStreamingRuntime]
val sourcePath = if (params.containsKey("streaming.sql.source.path")) params("streaming.sql.source.path").toString else _configParams(0)("path").toString
val df = sparkSSRt.sparkSession.readStream.format(_configParams(0)("format").toString).options(
(_configParams(0) - "format" - "path").map(f => (f._1.asInstanceOf[String], f._2.asInstanceOf[String])).toMap).load(sourcePath)
List(df.asInstanceOf[T])
}
}
src/main/java/streaming/core/compositor/spark/streaming/CompositorHelper.scala
@@ -3,7 +3,7 @@ package streaming.core.compositor.spark.streaming
import java.util

import streaming.common.SQLContextHolder
import streaming.core.strategy.platform.{SparkRuntime, SparkStreamingRuntime}
import streaming.core.strategy.platform.{SparkStructuredStreamingRuntime, SparkRuntime, SparkStreamingRuntime}

import scala.collection.JavaConversions._

@@ -30,10 +30,15 @@ trait CompositorHelper {
params.get("_runtime_").asInstanceOf[SparkRuntime]
}

def sparkStructuredStreamingRuntime(params: util.Map[Any, Any]) = {
params.get("_runtime_").asInstanceOf[SparkStructuredStreamingRuntime]
}

def sparkContext(params: util.Map[Any, Any]) = {
params.get("_runtime_") match {
case a: SparkStreamingRuntime => a.streamingContext.sparkContext
case b: SparkRuntime => b.sparkContext
case c: SparkStructuredStreamingRuntime => c.sparkSession.sparkContext
case _ => throw new RuntimeException("get _runtime_ fail")
}
}