
split streaming.ps.enable into streaming.ps.local.enable and streaming.ps.cluster.enable
allwefantasy committed Dec 13, 2018
1 parent 4f5caac commit 97b22ef
Showing 2 changed files with 35 additions and 23 deletions.
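In short: the single switch streaming.ps.enable is split into two independent switches, one per deployment mode. A minimal sketch of how the new keys might be passed to the runtime; the key names and defaults come from the MLSQLConf hunk below, while the surrounding setup code is illustrative, not the project's actual bootstrap:

import java.util.{HashMap => JHashMap}

object PSFlagsSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors the JMap[Any, Any] that SparkRuntime is constructed with.
    val params = new JHashMap[Any, Any]()
    params.put("streaming.ps.local.enable", "true")   // default: true, honored only with a local master
    params.put("streaming.ps.cluster.enable", "true") // default: false, honored only with a non-local master
    // new SparkRuntime(params) would then start LocalPSSchedulerBackend or PSDriverBackend.
    println(params)
  }
}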
@@ -2,12 +2,12 @@ package streaming.core.strategy.platform

import java.lang.reflect.Modifier
import java.util.concurrent.atomic.AtomicReference
-import java.util.logging.Logger
import java.util.{Map => JMap}

import _root_.streaming.common.ScalaObjectReflect
import _root_.streaming.core.StreamingproJobManager
import _root_.streaming.dsl.mmlib.algs.bigdl.WowLoggerFilter
+import _root_.streaming.log.Logging
import net.csdn.common.reflect.ReflectHelper
import org.apache.spark._
import org.apache.spark.ps.cluster.PSDriverBackend
@@ -20,9 +20,8 @@ import scala.collection.JavaConversions._
/**
* Created by allwefantasy on 30/3/2017.
*/
-class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with PlatformManagerListener {
+class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with PlatformManagerListener with Logging {

-val logger = Logger.getLogger(getClass.getName)
val configReader = MLSQLConf.createConfigReader(params.map(f => (f._1.toString, f._2.toString)))

def name = "SPARK"
@@ -49,7 +48,7 @@ class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with Platfo


def createRuntime = {
logger.info("create Runtime...")
logInfo("create Runtime...")

val conf = new SparkConf()
params.filter(f =>
@@ -76,12 +75,20 @@ class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with Platfo
conf.setIfMissing("spark.speculation", "false")
}

-// if (MLSQLConf.MLSQL_PS_ENABLE.readFrom(configReader)) {
-// if (!isLocalMaster(conf)) {
-// logger.info("register worker.sink.pservice.class with org.apache.spark.ps.cluster.PSServiceSink")
-// conf.set("spark.metrics.conf.executor.sink.pservice.class", "org.apache.spark.ps.cluster.PSServiceSink")
-// }
-// }
+if (MLSQLConf.MLSQL_CLUSTER_PS_ENABLE.readFrom(configReader) && !isLocalMaster(conf)) {
+logWarning(
+s"""
+|------------------------------------------------------------------------
+|${MLSQLConf.MLSQL_CLUSTER_PS_ENABLE.key} is enabled. Please make sure
+|you have the uber-jar of mlsql placed in --jars. Otherwise the executor will
+|fail to start and the whole application will fail.
+|------------------------------------------------------------------------
+""".stripMargin)
+
+logInfo("register worker.sink.pservice.class with org.apache.spark.ps.cluster.PSServiceSink")
+conf.set("spark.metrics.conf.executor.sink.pservice.class", "org.apache.spark.ps.cluster.PSServiceSink")
+}


// SQLDL4J.tm = SQLDL4J.init(isLocalMaster(conf))

@@ -90,7 +97,7 @@ class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with Platfo
def setHiveConnectionURL = {
val url = MLSQLConf.MLSQL_HIVE_CONNECTION.readFrom(configReader)
if (!url.isEmpty) {
logger.info("set hive javax.jdo.option.ConnectionURL=" + url)
logInfo("set hive javax.jdo.option.ConnectionURL=" + url)
sparkSession.config("javax.jdo.option.ConnectionURL", url)
}
}
@@ -104,12 +111,12 @@ class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with Platfo
val isCarbonDataEnabled = MLSQLConf.MLSQL_ENABLE_CARBONDATA_SUPPORT.readFrom(configReader) && checkCarbonDataCoreCompatibility

if (!checkCarbonDataCoreCompatibility) {
logger.warning(s"------- CarbonData do not support current version of spark [${SparkCoreVersion.exactVersion}], streaming.enableCarbonDataSupport will not take effect.--------")
logWarning(s"------- CarbonData do not support current version of spark [${SparkCoreVersion.exactVersion}], streaming.enableCarbonDataSupport will not take effect.--------")
}

val ss = if (isCarbonDataEnabled) {

logger.info("CarbonData enabled...")
logInfo("CarbonData enabled...")
setHiveConnectionURL
val carbonBuilder = Class.forName("org.apache.spark.sql.CarbonSession$CarbonBuilder").
getConstructor(classOf[SparkSession.Builder]).
@@ -134,17 +141,17 @@ class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with Platfo

// parameter server should be enabled by default

-if (params.getOrDefault(MLSQLConf.MLSQL_PS_ENABLE.key, "true").toString.toBoolean && isLocalMaster(conf)) {
-logger.info("start LocalPSSchedulerBackend")
+if (MLSQLConf.MLSQL_LOCAL_PS_ENABLE.readFrom(configReader) && isLocalMaster(conf)) {
+logInfo("start LocalPSSchedulerBackend")
localSchedulerBackend = new LocalPSSchedulerBackend(ss.sparkContext)
localSchedulerBackend.start()
}

-// if (MLSQLConf.MLSQL_PS_ENABLE.readFrom(configReader) && !isLocalMaster(conf)) {
-// logger.info("start PSDriverBackend")
-// psDriverBackend = new PSDriverBackend(ss.sparkContext)
-// psDriverBackend.start()
-// }
+if (MLSQLConf.MLSQL_CLUSTER_PS_ENABLE.readFrom(configReader) && !isLocalMaster(conf)) {
+logInfo("start PSDriverBackend")
+psDriverBackend = new PSDriverBackend(ss.sparkContext)
+psDriverBackend.start()
+}

if (MLSQLConf.MLSQL_DISABLE_SPARK_LOG.readFrom(configReader)) {
WowLoggerFilter.redirectSparkInfoLogs()
@@ -164,11 +171,11 @@ class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with Platfo


def registerUDF(clzz: String) = {
logger.info("register functions.....")
logInfo("register functions.....")
Class.forName(clzz).getMethods.foreach { f =>
try {
if (Modifier.isStatic(f.getModifiers)) {
-logger.info(f.getName)
+logInfo(f.getName)
f.invoke(null, sparkSession.udf)
}
} catch {
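Taken together, the hunks above gate each parameter-server backend on both its flag and the master type. A condensed, self-contained sketch of the resulting selection logic; the select helper and its string results are illustrative, the real checks live inline in createRuntime:

object PSBackendSelectionSketch {
  // Local PS needs its flag AND a local master; cluster PS needs its flag
  // AND a non-local master. Otherwise no PS backend is started.
  def select(localEnable: Boolean, clusterEnable: Boolean, isLocalMaster: Boolean): Option[String] =
    if (localEnable && isLocalMaster) Some("LocalPSSchedulerBackend")
    else if (clusterEnable && !isLocalMaster) Some("PSDriverBackend")
    else None

  def main(args: Array[String]): Unit = {
    // With this commit's defaults (local = true, cluster = false):
    println(select(localEnable = true, clusterEnable = false, isLocalMaster = true))  // Some(LocalPSSchedulerBackend)
    println(select(localEnable = true, clusterEnable = false, isLocalMaster = false)) // None
  }
}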
@@ -57,11 +57,16 @@ object MLSQLConf {
| conf.setIfMissing("spark.speculation", "false")
""".stripMargin).booleanConf.createWithDefault(true)

-val MLSQL_PS_ENABLE: ConfigEntry[Boolean] = MLSQLConfigBuilder("streaming.ps.enable").doc(
+val MLSQL_LOCAL_PS_ENABLE: ConfigEntry[Boolean] = MLSQLConfigBuilder("streaming.ps.local.enable").doc(
"""
|MLSQL supports directly communicating with executor if you set this true.
""".stripMargin).booleanConf.createWithDefault(true)

+val MLSQL_CLUSTER_PS_ENABLE: ConfigEntry[Boolean] = MLSQLConfigBuilder("streaming.ps.cluster.enable").doc(
+"""
+|MLSQL supports directly communicating with executor if you set this true.
+""".stripMargin).booleanConf.createWithDefault(false)

val MLSQL_HIVE_CONNECTION = MLSQLConfigBuilder("streaming.hive.javax.jdo.option.ConnectionURL").doc(
"""
|Use this to configure `hive.javax.jdo.option.ConnectionURL`
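Both entries follow the existing MLSQLConfigBuilder pattern, so callers read them with a single readFrom call. Below is a reduced stand-in for that machinery, only to show the read path; the real ConfigEntry, config reader, and builder types live elsewhere in the repo:

// Reduced imitation of the ConfigEntry/readFrom pattern used above.
final case class BoolEntry(key: String, default: Boolean) {
  def readFrom(conf: Map[String, String]): Boolean =
    conf.get(key).map(_.toBoolean).getOrElse(default)
}

object MLSQLConfSketch {
  val MLSQL_LOCAL_PS_ENABLE = BoolEntry("streaming.ps.local.enable", default = true)
  val MLSQL_CLUSTER_PS_ENABLE = BoolEntry("streaming.ps.cluster.enable", default = false)

  def main(args: Array[String]): Unit = {
    val userConf = Map("streaming.ps.cluster.enable" -> "true")
    println(MLSQL_LOCAL_PS_ENABLE.readFrom(userConf))   // true (falls back to the default)
    println(MLSQL_CLUSTER_PS_ENABLE.readFrom(userConf)) // true (explicitly enabled)
  }
}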
