
hive-thrift-server-support
allwefantasy committed Feb 20, 2017
1 parent 1e8c79d commit 22d95f5
Showing 8 changed files with 280 additions and 36 deletions.
67 changes: 42 additions & 25 deletions pom.xml
@@ -165,7 +165,7 @@
</dependencies>
</profile>
<profile>
<id>debug</id>
<id>local</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
@@ -237,33 +237,44 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hive-thrift-server</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</profile>

<profile>
<id>build-distr</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<!--<configuration>-->
<!--<relocations>-->
<!--<relocation>-->
<!--<pattern>com.fasterxml</pattern>-->
<!--<shadedPattern>com.dxy.data.com.fasterxml</shadedPattern>-->
<!--</relocation>-->
<!--<relocation>-->
<!--<pattern>org.apache.http</pattern>-->
<!--<shadedPattern>com.dxy.data.org.apache.http</shadedPattern>-->
<!--</relocation>-->
<!--</relocations>-->
<!--</configuration>-->
</execution>
</executions>
</plugin>
</profile>

</profiles>
@@ -503,6 +514,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.22</version>
</dependency>

</dependencies>
<build>

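With the new `hive-thrift-server` profile active, `spark-hive-thriftserver_2.10` lands on the classpath; the commit also adds `mysql-connector-java`, presumably for a JDBC-backed Hive metastore. A hypothetical smoke test (not part of this commit) that the thrift-server dependency resolved:

// Hypothetical classpath check: throws ClassNotFoundException if the
// hive-thrift-server profile was not activated during the build.
object ThriftClasspathCheck extends App {
  Class.forName("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2")
  println("spark-hive-thriftserver is on the classpath")
}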
13 changes: 9 additions & 4 deletions src/main/java/org/apache/spark/SparkRuntimeOperator.scala
@@ -6,8 +6,8 @@ import _root_.streaming.common.SQLContextHolder
import _root_.streaming.core.strategy.platform.RuntimeOperator

/**
* 5/11/16 WilliamZhu([email protected])
*/
* 5/11/16 WilliamZhu([email protected])
*/
class SparkRuntimeOperator(params: JMap[Any, Any], sparkContext: SparkContext) extends RuntimeOperator {

def createTable(resource: String, tableName: String, dataSourceOptions: Map[String, String]) = {
@@ -20,7 +20,12 @@ class SparkRuntimeOperator(params: JMap[Any, Any], sparkContext: SparkContext) e
val options = if (loader_clzz == "carbondata") {
dataSourceOptions + ("tableName" -> resource)
} else {
dataSourceOptions + ("path" -> resource)
if (dataSourceOptions.contains("path") || dataSourceOptions.contains("paths")) {
dataSourceOptions
} else {
dataSourceOptions + ("path" -> resource)
}

}

if (loader_clzz == "carbondata") {
@@ -29,7 +34,7 @@ class SparkRuntimeOperator(params: JMap[Any, Any], sparkContext: SparkContext) e

val df = SQLContextHolder.getOrCreate.getOrCreate().
read.format(loader_clzz).
options(options - loader_clzz).
options(options - loader_clzz - ("loader_clzz." + tableName)).
load()

df.registerTempTable(tableName)
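The net effect of this hunk: an explicit `path` (or `paths`) in `dataSourceOptions` now takes precedence over `resource`, and the per-table `loader_clzz.<tableName>` key is stripped before the options reach the DataFrame reader. A hypothetical call, with illustrative names and option keys:

// Illustrative caller: the explicit "path" wins over `resource`, and the
// "loader_clzz.logs" key is removed before the options reach the reader.
val operator = new SparkRuntimeOperator(params, sparkContext) // assumes an existing params map and SparkContext
operator.createTable(
  resource = "logs",
  tableName = "logs",
  dataSourceOptions = Map(
    "loader_clzz.logs" -> "json",
    "path" -> "hdfs:///data/logs/2017-02-20"
  )
)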
132 changes: 132 additions & 0 deletions src/main/java/org/apache/spark/sql/hive/thriftserver/HiveThriftServer3.scala
@@ -0,0 +1,132 @@
package org.apache.spark.sql.hive.thriftserver

import java.io.PrintStream
import java.util.Locale
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
import org.apache.hive.service.server.{HiveServer2, HiveServerServerOptionsProcessor}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart, StatsReportListener}
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.HiveThriftServer2Listener
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
import org.apache.spark.util.{ShutdownHookManager, Utils}
import org.apache.spark.{Logging, SparkContext}

/**
* Created by allwefantasy on 20/2/2017.
*/
object HiveThriftServer3 extends Logging {
var LOG = LogFactory.getLog(classOf[HiveServer2])
var uiTab: Option[ThriftServerTab] = _
var listener: HiveThriftServer2Listener = _

/**
* :: DeveloperApi ::
* Starts a new thrift server with the given context.
*/
@DeveloperApi
def startWithContext(sqlContext: HiveContext): Unit = {
val server = new HiveThriftServer3(sqlContext)
server.init(sqlContext.hiveconf)
server.start()
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
sqlContext.sparkContext.addSparkListener(listener)
uiTab = if (sqlContext.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
Some(new ThriftServerTab(sqlContext.sparkContext))
} else {
None
}
}

def run(hiveContext: HiveContext) {

hiveContext.sparkContext.addSparkListener(new StatsReportListener())
hiveContext.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
hiveContext.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
hiveContext.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)

SparkSQLEnv.sparkContext = hiveContext.sparkContext
SparkSQLEnv.hiveContext = hiveContext

ShutdownHookManager.addShutdownHook { () =>
SparkSQLEnv.stop()
uiTab.foreach(_.detach())
}

try {
val server = new HiveThriftServer3(SparkSQLEnv.hiveContext)
server.init(SparkSQLEnv.hiveContext.hiveconf)
server.start()
logInfo("HiveThriftServer2 started")
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf)
HiveThriftServer2.listener = listener
SparkSQLEnv.sparkContext.addSparkListener(listener)
uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
} else {
None
}
// If the application was killed before HiveThriftServer3 started successfully, the
// SparkSubmit process cannot exit, so check whether the SparkContext has stopped.
if (SparkSQLEnv.sparkContext.stopped.get()) {
logError("SparkContext has stopped even though HiveThriftServer3 has started, so exit")
System.exit(-1)
}
} catch {
case e: Exception =>
logError("Error starting HiveThriftServer2", e)
System.exit(-1)
}
}

private[hive] class HiveThriftServer3(hiveContext: HiveContext)
extends HiveServer2
with ReflectedCompositeService {
// State is tracked internally so that the server only attempts to shut down if it
// successfully started, and then only once.
private val started = new AtomicBoolean(false)

override def init(hiveConf: HiveConf) {
val sparkSqlCliService = new SparkSQLCLIService(this, hiveContext)
setSuperField(this, "cliService", sparkSqlCliService)
addService(sparkSqlCliService)

val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
new ThriftHttpCLIService(sparkSqlCliService)
} else {
new ThriftBinaryCLIService(sparkSqlCliService)
}

setSuperField(this, "thriftCLIService", thriftCliService)
addService(thriftCliService)
initCompositeService(hiveConf)
}

private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
transportMode.toLowerCase(Locale.ENGLISH).equals("http")
}


override def start(): Unit = {
super.start()
started.set(true)
}

override def stop(): Unit = {
if (started.getAndSet(false)) {
super.stop()
}
}
}
}
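`HiveThriftServer3` mirrors Spark's own `HiveThriftServer2`, swapping in a `SparkSQLCLIService` backed by the supplied `HiveContext`. A minimal embedding sketch using the `startWithContext` entry point above (the SparkContext setup is illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer3

// Illustrative embedding: expose an existing HiveContext over the HiveServer2
// thrift protocol (binary transport on port 10000 unless hive-site.xml says otherwise).
object EmbeddedThriftServer {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("thrift-demo").setMaster("local[2]"))
    val hiveContext = new HiveContext(sc)
    HiveThriftServer3.startWithContext(hiveContext)
    Thread.currentThread().join() // keep the driver alive so the server keeps serving
  }
}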
4 changes: 4 additions & 0 deletions src/main/java/streaming/common/SQLContextHolder.scala
@@ -1,5 +1,6 @@
package streaming.common

import java.io.PrintStream
import java.util.concurrent.atomic.AtomicReference

import org.apache.spark.SparkContext
@@ -19,12 +20,15 @@ class SQLContextHolder(hiveEnable: Boolean, sparkContext: SparkContext, hiveOpti
if (hiveContextRef.get() == null) {
val hiveContext = hiveOption match {
case Some(info) =>

val hiveContext = Class.forName(info("className")).
getConstructor(classOf[SparkContext],classOf[String],classOf[String]).
newInstance(sparkContext,info("store"),info("meta")).asInstanceOf[HiveContext]

if(sparkContext.getConf.contains("spark.deploy.zookeeper.url")){
hiveContext.setConf("spark.deploy.zookeeper.url",sparkContext.getConf.get("spark.deploy.zookeeper.url"))
}

hiveContext
case None => new org.apache.spark.sql.hive.HiveContext(sparkContext)
}
6 changes: 4 additions & 2 deletions src/main/java/streaming/core/LocalSparkServiceApp.scala
@@ -9,13 +9,15 @@ object LocalSparkServiceApp {
"-streaming.master", "local[2]",
"-streaming.name", "god",
"-streaming.rest", "true",
"-streaming.thrift", "true",
"-streaming.platform", "spark",
"-streaming.job.file.path","classpath:///test/empty.json",
// "-streaming.enableHiveSupport", "false",
"-streaming.enableHiveSupport", "true",
"-streaming.spark.service", "true"

// "-streaming.enableCarbonDataSupport", "true",
// "-streaming.carbondata.store", "/tmp/carbondata/store",
// "-streaming.carbondata.meta", "/tmp/carbondata/meta"
// "-streaming.carbondata.meta", "/tmp/carbondata/meta"
//"-streaming.sql.out.path","file:///tmp/test/pdate=20160809"

//"-streaming.jobs","idf-compute"
src/main/java/streaming/core/strategy/platform/PlatformManager.scala
@@ -6,7 +6,8 @@ import java.util.{List => JList, Map => JMap}
import net.csdn.ServiceFramwork
import net.csdn.bootstrap.Application
import net.csdn.common.logging.Loggers
import org.apache.spark.SparkContext
import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.sql.hive.thriftserver.{HiveThriftServer2, HiveThriftServer3}
import serviceframework.dispatcher.StrategyDispatcher
import streaming.common.zk.{ZKClient, ZkRegister}
import streaming.common.{ParamsUtil, SQLContextHolder, SparkCompatibility}
@@ -51,6 +52,10 @@ class PlatformManager {
Application.main(Array())
}

def startThriftServer = {
HiveThriftServer3.run(SQLContextHolder.sqlContextHolder.hiveContextRef.get())
}

def preCompile(runtime: StreamingRuntime) = {
SparkCompatibility.preCompile(runtime)
}
@@ -104,6 +109,15 @@
if (params.getBooleanParam("streaming.rest", false) && !reRun) {
startRestServer
}

if (params.getBooleanParam("streaming.thrift", false)
&& !reRun
&& params.getBooleanParam("streaming.enableHiveSupport",false)
&& runtime.isInstanceOf[SparkRuntime]
) {
startThriftServer
}

if (params.hasParam("streaming.zk.conf_root_dir") && !reRun) {
registerToZk(params)
}
@@ -200,14 +214,21 @@
if (params.containsKey("streaming.enableCarbonDataSupport")
&& params.get("streaming.enableCarbonDataSupport").toString.toBoolean
) {

val hiveOption = Map(
"className" -> "org.apache.spark.sql.CarbonContext",
"store" -> params.getOrElse("streaming.carbondata.store","").toString,
"meta" -> params.getOrElse("streaming.carbondata.meta","").toString

)
new SQLContextHolder(
true, sc, Some(Map("className" -> "org.apache.spark.sql.CarbonContext",
"store" -> params.getOrElse("streaming.carbondata.store","").toString,
"meta" -> params.getOrElse("streaming.carbondata.meta","").toString)))
true, sc, Some(hiveOption))

} else {

new SQLContextHolder(
params.containsKey("streaming.enableHiveSupport") &&
params.get("streaming.enableHiveSupport").toString.toBoolean, sc)
params.get("streaming.enableHiveSupport").toString.toBoolean, sc, None)
}

}
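Once `startThriftServer` is up, any HiveServer2-compatible client can query the temp tables registered through `SparkRuntimeOperator`. A minimal JDBC sketch (assumes the default binary transport on localhost:10000; the Hive JDBC driver is not among this commit's dependencies):

import java.sql.DriverManager

// Hypothetical client for the embedded thrift server.
object ThriftJdbcClient {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "")
    try {
      val rs = conn.createStatement().executeQuery("SHOW TABLES")
      while (rs.next()) println(rs.getString(1))
    } finally {
      conn.close()
    }
  }
}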
