add REST API support in Spark 2.0
allwefantasy committed Mar 30, 2017
1 parent ec0a51e commit bbd0ed0
Showing 6 changed files with 203 additions and 7 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -115,3 +115,5 @@ project/plugins/project/
/src/main/resources-dev/mapred-site.xml
/src/main/resources-dev/strategy.v2.json
/lib/

/metastore_db
@@ -0,0 +1,47 @@
package org.apache.spark

import java.util.{Map => JMap}

import org.apache.spark.sql.SparkSession

/**
 * Created by allwefantasy on 30/3/2017.
 */
class SparkRuntimeOperator(sparkSession: SparkSession) {

  def createTable(resource: String, tableName: String, dataSourceOptions: Map[String, String]): Unit = {

    //val esOptions = Map("es.nodes"->"192.168.1.2,192.168.1.3", "es.scroll.size"->"1000", "es.field.read.as.array.include"->"SampleField")
    //"org.elasticsearch.spark.sql"
    var loader_clzz = dataSourceOptions("loader_clzz." + tableName)

    val options = if (loader_clzz == "carbondata") {
      dataSourceOptions + ("tableName" -> resource)
    } else {
      if (dataSourceOptions.contains("path") || dataSourceOptions.contains("paths")) {
        dataSourceOptions
      } else {
        dataSourceOptions + ("path" -> resource)
      }
    }

    if (loader_clzz == "carbondata") {
      loader_clzz = "org.apache.spark.sql.CarbonSource"
    }

    val df = sparkSession.
      read.format(loader_clzz).
      options(options - loader_clzz - ("loader_clzz." + tableName)).
      load()

    df.createOrReplaceTempView(tableName)
  }

  def runSQL(sql: String) = {
    val df = sparkSession.sql(sql)
    df.toJSON.collect()
  }
}
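
In short, createTable registers an external data source as a Spark temp view: the loader_clzz.<tableName> option names the data-source format, and unless a path/paths option is present the resource argument is used as the path (carbondata is special-cased and maps resource onto the tableName option). A minimal usage sketch, assuming an existing SparkSession; the path and format below are purely illustrative:

// Hypothetical usage sketch, not part of the commit.
val operator = new SparkRuntimeOperator(sparkSession)

// "loader_clzz.abc" selects the data-source format ("json" here); since no
// "path"/"paths" option is supplied, the resource argument becomes the path.
operator.createTable(
  "/tmp/hdfsfile/abc.txt",            // resource
  "abc",                              // tableName, i.e. the temp view name
  Map("loader_clzz.abc" -> "json")    // dataSourceOptions
)

// Runs the query against the registered view and returns each row as a JSON string.
val rows: Array[String] = operator.runSQL("select * from abc")
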
@@ -3,7 +3,7 @@ package streaming.core.strategy.platform
import java.util.concurrent.atomic.AtomicReference
import java.util.{Map => JMap}

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkRuntimeOperator}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

@@ -18,7 +18,10 @@ class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with Platf

  var sparkSession: SparkSession = createRuntime

-  def operator = null
+  def operator = {
+    new SparkRuntimeOperator(sparkSession)
+  }

  def createRuntime = {
    val conf = new SparkConf()
@@ -1,27 +1,84 @@
package streaming.rest

import net.csdn.annotation.rest.At
import net.csdn.common.collections.WowCollections
import net.csdn.common.exception.RenderFinish
-import net.csdn.modules.http.ApplicationController
+import net.csdn.modules.http.{ApplicationController, ViewType}
import net.csdn.modules.http.RestRequest.Method._
-import org.apache.spark.sql.execution.streaming.{ForeachHttpSink, ForeachSink, SQLExecute}
-import streaming.core.strategy.platform.{PlatformManager, SparkStructuredStreamingRuntime}
+import org.apache.spark.sql.execution.streaming.{SQLExecute}
+import streaming.core.strategy.platform.{PlatformManager, SparkRuntime, SparkStructuredStreamingRuntime}
import scala.collection.JavaConversions._

/**
 * Created by allwefantasy on 28/3/2017.
 */
class RestController extends ApplicationController {

-  @At(path = Array("/run/sql"), types = Array(GET,POST))
+  @At(path = Array("/ss/run/sql"), types = Array(GET, POST))
  def ss = {
    if (!runtime.isInstanceOf[SparkStructuredStreamingRuntime]) {
      render(400, "runtime should be spark_structured_streaming")
    }

    val spark = runtime.asInstanceOf[SparkStructuredStreamingRuntime].sparkSession

-    new SQLExecute(spark,restResponse).query(param("sql"))
+    new SQLExecute(spark, restResponse).query(param("sql"))
    throw new RenderFinish()
  }


@At(path = Array("/runtime/spark/sql"), types = Array(GET, POST))
def sql = {
if (!runtime.isInstanceOf[SparkRuntime]) render(400, "only support spark application")
val sparkRuntime = runtime.asInstanceOf[SparkRuntime]
val sparkSession = sparkRuntime.sparkSession
val tableToPaths = params().filter(f => f._1.startsWith("tableName.")).map(table => (table._1.split("\\.").last, table._2))

tableToPaths.foreach { tableToPath =>
val tableName = tableToPath._1
val loaderClzz = params.filter(f => f._1 == s"loader_clzz.${tableName}").head
val newParams = params.filter(f => f._1.startsWith(s"loader_param.${tableName}.")).map { f =>
val coms = f._1.split("\\.")
val paramStr = coms.takeRight(coms.length - 2).mkString(".")
(paramStr, f._2)
}.toMap + loaderClzz

if (!sparkSession.catalog.tableExists(tableToPath._1) || paramAsBoolean("forceCreateTable", false)) {
sparkRuntime.operator.createTable(tableToPath._2, tableToPath._1, newParams)
}
}

val sql = if (param("sql").contains(" limit ")) param("sql") else param("sql") + " limit 1000"
val result = sparkRuntime.operator.runSQL(sql).mkString(",")

param("resultType", "html") match {
case "json" => render(200, "[" + result + "]", ViewType.json)
case _ => renderHtml(200, "/rest/sqlui-result.vm", WowCollections.map("feeds", result))
}
}

@At(path = Array("/run/sql"), types = Array(GET, POST))
def ddlSql = {
val sparkSession = runtime.asInstanceOf[SparkRuntime].sparkSession
val res = sparkSession.sql(param("sql")).toJSON.collect().mkString(",")
render("[" + res + "]")
}

@At(path = Array("/table/create"), types = Array(GET, POST))
def tableCreate = {
val sparkRuntime = runtime.asInstanceOf[SparkRuntime]
if (!runtime.isInstanceOf[SparkRuntime]) render(400, "only support spark application")

try {
sparkRuntime.operator.createTable(param("tableName"), param("tableName"), params().toMap)
} catch {
case e: Exception =>
e.printStackTrace()
render(e.getMessage)
}

render("register success")

}

  def runtime = PlatformManager.getRuntime
}
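
Taken together, the new /runtime/spark/sql endpoint is driven entirely by request parameters: each tableName.<name> entry maps a resource (typically a path) to a temp view, loader_clzz.<name> picks the data-source format, loader_param.<name>.<key> entries are forwarded as reader options, and the sql parameter is then run against the registered views, with resultType=json returning a JSON array and forceCreateTable=true forcing re-registration of an existing view. A sketch of one such request's parameters as the controller would see them; the table name, path, and option values are made up for illustration:

// Hypothetical parameter set for a GET/POST to /runtime/spark/sql, not part of the commit.
val requestParams = Map(
  "tableName.orders"         -> "/tmp/hdfsfile/orders.json", // resource backing the temp view "orders"
  "loader_clzz.orders"       -> "json",                      // Spark data-source format
  "loader_param.orders.mode" -> "PERMISSIVE",                // forwarded to the reader as option "mode"
  "sql"                      -> "select * from orders",      // " limit 1000" is appended when no limit is given
  "resultType"               -> "json"                       // "json" renders a JSON array; default renders an HTML page
)
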
@@ -0,0 +1,42 @@
package org.apache.spark.streaming

import org.scalatest.{FlatSpec, Matchers}
import serviceframework.dispatcher.{Compositor, StrategyDispatcher}
import streaming.common.ParamsUtil
import streaming.core.strategy.platform.{PlatformManager, SparkRuntime}

/**
 * Created by allwefantasy on 30/3/2017.
 */
class BasicSparkOperation extends FlatSpec with Matchers {

  def withBatchContext[R](runtime: SparkRuntime)(block: SparkRuntime => R): R = {
    try {
      block(runtime)
    } finally {
      try {
        StrategyDispatcher.clear
        PlatformManager.clear
        runtime.destroyRuntime(false, true)
      } catch {
        case e: Exception =>
          e.printStackTrace()
      }
    }
  }

  def getCompositorParam(item: Compositor[_]) = {
    val field = item.getClass.getDeclaredField("_configParams")
    field.setAccessible(true)
    field.get(item).asInstanceOf[java.util.List[java.util.Map[Any, Any]]]
  }

  def setupBatchContext(batchParams: Array[String], configFilePath: String) = {
    val extraParam = Array("-streaming.job.file.path", configFilePath)
    val params = new ParamsUtil(batchParams ++ extraParam)
    PlatformManager.getOrCreate.run(params, false)
    val runtime = PlatformManager.getRuntime.asInstanceOf[SparkRuntime]
    runtime
  }

}
@@ -0,0 +1,45 @@
package streaming.core

import java.nio.charset.Charset

import com.google.common.io.Files
import org.apache.spark.streaming.BasicSparkOperation
import streaming.core.compositor.spark.output.MultiSQLOutputCompositor
import streaming.core.strategy.platform.SparkRuntime

import scala.collection.JavaConversions._

/**
 * Created by allwefantasy on 30/3/2017.
 */
class BatchSpec extends BasicSparkOperation {

  val batchParams = Array(
    "-streaming.master", "local[2]",
    "-streaming.name", "unit-test",
    "-streaming.rest", "false",
    "-streaming.platform", "spark",
    "-streaming.enableHiveSupport", "false",
    "-streaming.spark.service", "false",
    "-streaming.unittest", "true"
  )

  "batch" should "run normally" in {
    val file = new java.io.File("/tmp/hdfsfile/abc.txt")
    Files.createParentDirs(file)
    Files.write(s""" {"abc":"123","bbc":"adkfj"} """, file, Charset.forName("utf-8"))

    withBatchContext(setupBatchContext(batchParams, "classpath:///test/batch-console.json")) { runtime: SparkRuntime =>

      val sd = Dispatcher.dispatcher(null)
      val strategies = sd.findStrategies("batch-console").get
      strategies.size should be(1)

      val output = strategies.head.compositor.last.asInstanceOf[MultiSQLOutputCompositor[_]]
      file.delete()
    }
  }
}
