From 22d95f566eb12e87254191bb651268276465f35c Mon Sep 17 00:00:00 2001
From: WilliamZhu
Date: Mon, 20 Feb 2017 23:20:44 +0800
Subject: [PATCH] hive-thrift-server-support

---
 pom.xml                                            |  67 +++++----
 .../apache/spark/SparkRuntimeOperator.scala        |  13 +-
 .../hive/thriftserver/HiveThriftServer3.scala      | 132 ++++++++++++++++++
 .../streaming/common/SQLContextHolder.scala        |   4 +
 .../streaming/core/LocalSparkServiceApp.scala      |   6 +-
 .../strategy/platform/PlatformManager.scala        |  31 +++-
 .../java/streaming/rest/RestController.scala       |  26 ++++
 .../streaming/core/compositor/JDBCTest.scala       |  37 +++++
 8 files changed, 280 insertions(+), 36 deletions(-)
 create mode 100644 src/main/java/org/apache/spark/sql/hive/thriftserver/HiveThriftServer3.scala
 create mode 100644 src/test/java/streaming/core/compositor/JDBCTest.scala

diff --git a/pom.xml b/pom.xml
index 83e687ce7..e1166834f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,7 +165,7 @@
         <profile>
-            <id>debug</id>
+            <id>local</id>
             <activation>
                 <activeByDefault>true</activeByDefault>
             </activation>
@@ -237,33 +237,44 @@
+        <profile>
+            <id>hive-thrift-server</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-hive-thriftserver_2.10</artifactId>
+                    <version>${spark.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
         <profile>
             <id>build-distr</id>
             <build>
                 <plugins>
-                    <plugin>
-                        <groupId>org.apache.maven.plugins</groupId>
-                        <artifactId>maven-assembly-plugin</artifactId>
-                        <version>2.3</version>
-                        <configuration>
-                            <appendAssemblyId>false</appendAssemblyId>
-                            <descriptorRefs>
-                                <descriptorRef>jar-with-dependencies</descriptorRef>
-                            </descriptorRefs>
-                        </configuration>
-                        <executions>
-                            <execution>
-                                <id>make-assembly</id>
-                                <phase>package</phase>
-                                <goals>
-                                    <goal>assembly</goal>
-                                </goals>
-                            </execution>
-                        </executions>
-                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-shade-plugin</artifactId>
+                        <version>3.0.0</version>
+                        <executions>
+                            <execution>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>shade</goal>
+                                </goals>
+                                <configuration>
+                                    <filters>
+                                        <filter>
+                                            <artifact>*:*</artifact>
+                                            <excludes>
+                                                <exclude>META-INF/*.SF</exclude>
+                                                <exclude>META-INF/*.DSA</exclude>
+                                                <exclude>META-INF/*.RSA</exclude>
+                                            </excludes>
+                                        </filter>
+                                    </filters>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
                 </plugins>
             </build>
         </profile>
@@ -503,6 +514,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.22</version>
+        </dependency>
+
     </dependencies>

diff --git a/src/main/java/org/apache/spark/SparkRuntimeOperator.scala b/src/main/java/org/apache/spark/SparkRuntimeOperator.scala
index 41c7d4951..0c872ffe6 100644
--- a/src/main/java/org/apache/spark/SparkRuntimeOperator.scala
+++ b/src/main/java/org/apache/spark/SparkRuntimeOperator.scala
@@ -6,8 +6,8 @@ import _root_.streaming.common.SQLContextHolder
 import _root_.streaming.core.strategy.platform.RuntimeOperator
 
 /**
- * 5/11/16 WilliamZhu(allwefantasy@gmail.com)
- */
+  * 5/11/16 WilliamZhu(allwefantasy@gmail.com)
+  */
 class SparkRuntimeOperator(params: JMap[Any, Any], sparkContext: SparkContext) extends RuntimeOperator {
 
   def createTable(resource: String, tableName: String, dataSourceOptions: Map[String, String]) = {
@@ -20,7 +20,12 @@ class SparkRuntimeOperator(params: JMap[Any, Any], sparkContext: SparkContext) e
     val options = if (loader_clzz == "carbondata") {
       dataSourceOptions + ("tableName" -> resource)
     } else {
-      dataSourceOptions + ("path" -> resource)
+      if (dataSourceOptions.contains("path") || dataSourceOptions.contains("paths")) {
+        dataSourceOptions
+      } else {
+        dataSourceOptions + ("path" -> resource)
+      }
+
     }
 
     if (loader_clzz == "carbondata") {
@@ -29,7 +34,7 @@ class SparkRuntimeOperator(params: JMap[Any, Any], sparkContext: SparkContext) e
 
     val df = SQLContextHolder.getOrCreate.getOrCreate().
       read.format(loader_clzz).
-      options(options - loader_clzz).
+      options(options - loader_clzz - ("loader_clzz." + tableName)).
       load()
 
     df.registerTempTable(tableName)
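Note on the SparkRuntimeOperator change: createTable now leaves caller-supplied "path"/"paths" options untouched and only falls back to treating the resource name as the path; it also strips the per-table "loader_clzz.<tableName>" bookkeeping key before the options reach the DataFrame reader. A minimal standalone sketch of the path-resolution rule (the helper name is hypothetical):

    // honor caller-supplied "path"/"paths", otherwise fall back to the resource name
    def resolvePathOptions(resource: String, dataSourceOptions: Map[String, String]): Map[String, String] =
      if (dataSourceOptions.contains("path") || dataSourceOptions.contains("paths")) dataSourceOptions
      else dataSourceOptions + ("path" -> resource)

    // resolvePathOptions("hdfs:///data/logs", Map("paths" -> "/a,/b")) == Map("paths" -> "/a,/b")
    // resolvePathOptions("hdfs:///data/logs", Map.empty)               == Map("path" -> "hdfs:///data/logs")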
diff --git a/src/main/java/org/apache/spark/sql/hive/thriftserver/HiveThriftServer3.scala b/src/main/java/org/apache/spark/sql/hive/thriftserver/HiveThriftServer3.scala
new file mode 100644
index 000000000..20a15a00b
--- /dev/null
+++ b/src/main/java/org/apache/spark/sql/hive/thriftserver/HiveThriftServer3.scala
@@ -0,0 +1,132 @@
+package org.apache.spark.sql.hive.thriftserver
+
+import java.io.PrintStream
+import java.util.Locale
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
+import org.apache.hive.service.server.{HiveServer2, HiveServerServerOptionsProcessor}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart, StatsReportListener}
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.HiveThriftServer2Listener
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
+import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * Created by allwefantasy on 20/2/2017.
+ */
+object HiveThriftServer3 extends Logging {
+  var LOG = LogFactory.getLog(classOf[HiveServer2])
+  var uiTab: Option[ThriftServerTab] = _
+  var listener: HiveThriftServer2Listener = _
+
+  /**
+   * :: DeveloperApi ::
+   * Starts a new thrift server with the given context.
+   */
+  @DeveloperApi
+  def startWithContext(sqlContext: HiveContext): Unit = {
+    val server = new HiveThriftServer3(sqlContext)
+    server.init(sqlContext.hiveconf)
+    server.start()
+    listener = new HiveThriftServer2Listener(server, sqlContext.conf)
+    sqlContext.sparkContext.addSparkListener(listener)
+    uiTab = if (sqlContext.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
+      Some(new ThriftServerTab(sqlContext.sparkContext))
+    } else {
+      None
+    }
+  }
+
+  def run(hiveContext: HiveContext) {
+
+    hiveContext.sparkContext.addSparkListener(new StatsReportListener())
+    hiveContext.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
+    hiveContext.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
+    hiveContext.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
+    hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
+
+    SparkSQLEnv.sparkContext = hiveContext.sparkContext
+    SparkSQLEnv.hiveContext = hiveContext
+
+    ShutdownHookManager.addShutdownHook { () =>
+      SparkSQLEnv.stop()
+      uiTab.foreach(_.detach())
+    }
+
+    try {
+      val server = new HiveThriftServer3(SparkSQLEnv.hiveContext)
+      server.init(SparkSQLEnv.hiveContext.hiveconf)
+      server.start()
+      logInfo("HiveThriftServer3 started")
+      listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf)
+      HiveThriftServer2.listener = listener
+      SparkSQLEnv.sparkContext.addSparkListener(listener)
+      uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
+        Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
+      } else {
+        None
+      }
+      // If the application was killed before the thrift server started successfully, the
+      // SparkSubmit process cannot exit, so check whether the SparkContext was stopped.
+      if (SparkSQLEnv.sparkContext.stopped.get()) {
+        logError("SparkContext has stopped even though HiveServer2 has started, so exit")
+        System.exit(-1)
+      }
+    } catch {
+      case e: Exception =>
+        logError("Error starting HiveThriftServer3", e)
+        System.exit(-1)
+    }
+  }
+
+  private[hive] class HiveThriftServer3(hiveContext: HiveContext)
+    extends HiveServer2
+    with ReflectedCompositeService {
+    // State is tracked internally so that the server only attempts to shut down if it
+    // successfully started, and then once only.
+    private val started = new AtomicBoolean(false)
+
+    override def init(hiveConf: HiveConf) {
+      val sparkSqlCliService = new SparkSQLCLIService(this, hiveContext)
+      setSuperField(this, "cliService", sparkSqlCliService)
+      addService(sparkSqlCliService)
+
+      val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
+        new ThriftHttpCLIService(sparkSqlCliService)
+      } else {
+        new ThriftBinaryCLIService(sparkSqlCliService)
+      }
+
+      setSuperField(this, "thriftCLIService", thriftCliService)
+      addService(thriftCliService)
+      initCompositeService(hiveConf)
+    }
+
+    private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
+      val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
+      transportMode.toLowerCase(Locale.ENGLISH).equals("http")
+    }
+
+    override def start(): Unit = {
+      super.start()
+      started.set(true)
+    }
+
+    override def stop(): Unit = {
+      if (started.getAndSet(false)) {
+        super.stop()
+      }
+    }
+  }
+}
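Note on HiveThriftServer3: startWithContext is the embedding entry point (it wires the listener and the optional UI tab but no shutdown hook), while run additionally sets up SparkSQLEnv and a shutdown hook. A minimal sketch of embedding the server in an existing Spark 1.x application; the master URL and app name are placeholders, and the server listens on hive.server2.thrift.port (10000 by default):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.thriftserver.HiveThriftServer3

    object EmbeddedThriftServerDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("thrift-demo"))
        val hiveContext = new HiveContext(sc)
        // tables registered on this context become visible to JDBC clients
        HiveThriftServer3.startWithContext(hiveContext)
      }
    }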
diff --git a/src/main/java/streaming/common/SQLContextHolder.scala b/src/main/java/streaming/common/SQLContextHolder.scala
index 394f09e42..5f88536fd 100644
--- a/src/main/java/streaming/common/SQLContextHolder.scala
+++ b/src/main/java/streaming/common/SQLContextHolder.scala
@@ -1,5 +1,6 @@
 package streaming.common
 
+import java.io.PrintStream
 import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.spark.SparkContext
@@ -19,12 +20,15 @@ class SQLContextHolder(hiveEnable: Boolean, sparkContext: SparkContext, hiveOpti
     if (hiveContextRef.get() == null) {
       val hiveContext = hiveOption match {
         case Some(info) =>
+
           val hiveContext = Class.forName(info("className")).
             getConstructor(classOf[SparkContext],classOf[String],classOf[String]).
             newInstance(sparkContext,info("store"),info("meta")).asInstanceOf[HiveContext]
+
           if(sparkContext.getConf.contains("spark.deploy.zookeeper.url")){
             hiveContext.setConf("spark.deploy.zookeeper.url",sparkContext.getConf.get("spark.deploy.zookeeper.url"))
           }
+
           hiveContext
         case None => new org.apache.spark.sql.hive.HiveContext(sparkContext)
       }

diff --git a/src/main/java/streaming/core/LocalSparkServiceApp.scala b/src/main/java/streaming/core/LocalSparkServiceApp.scala
index 0c8e8b2f8..76744d7bc 100644
--- a/src/main/java/streaming/core/LocalSparkServiceApp.scala
+++ b/src/main/java/streaming/core/LocalSparkServiceApp.scala
@@ -9,13 +9,15 @@ object LocalSparkServiceApp {
     "-streaming.master", "local[2]",
     "-streaming.name", "god",
     "-streaming.rest", "true",
+    "-streaming.thrift", "true",
     "-streaming.platform", "spark",
     "-streaming.job.file.path","classpath:///test/empty.json",
-//    "-streaming.enableHiveSupport", "false",
+    "-streaming.enableHiveSupport", "true",
     "-streaming.spark.service", "true"
+
     //    "-streaming.enableCarbonDataSupport", "true",
     //    "-streaming.carbondata.store", "/tmp/carbondata/store",
-//    "-streaming.carbondata.meta", "/tmp/carbondata/meta"
+    //    "-streaming.carbondata.meta", "/tmp/carbondata/meta"
 
     //"-streaming.sql.out.path","file:///tmp/test/pdate=20160809"
     //"-streaming.jobs","idf-compute"
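Note on the SQLContextHolder hunk above: the Hive-compatible context is constructed reflectively, so the core module avoids a compile-time dependency on CarbonData. The same pattern in isolation (helper name hypothetical; assumes the target class exposes a (SparkContext, String, String) constructor, as org.apache.spark.sql.CarbonContext does in this codebase):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext

    // builds a HiveContext subclass by reflection, e.g. CarbonContext(sc, store, meta)
    def reflectiveHiveContext(sc: SparkContext, className: String, store: String, meta: String): HiveContext =
      Class.forName(className)
        .getConstructor(classOf[SparkContext], classOf[String], classOf[String])
        .newInstance(sc, store, meta)
        .asInstanceOf[HiveContext]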
diff --git a/src/main/java/streaming/core/strategy/platform/PlatformManager.scala b/src/main/java/streaming/core/strategy/platform/PlatformManager.scala
index 4328e1ea1..e493e97a1 100644
--- a/src/main/java/streaming/core/strategy/platform/PlatformManager.scala
+++ b/src/main/java/streaming/core/strategy/platform/PlatformManager.scala
@@ -6,7 +6,8 @@ import java.util.{List => JList, Map => JMap}
 import net.csdn.ServiceFramwork
 import net.csdn.bootstrap.Application
 import net.csdn.common.logging.Loggers
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.sql.hive.thriftserver.{HiveThriftServer2, HiveThriftServer3}
 import serviceframework.dispatcher.StrategyDispatcher
 import streaming.common.zk.{ZKClient, ZkRegister}
 import streaming.common.{ParamsUtil, SQLContextHolder, SparkCompatibility}
@@ -51,6 +52,10 @@ class PlatformManager {
     Application.main(Array())
   }
 
+  def startThriftServer = {
+    HiveThriftServer3.run(SQLContextHolder.sqlContextHolder.hiveContextRef.get())
+  }
+
   def preCompile(runtime: StreamingRuntime) = {
     SparkCompatibility.preCompile(runtime)
   }
@@ -104,6 +109,15 @@ class PlatformManager {
     if (params.getBooleanParam("streaming.rest", false) && !reRun) {
       startRestServer
     }
+
+    if (params.getBooleanParam("streaming.thrift", false)
+      && !reRun
+      && params.getBooleanParam("streaming.enableHiveSupport",false)
+      && runtime.isInstanceOf[SparkRuntime]
+    ) {
+      startThriftServer
+    }
+
     if (params.hasParam("streaming.zk.conf_root_dir") && !reRun) {
       registerToZk(params)
     }
@@ -200,14 +214,21 @@ object PlatformManager {
     if (params.containsKey("streaming.enableCarbonDataSupport") &&
       params.get("streaming.enableCarbonDataSupport").toString.toBoolean
     ) {
+
+      val hiveOption = Map(
+        "className" -> "org.apache.spark.sql.CarbonContext",
+        "store" -> params.getOrElse("streaming.carbondata.store","").toString,
+        "meta" -> params.getOrElse("streaming.carbondata.meta","").toString
+
+      )
       new SQLContextHolder(
-        true, sc, Some(Map("className" -> "org.apache.spark.sql.CarbonContext",
-        "store" -> params.getOrElse("streaming.carbondata.store","").toString,
-        "meta" -> params.getOrElse("streaming.carbondata.meta","").toString)))
+        true, sc, Some(hiveOption))
+
     } else {
+
       new SQLContextHolder(
         params.containsKey("streaming.enableHiveSupport") &&
-          params.get("streaming.enableHiveSupport").toString.toBoolean, sc)
+          params.get("streaming.enableHiveSupport").toString.toBoolean, sc, None)
     }
   }
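Note on the PlatformManager guard: startThriftServer dereferences hiveContextRef, which is populated only when Hive (or CarbonData) support is enabled, hence the extra streaming.enableHiveSupport condition before the call. A defensive variant (hypothetical, not part of the patch) would fail fast instead of risking a NullPointerException:

    import org.apache.spark.sql.hive.thriftserver.HiveThriftServer3
    import streaming.common.SQLContextHolder

    def startThriftServerSafely(): Unit = {
      val hiveCtx = SQLContextHolder.sqlContextHolder.hiveContextRef.get()
      require(hiveCtx != null,
        "-streaming.thrift true requires -streaming.enableHiveSupport true so a HiveContext exists")
      HiveThriftServer3.run(hiveCtx)
    }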
diff --git a/src/main/java/streaming/rest/RestController.scala b/src/main/java/streaming/rest/RestController.scala
index 6fb0881c0..56d5bbc1e 100644
--- a/src/main/java/streaming/rest/RestController.scala
+++ b/src/main/java/streaming/rest/RestController.scala
@@ -7,9 +7,11 @@ import net.csdn.common.collections.WowCollections
 import net.csdn.modules.http.RestRequest.Method._
 import net.csdn.modules.http.{ApplicationController, ViewType}
 import net.sf.json.JSONObject
+import streaming.common.SQLContextHolder
 import streaming.core.strategy.platform.{PlatformManager, SparkRuntime, SparkStreamingRuntime}
 
 import scala.collection.JavaConversions._
+import scala.util.parsing.json.JSONArray
 
 /**
  * 4/30/16 WilliamZhu(allwefantasy@gmail.com)
@@ -45,6 +47,30 @@ class RestController extends ApplicationController with CSVRender {
     }
   }
 
+  @At(path = Array("/run/sql"), types = Array(GET, POST))
+  def ddlSql = {
+    if (!runtime.isInstanceOf[SparkRuntime]) render(400, "only support spark application")
+    val res = SQLContextHolder.getOrCreate.getOrCreate().sql(param("sql")).toJSON.collect().mkString(",")
+    render("[" + res + "]")
+  }
+
+  @At(path = Array("/table/create"), types = Array(GET, POST))
+  def tableCreate = {
+    if (!runtime.isInstanceOf[SparkRuntime]) render(400, "only support spark application")
+    val sparkRuntime = runtime.asInstanceOf[SparkRuntime]
+
+    try {
+      sparkRuntime.operator.createTable(param("tableName"), param("tableName"), params().toMap)
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        render(e.getMessage)
+    }
+
+    render("register success")
+
+  }
+
   @At(path = Array("/sql"), types = Array(GET, POST))
   def hiveSql = {
     if (!runtime.isInstanceOf[SparkRuntime]) render(400, "only support spark application")
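Note on the new REST endpoints: /run/sql executes a SQL statement and returns the rows as a JSON array, and /table/create registers a data source as a temp table using the request parameters as data source options (which is exactly what the path/paths change in SparkRuntimeOperator supports). A quick smoke test using only the JDK; the host and port here are assumptions, since the actual REST port depends on the ServiceFramwork configuration:

    import java.net.{HttpURLConnection, URL, URLEncoder}
    import scala.io.Source

    object RunSqlSmokeTest {
      def main(args: Array[String]): Unit = {
        val sql = URLEncoder.encode("select * from src limit 10", "UTF-8")
        val conn = new URL("http://127.0.0.1:9003/run/sql?sql=" + sql)
          .openConnection().asInstanceOf[HttpURLConnection]
        conn.setRequestMethod("GET")
        val body = Source.fromInputStream(conn.getInputStream, "UTF-8").mkString
        println(body) // a JSON array of row objects
        conn.disconnect()
      }
    }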
Array("/sql"), types = Array(GET, POST)) def hiveSql = { if (!runtime.isInstanceOf[SparkRuntime]) render(400, "only support spark application") diff --git a/src/test/java/streaming/core/compositor/JDBCTest.scala b/src/test/java/streaming/core/compositor/JDBCTest.scala new file mode 100644 index 000000000..6ff83f779 --- /dev/null +++ b/src/test/java/streaming/core/compositor/JDBCTest.scala @@ -0,0 +1,37 @@ +package streaming.core.compositor + +import java.sql.DriverManager + +import java.sql.Connection + +/** + * Created by allwefantasy on 20/2/2017. + */ +object ScalaJdbcConnectSelect { + + def main(args: Array[String]) { + // connect to the database named "mysql" on the localhost + val driver = "com.mysql.jdbc.Driver" + val url = "jdbc:hive2://localhost:10000/default" + + // there's probably a better way to do this + var connection:Connection = null + + try { + // make the connection + Class.forName(driver) + connection = DriverManager.getConnection(url) + + // create the statement, and run the select query + val statement = connection.createStatement() + val resultSet = statement.executeQuery("SELECT * FROM src") + while ( resultSet.next() ) { + println(" tp = "+ resultSet.getString("tp")) + } + } catch { + case e => e.printStackTrace + } + connection.close() + } + +}