Commit
support structured streaming
allwefantasy committed Mar 27, 2017
1 parent 5da742a commit 8e0cd42
Showing 15 changed files with 201 additions and 349 deletions.
4 changes: 4 additions & 0 deletions pom.xml
@@ -35,6 +35,10 @@
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
+<repository>
+<id>pentaho-releases</id>
+<url>http://repository.pentaho.org/artifactory/repo/</url>
+</repository>
</repositories>

<profiles>
@@ -48,13 +48,9 @@ class DefaultShortNameMapping extends ShortNameMapping {
"stream.output.unittest" -> "streaming.core.compositor.spark.streaming.output.SQLUnitTestCompositor",
"stream.output.print" -> "streaming.core.compositor.spark.streaming.output.SQLPrintOutputCompositor",

"ss.source" -> "streaming.core.compositor.spark.ss.source.SQLSourceCompositor",
"ss.source.mock" -> "streaming.core.compositor.spark.ss.source.MockSQLSourceCompositor",
"ss.sql" -> "streaming.core.compositor.spark.transformation.SQLCompositor",
"ss.table" -> "streaming.core.compositor.spark.transformation.JSONTableCompositor",
"ss.refTable" -> "streaming.core.compositor.spark.transformation.JSONRefTableCompositor",
"ss.script" -> "streaming.core.compositor.spark.transformation.ScriptCompositor",
"ss.output" -> "streaming.core.compositor.spark.ss.output.SQLOutputCompositor",
"ss.sources" -> "streaming.core.compositor.spark.ss.source.MultiSQLSourceCompositor",
"ss.sql" -> "streaming.core.compositor.spark.ss.transformation.SQLCompositor",
"ss.outputs" -> "streaming.core.compositor.spark.ss.output.MultiSQLOutputCompositor",

"flink.sources" -> "streaming.core.compositor.flink.streaming.source.MultiStreamingCompositor",
"flink.sql" -> "streaming.core.compositor.flink.streaming.transformation.SQLCompositor",
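For orientation, here is a hedged sketch (not part of this commit; the real resolution presumably happens in the serviceframework dispatcher, which is not shown) of how a short name such as "ss.outputs" can be looked up in the mapping above and turned into a compositor instance by reflection:

// Illustration only: resolve a short name from the mapping above and
// instantiate the corresponding compositor class via reflection.
object ShortNameResolveSketch {
  val shortNames: Map[String, String] = Map(
    "ss.sources" -> "streaming.core.compositor.spark.ss.source.MultiSQLSourceCompositor",
    "ss.sql" -> "streaming.core.compositor.spark.ss.transformation.SQLCompositor",
    "ss.outputs" -> "streaming.core.compositor.spark.ss.output.MultiSQLOutputCompositor"
  )

  // Assumes the mapped class is on the classpath and has a no-arg constructor.
  def instantiate(shortName: String): AnyRef =
    Class.forName(shortNames(shortName)).newInstance().asInstanceOf[AnyRef]
}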
15 changes: 0 additions & 15 deletions streamingpro-spark-2.0/pom.xml
@@ -28,21 +28,6 @@
<scope>compile</scope>
</properties>
</profile>
-<profile>
-<id>carbondata</id>
-<dependencies>
-<dependency>
-<groupId>org.apache.carbondata</groupId>
-<artifactId>carbondata-spark</artifactId>
-<version>1.0.0-incubating</version>
-</dependency>
-<dependency>
-<groupId>org.apache.spark</groupId>
-<artifactId>spark-repl_2.10</artifactId>
-<version>${spark.version}</version>
-</dependency>
-</dependencies>
-</profile>
<profile>
<id>hive-thrift-server</id>
<dependencies>

This file was deleted.

@@ -2,8 +2,7 @@ package streaming.core

import java.util

-import streaming.core.common.SQLContextHolder
-
+import org.apache.spark.sql.SparkSession
import scala.collection.JavaConversions._

/**
@@ -30,8 +29,8 @@ trait CompositorHelper {
sql
}

-def sqlContextHolder(params: util.Map[Any, Any]) = {
-params.get("_sqlContextHolder_").asInstanceOf[SQLContextHolder].getOrCreate()
+def sparkSession(params: util.Map[Any, Any]) = {
+params.get("_session_").asInstanceOf[SparkSession]
}


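As a minimal usage sketch (hypothetical, not part of this commit), a compositor built on this helper can pull the shared SparkSession out of the params map via the "_session_" key and run SQL against a table registered by an upstream step:

import java.util

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical compositor body: fetch the session the runtime stored under
// "_session_" and query a temp view created earlier in the pipeline.
class SessionUsageSketch {
  def run(params: util.Map[Any, Any]): DataFrame = {
    val spark = params.get("_session_").asInstanceOf[SparkSession]
    spark.sql("SELECT * FROM some_input_table") // view name is illustrative
  }
}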

This file was deleted.

@@ -0,0 +1,90 @@
package streaming.core.compositor.spark.ss.output

import java.util

import java.util.concurrent.TimeUnit
import org.apache.log4j.Logger
import org.apache.spark.sql.streaming.ProcessingTime
import serviceframework.dispatcher.{Compositor, Processor, Strategy}
import streaming.core.CompositorHelper
import streaming.core.strategy.ParamsValidator

import scala.collection.JavaConversions._

/**
* 5/11/16 WilliamZhu([email protected])
*/
class MultiSQLOutputCompositor[T] extends Compositor[T] with CompositorHelper with ParamsValidator {

private var _configParams: util.List[util.Map[Any, Any]] = _
val logger = Logger.getLogger(classOf[MultiSQLOutputCompositor[T]].getName)


override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
this._configParams = configParams
}

def path = {
config[String]("path", _configParams)
}

def format = {
config[String]("format", _configParams)
}

def mode = {
config[String]("mode", _configParams)
}

override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {

val spark = sparkSession(params)
// Each entry in _configParams describes one output sink; individual settings can be
// overridden at runtime via "streaming.sql.out.<name>.<key>" job parameters.
_configParams.foreach { config =>

try {
val name = config.getOrElse("name", "").toString
val _cfg = config.map(f => (f._1.toString, f._2.toString)).map { f =>
(f._1, params.getOrElse(s"streaming.sql.out.${name}.${f._1}", f._2).toString)
}.toMap

val tableName = _cfg("inputTableName")
val options = _cfg - "path" - "mode" - "format"
val _resource = _cfg("path")
val mode = _cfg.getOrElse("mode", "ErrorIfExists")
val format = _cfg("format")
val outputFileNum = _cfg.getOrElse("outputFileNum", "-1").toInt

val dbtable = if (options.containsKey("dbtable")) options("dbtable") else _resource


var newTableDF = spark.table(tableName)

if (outputFileNum != -1) {
newTableDF = newTableDF.repartition(outputFileNum)
}

val ssStream = newTableDF.writeStream

if (_cfg.containsKey("checkpoint")) {
val checkpointDir = _cfg("checkpoint") // _cfg is a Scala Map, so use apply(); get().toString would yield "Some(...)"
ssStream.option("checkpointLocation", checkpointDir)
}

val query = ssStream.options(options).outputMode(mode).format(format)
query.trigger(ProcessingTime.create(_cfg.getOrElse("duration", "10").toLong, TimeUnit.SECONDS)).start()

} catch {
case e: Exception => e.printStackTrace()
}

}
spark.streams.awaitAnyTermination()

params.remove("sql")
new util.ArrayList[T]()
}

override def valid(params: util.Map[Any, Any]): (Boolean, String) = {
(true, "")
}
}
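Stripped of the compositor plumbing, the structured-streaming write pattern driven above reduces to the following standalone sketch (paths, table names, and the trigger interval are placeholders, not values mandated by the project):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime

// Standalone sketch of the writeStream flow used above: read a streaming
// DataFrame, repartition it, configure a checkpoint, and start a periodically
// triggered query against a file sink.
object MultiOutputSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("multi-output-sketch")
      .master("local[2]")
      .getOrCreate()

    // Text file source: each new file dropped into the directory becomes new rows.
    val df = spark.readStream.format("text").load("/tmp/sketch-in").repartition(4)

    df.writeStream
      .option("checkpointLocation", "/tmp/sketch-checkpoint")
      .outputMode("append")
      .format("parquet")
      .option("path", "/tmp/sketch-out")
      .trigger(ProcessingTime("10 seconds"))
      .start()

    // Block until any active query terminates, as the compositor does.
    spark.streams.awaitAnyTermination()
  }
}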

This file was deleted.

@@ -12,8 +12,8 @@ import streaming.core.strategy.platform.SparkStructuredStreamingRuntime
import scala.collection.JavaConversions._

/**
* 11/21/16 WilliamZhu([email protected])
*/
* 11/21/16 WilliamZhu([email protected])
*/
class MockSQLSourceCompositor[T] extends Compositor[T] with CompositorHelper {
private var _configParams: util.List[util.Map[Any, Any]] = _

@@ -24,17 +24,17 @@ class MockSQLSourceCompositor[T] extends Compositor[T] with CompositorHelper {
}

def data = {
-_configParams(0).map(f => f._2.asInstanceOf[JSONArray].map(k => k.asInstanceOf[String]).toSeq).toSeq
+_configParams(1).map(f => f._2.asInstanceOf[JSONArray].map(k => k.asInstanceOf[String]).toSeq).toSeq
}

override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
-val sparkSSRt = params.get("_runtime_").asInstanceOf[SparkStructuredStreamingRuntime]
-val ss = sparkSSRt.sparkSessionAdaptor.sparkSession
+val ss = sparkSession(params)
import ss.implicits._
implicit val sqlContext = ss.sqlContext
val inputData = MemoryStream[String]
inputData.addData(data.flatMap(f => f).seq)
val df = inputData.toDS()
df.createOrReplaceTempView(_configParams(0)("outputTable").toString)
List(df.asInstanceOf[T])
}
}
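For reference, the MemoryStream pattern used by this mock source boils down to the following self-contained sketch (the view and query names are illustrative):

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

// Self-contained sketch of the MemoryStream testing pattern: push rows into an
// in-memory stream, expose it as a temp view, and inspect results via the
// memory sink.
object MockSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("mock-source-sketch").master("local[2]").getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    val inputData = MemoryStream[String]
    inputData.addData(Seq("""{"name":"a"}""", """{"name":"b"}"""))

    val df = inputData.toDS()
    df.createOrReplaceTempView("mock_table") // illustrative view name

    val query = df.writeStream
      .format("memory")
      .queryName("mock_result")
      .outputMode("append")
      .start()

    query.processAllAvailable()
    spark.sql("SELECT * FROM mock_result").show()
    query.stop()
  }
}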