add sql, source, output in spark 2.0
allwefantasy committed Mar 30, 2017
1 parent 8d9c030 commit ec0a51e
Showing 5 changed files with 233 additions and 0 deletions.
streaming/core/LocalSparkApp.scala
@@ -0,0 +1,21 @@
package streaming.core

/**
 * Created by allwefantasy on 30/3/2017.
 */
object LocalSparkApp {
  /*
   mvn package -Ponline -Pcarbondata -Pbuild-distr -Phive-thrift-server -Pspark-1.6.1
   */
  def main(args: Array[String]): Unit = {
    StreamingApp.main(Array(
      "-streaming.master", "local[2]",
      "-streaming.name", "god",
      "-streaming.rest", "false",
      "-streaming.platform", "spark",
      "-streaming.enableHiveSupport", "false",
      "-streaming.spark.service", "false",
      "-streaming.job.file.path", "classpath:///test/batch-console.json"
    ))
  }
}
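The flags above are handed straight to StreamingApp; the compositors added below additionally consult the runtime params map for streaming.sql.(source|out).<name>.<key> overrides. Assuming command-line -streaming.* options are surfaced in that params map (plumbing not shown in this commit), a hedged sketch of redirecting one output without editing the JSON job file:

// Hypothetical variant of LocalSparkApp. The extra flag only has an effect if the
// job file declares an output whose name is "console1", and it assumes -streaming.*
// CLI options reach the compositors' params map (an assumption, not shown here).
object LocalSparkAppWithOverride {
  def main(args: Array[String]): Unit = {
    StreamingApp.main(Array(
      "-streaming.master", "local[2]",
      "-streaming.name", "god",
      "-streaming.rest", "false",
      "-streaming.platform", "spark",
      "-streaming.enableHiveSupport", "false",
      "-streaming.spark.service", "false",
      "-streaming.job.file.path", "classpath:///test/batch-console.json",
      // Redirect the output named "console1" to a local directory:
      "-streaming.sql.out.console1.path", "/tmp/batch-console-out"
    ))
  }
}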
streaming/core/compositor/spark/output/MultiSQLOutputCompositor.scala
@@ -0,0 +1,68 @@
package streaming.core.compositor.spark.output

import java.util

import org.apache.log4j.Logger
import org.apache.spark.sql.SaveMode
import serviceframework.dispatcher.{Compositor, Processor, Strategy}
import streaming.core.CompositorHelper
import streaming.core.strategy.ParamsValidator

import scala.collection.JavaConversions._

/**
 * Created by allwefantasy on 30/3/2017.
 */
class MultiSQLOutputCompositor[T] extends Compositor[T] with CompositorHelper with ParamsValidator {

  private var _configParams: util.List[util.Map[Any, Any]] = _
  val logger = Logger.getLogger(classOf[MultiSQLOutputCompositor[T]].getName)

  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {

    _configParams.foreach { config =>

      val name = config.getOrElse("name", "").toString
      // Every config key can be overridden at runtime through a params entry
      // named streaming.sql.out.<name>.<key>.
      val _cfg = config.map(f => (f._1.toString, f._2.toString)).map { f =>
        (f._1, params.getOrElse(s"streaming.sql.out.${name}.${f._1}", f._2).toString)
      }.toMap

      val tableName = _cfg("inputTableName")
      val options = _cfg - "path" - "mode" - "format"
      val _resource = _cfg("path")
      val mode = _cfg.getOrElse("mode", "ErrorIfExists")
      val format = _cfg("format")
      val outputFileNum = _cfg.getOrElse("outputFileNum", "-1").toInt

      val dbtable = if (options.contains("dbtable")) options("dbtable") else _resource

      var newTableDF = sparkSession(params).table(tableName)

      // Optionally repartition to control the number of output files.
      if (outputFileNum != -1) {
        newTableDF = newTableDF.repartition(outputFileNum)
      }

      if (format == "console") {
        newTableDF.show(_cfg.getOrElse("showNum", "100").toInt)
      } else {
        val tempDf = newTableDF.write.options(options).mode(SaveMode.valueOf(mode)).format(format)
        if (_resource == "-" || _resource.isEmpty) {
          tempDf.save()
        } else tempDf.save(_resource)
      }
    }

    new util.ArrayList[T]()
  }

  override def valid(params: util.Map[Any, Any]): (Boolean, String) = {
    (true, "")
  }
}
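The override merge in result is worth spelling out. A minimal, self-contained sketch of the same rule in plain Scala, using hypothetical config values:

val config = Map(
  "name" -> "console1", "inputTableName" -> "table2",
  "format" -> "parquet", "path" -> "/tmp/a", "mode" -> "Overwrite")
val params = Map("streaming.sql.out.console1.path" -> "/tmp/b")

val name = config.getOrElse("name", "")
val merged = config.map { case (k, v) =>
  (k, params.getOrElse(s"streaming.sql.out.$name.$k", v))
}

assert(merged("path") == "/tmp/b")     // the runtime param wins
assert(merged("mode") == "Overwrite")  // untouched keys keep their config values

Note that options keeps every merged key except path, mode, and format, so entries such as name and inputTableName are also forwarded to the writer; most Spark data sources simply ignore options they do not recognize.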
streaming/core/compositor/spark/source/MultiSQLSourceCompositor.scala
@@ -0,0 +1,41 @@
package streaming.core.compositor.spark.source

import java.util

import org.apache.log4j.Logger
import serviceframework.dispatcher.{Compositor, Processor, Strategy}
import streaming.core.CompositorHelper

import scala.collection.JavaConversions._

/**
 * Created by allwefantasy on 30/3/2017.
 */
class MultiSQLSourceCompositor[T] extends Compositor[T] with CompositorHelper {
  private var _configParams: util.List[util.Map[Any, Any]] = _

  val logger = Logger.getLogger(classOf[MultiSQLSourceCompositor[T]].getName)

  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {

    _configParams.foreach { sourceConfig =>
      val name = sourceConfig.getOrElse("name", "").toString

      // Every config key can be overridden at runtime through a params entry
      // named streaming.sql.source.<name>.<key>.
      val _cfg = sourceConfig.map(f => (f._1.toString, f._2.toString)).map { f =>
        (f._1, params.getOrElse(s"streaming.sql.source.${name}.${f._1}", f._2).toString)
      }.toMap

      val sourcePath = _cfg("path")
      // Read format from _cfg (not the raw sourceConfig) so it, too, honors runtime
      // overrides; all remaining keys are passed through as data-source options.
      val df = sparkSession(params).read.format(_cfg("format"))
        .options(_cfg - "format" - "path" - "outputTable").load(sourcePath)
      df.createOrReplaceTempView(_cfg("outputTable"))
    }
    List()
  }
}
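For reference, each source entry is thin sugar over the Spark 2.x DataFrameReader API. With a hypothetical entry (format json, path /tmp/in.json, outputTable table1, plus one passthrough samplingRatio option), the compositor effectively executes:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]").appName("source-sketch").getOrCreate()

// All keys other than format/path/outputTable are forwarded as reader options.
val df = spark.read.format("json")
  .options(Map("samplingRatio" -> "1.0"))
  .load("/tmp/in.json")
df.createOrReplaceTempView("table1") // downstream SQL compositors refer to this name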
streaming/core/compositor/spark/transformation/RowNumberCompositor.scala
@@ -0,0 +1,57 @@
package streaming.core.compositor.spark.transformation

import java.util

import org.apache.log4j.Logger
import serviceframework.dispatcher.{Compositor, Processor, Strategy}
import streaming.core.CompositorHelper

/**
 * 8/2/16 WilliamZhu([email protected])
 */
class RowNumberCompositor[T] extends Compositor[T] with CompositorHelper {
  private var _configParams: util.List[util.Map[Any, Any]] = _
  val logger = Logger.getLogger(classOf[RowNumberCompositor[T]].getName)

  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  def outputTableName = {
    config[String]("outputTableName", _configParams)
  }

  def inputTableName = {
    config[String]("inputTableName", _configParams)
  }

  def rankField = {
    config[String]("rankField", _configParams)
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {

    val context = sparkSession(params)
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    val _inputTableName = inputTableName.get
    val _outputTableName = outputTableName.get
    val _rankField = rankField.get

    val table = context.table(_inputTableName)
    val schema = table.schema
    val rdd = table.rdd
    // Append the rank column to the existing schema.
    val newSchema = new StructType(schema.fields ++ Array(StructField(_rankField, LongType)))

    // zipWithIndex assigns a contiguous, 0-based Long index in partition order.
    val newRowsWithScore = rdd.zipWithIndex().map { f =>
      org.apache.spark.sql.Row.fromSeq(f._1.toSeq ++ Array(f._2))
    }

    context.createDataFrame(newRowsWithScore, newSchema).createOrReplaceTempView(_outputTableName)

    middleResult
  }

}
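The rank column comes from RDD.zipWithIndex, so it is a contiguous, 0-based Long assigned in partition order (zipWithIndex runs one extra Spark job to compute partition sizes); it is not a sort-based rank. A standalone sketch of the same technique, with hypothetical table and column names:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val spark = SparkSession.builder()
  .master("local[2]").appName("rownum-sketch").getOrCreate()
val table = spark.range(3).toDF("id")

// Append a 0-based Long index to every row, as RowNumberCompositor does.
val withRank = spark.createDataFrame(
  table.rdd.zipWithIndex().map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) },
  StructType(table.schema.fields :+ StructField("rank", LongType)))
withRank.createOrReplaceTempView("table_with_rank")
withRank.show() // id: 0,1,2  rank: 0,1,2

A sort-based rank would normally use SQL's row_number() over a window; this implementation simply numbers rows in their current partition order.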
streaming/core/compositor/spark/transformation/SQLCompositor.scala
@@ -0,0 +1,46 @@
package streaming.core.compositor.spark.transformation

import java.util

import org.apache.log4j.Logger
import serviceframework.dispatcher.{Compositor, Processor, Strategy}
import streaming.core.CompositorHelper

import scala.collection.JavaConversions._

/**
 * Created by allwefantasy on 27/3/2017.
 */
class SQLCompositor[T] extends Compositor[T] with CompositorHelper {

  private var _configParams: util.List[util.Map[Any, Any]] = _
  val logger = Logger.getLogger(classOf[SQLCompositor[T]].getName)

  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  // The sql value may be a single string or a list of strings joined with spaces.
  def sql = {
    _configParams.get(0).get("sql") match {
      case a: util.List[String] => Some(a.mkString(" "))
      case a: String => Some(a)
      case _ => None
    }
  }

  def outputTableName = {
    config[String]("outputTableName", _configParams)
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {

    require(sql.isDefined, "please set the `sql` field in the config file")
    val _sql = translateSQL(sql.get, params)
    val _outputTableName = outputTableName

    val df = sparkSession(params).sql(_sql)
    df.createOrReplaceTempView(_outputTableName.get)
    middleResult
  }
}
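Accepting a string or a list of strings keeps long queries readable in the JSON job file, since a query can be split across array elements. A minimal illustration of the accessor's behavior, with hypothetical values:

import java.util
import scala.collection.JavaConversions._

def sqlOf(raw: Any): Option[String] = raw match {
  case a: util.List[_] => Some(a.map(_.toString).mkString(" "))
  case a: String => Some(a)
  case _ => None
}

assert(sqlOf("select * from table1") == Some("select * from table1"))
assert(sqlOf(util.Arrays.asList("select *", "from table1")) == Some("select * from table1"))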
