Skip to content

Commit

Permalink
add dynamic resource configure
Browse files Browse the repository at this point in the history
  • Loading branch information
allwefantasy committed Apr 26, 2019
1 parent 223011c commit 94f974c
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ object OperateType extends Enumeration {
val CREATE = Value("create")
val DROP = Value("drop")
val INSERT = Value("insert")
//val UPDATE = Value("update")
val SELECT = Value("select")
val SET = Value("set")
val EMPTY = Value("empty")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package streaming.dsl.mmlib.algs.param

import org.apache.spark.ml.param.{Param, ParamMap, Params}
import org.apache.spark.sql.mlsql.session.MLSQLException
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Expand All @@ -45,5 +46,29 @@ trait WowParams extends Params {
sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(rfcParams2, 1), StructType(Seq(StructField("param", StringType), StructField("description", StringType))))
}

/**
 * Resolves `param` from the raw string options of a script.
 *
 * When `params` holds an entry under `param.name`, the string is passed through
 * `convert` and stored on this instance with `set`. Otherwise `callback` is
 * invoked with the param, so the caller decides what a missing value means
 * (e.g. install a fallback value, or throw).
 *
 * @param params   raw options keyed by parameter name
 * @param param    the parameter to resolve
 * @param convert  turns the raw string into the parameter's value type
 * @param callback invoked with `param` when no entry is present in `params`
 * @return the value of `param` as read back through `$(param)` afterwards
 */
def fetchParam[T](params: Map[String, String], param: Param[T], convert: (String) => T,
                  callback: Param[T] => Unit) = {
  params.get(param.name) match {
    case Some(rawValue) => set(param, convert(rawValue))
    case None => callback(param)
  }
  $(param)
}

/** Ready-made `callback` values for `fetchParam`, used when a parameter is absent. */
object ParamDefaultOption {
  /**
   * Rejects a missing parameter.
   *
   * @throws MLSQLException always; the message names the missing parameter
   */
  def required[T](param: Param[T]): Unit = {
    val message = s"${param.name} is required"
    throw new MLSQLException(message)
  }
}

/** Ready-made `convert` functions for `fetchParam`. */
object ParamConvertOption {
  /** Parses a decimal integer; a malformed string raises `NumberFormatException`. */
  def toInt(a: String): Int = Integer.parseInt(a)

  /** Identity conversion for parameters that are already strings. */
  def nothing(a: String) = identity(a)
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ object CommandCollection {
context.addEnv("cache", """ run {} as CacheExt.`` where lifeTime="{}" """)
context.addEnv("unCache", """ run {} as CacheExt.`` where execute="uncache" """)
context.addEnv("uncache", """ run {} as CacheExt.`` where execute="uncache" """)
context.addEnv("createPythonEnv", """ run command as PythonEnvExt.`{}` where condaYamlFilePath="${HOME}/{}" and command="create"; """)
context.addEnv("removePythonEnv", """ run command as PythonEnvExt.`{}` where condaYamlFilePath="${HOME}/{}" and command="remove"; """)
context.addEnv("createPythonEnv", """ run command as PythonEnvExt.`{}` where condaYamlFilePath="${HOME}/{}" and command="create" """)
context.addEnv("removePythonEnv", """ run command as PythonEnvExt.`{}` where condaYamlFilePath="${HOME}/{}" and command="remove" """)
context.addEnv("resource",""" run command as EngineResource.`` where action="{0}" and cpus="{1}" """)

context.addEnv("show",
"""
Expand Down
119 changes: 119 additions & 0 deletions streamingpro-mlsql/src/main/java/tech/mlsql/ets/EngineResource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package tech.mlsql.ets

import org.apache.spark.SparkConf
import org.apache.spark.ml.param.Param
import org.apache.spark.scheduler.cluster.{ResourceStatus, SparkDynamicControlExecutors, SparkInnerExecutors}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.mlsql.session.MLSQLException
import org.apache.spark.sql.{DataFrame, SparkSession}
import streaming.dsl.ScriptSQLExec
import streaming.dsl.auth._
import streaming.dsl.mmlib.SQLAlg
import streaming.dsl.mmlib.algs.Functions
import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}


/**
 * 2019-04-26 WilliamZhu([email protected])
 *
 * ET that reports the executor resources of the running engine and, when an
 * `action` is supplied, asks Spark to add or remove executors.
 *
 * Options read from `params`:
 *  - `action`:  one of `add`, `+`, `remove`, `-`. When missing or empty, only the
 *               current [[ResourceStatus]] is returned and nothing is changed.
 *  - `cpus`:    number of cores to add/remove, optionally suffixed with `c`
 *               (e.g. `4` or `4c`). Required when `action` is set; must be in 1..20.
 *  - `timeout`: milliseconds passed to the executor controller; defaults to 30000.
 *
 * Not available when `spark.master` is a local master.
 */
class EngineResource(override val uid: String) extends SQLAlg with Functions with WowParams {
  def this() = this(BaseParams.randomUID())

  /** Delegates to [[train]]; this ET has no separate prediction phase. */
  override def batchPredict(df: DataFrame, path: String, params: Map[String, String]): DataFrame = train(df, path, params)

  /**
   * Returns a one-row DataFrame with the current [[ResourceStatus]], after
   * optionally requesting or killing executors.
   *
   * @throws MLSQLException in local mode, for an unknown `action`, or for a
   *                        missing / malformed / out-of-range `cpus`
   */
  override def train(df: DataFrame, path: String, params: Map[String, String]): DataFrame = {
    val spark = df.sparkSession

    val executorInfo = new SparkInnerExecutors(spark)
    val resourceControl = new SparkDynamicControlExecutors(spark)

    def isLocalMaster(conf: SparkConf): Boolean = {
      //      val master = MLSQLConf.MLSQL_MASTER.readFrom(configReader).getOrElse("")
      val master = conf.get("spark.master", "")
      master == "local" || master.startsWith("local[")
    }

    // Single place that renders the status, used by both the read-only and the modifying path.
    def currentStatus: DataFrame = {
      import spark.implicits._
      spark.createDataset[ResourceStatus](Seq(executorInfo.status)).toDF()
    }

    if (isLocalMaster(spark.sparkContext.getConf)) {
      throw new MLSQLException("Local mode not support this action")
    }

    if (params.get(action.name).forall(_.isEmpty)) {
      // No action requested: report only. No authorization is needed for reading.
      currentStatus
    } else {
      // Validate every user-supplied option before touching the cluster.
      val requestedAction = parseAction(
        fetchParam(params, action, ParamConvertOption.nothing, ParamDefaultOption.required[String]))
      val _cpus = parseCores(
        fetchParam[String](params, cpus, ParamConvertOption.nothing, ParamDefaultOption.required[String]))
      val _timeout = fetchParam[Int](params, timeout, ParamConvertOption.toInt, (_) => {
        set(timeout, 30 * 1000)
      })

      if (_cpus <= 0) {
        throw new MLSQLException("cpus should be a positive number")
      }

      if (_cpus > 20) {
        throw new MLSQLException("Too many cpus added at one time. Please add them with multi times.")
      }

      // Changing resources is authorized as an INSERT on a virtual system table.
      val context = ScriptSQLExec.contextGetOrForTest()
      context.execListener.getTableAuth match {
        case Some(tableAuth) =>
          val vtable = MLSQLTable(
            Option(DB_DEFAULT.MLSQL_SYSTEM.toString),
            Option("__resource_allocate__"),
            OperateType.INSERT,
            Option("_mlsql_"),
            TableType.SYSTEM)
          tableAuth.auth(List(vtable))
        case None =>
      }

      // Integer division already rounds down: only whole executors can be added or removed.
      val executorsShouldAddOrRemove = _cpus / executorInfo.executorCores

      requestedAction match {
        case Action.+ | Action.ADD =>
          resourceControl.requestExecutors(executorsShouldAddOrRemove, _timeout)

        case Action.- | Action.REMOVE =>
          resourceControl.killExecutors(executorsShouldAddOrRemove, _timeout)
      }

      currentStatus
    }
  }

  /**
   * Parses a core count such as `4` or `4c` (case-insensitive suffix).
   *
   * @throws MLSQLException when the remaining text is not an integer
   */
  def parseCores(str: String): Int = {
    val digits = str.trim.toLowerCase.stripSuffix("c")
    try {
      digits.toInt
    } catch {
      case _: NumberFormatException =>
        throw new MLSQLException(
          s"""cpus should be an integer optionally suffixed with "c" (e.g. 4 or 4c), but got: ${str}""")
    }
  }

  /**
   * Maps the textual action onto [[Action]].
   *
   * @throws MLSQLException when `str` is not one of the declared action names
   */
  def parseAction(str: String): Action.Value = {
    Action.values.find(_.toString == str).getOrElse {
      throw new MLSQLException(
        s"action [${str}] is not supported; expected one of: ${Action.values.mkString(", ")}")
    }
  }

  override def skipPathPrefix: Boolean = true

  override def load(sparkSession: SparkSession, path: String, params: Map[String, String]): Any = {
    throw new MLSQLException(s"${getClass.getName} not support register ")
  }

  override def predict(sparkSession: SparkSession, _model: Any, name: String, params: Map[String, String]): UserDefinedFunction = {
    throw new MLSQLException(s"${getClass.getName} not support register ")
  }

  /** Accepted spellings of the `action` option: words and their symbolic shorthands. */
  object Action extends Enumeration {
    type action = Value
    val ADD = Value("add")
    val REMOVE = Value("remove")
    val + = Value("+")
    val - = Value("-")
  }


  final val action: Param[String] = new Param[String](this, "action", "")
  final val cpus: Param[String] = new Param[String](this, "cpus", "")
  final val timeout: Param[Int] = new Param[Int](this, "timeout", "")

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ object ETRegister {
// Builds one registry entry: the short ET name paired with its class name under the `tech.mlsql.ets` package.
private def wow(name: String) = name -> ("tech.mlsql.ets." + name)

val mapping = Map[String, String](
wow("ShowCommand")
wow("ShowCommand"),
wow("EngineResource")
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import scala.collection.mutable.{Buffer, ListBuffer}
*/
class MLSQLResource(spark: SparkSession, owner: String, getGroupId: String => String) {


def resourceSummary(jobGroupId: String) = {
val store = MLSQLUtils.getAppStatusStore(spark)
val executorList = store.executorList(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class SparkInstanceService(session: SparkSession) {
}
SparkInstanceResource(totalCores.toLong, totalTasks, totalUsedMemory, totalMemory)
}

}

// Aggregated core, task and memory totals, as assembled by SparkInstanceService.
// NOTE(review): the memory unit is not visible from this definition — confirm against the producer.
case class SparkInstanceResource(totalCores: Long, totalTasks: Long, totalUsedMemory: Long, totalMemory: Long)
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.apache.spark.scheduler.cluster

import net.csdn.common.reflect.ReflectHelper
import org.apache.spark.ExecutorAllocationClient
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.sql.SparkSession

/**
 * 2019-04-26 WilliamZhu([email protected])
 *
 * Read-only view of the executors registered with the scheduler backend of
 * `session`. The executor table is a private member of
 * `CoarseGrainedSchedulerBackend`, so it is read through reflection.
 */
class SparkInnerExecutors(session: SparkSession) {

  /**
   * Cores of one registered executor (the first one found). When no executor is
   * registered, falls back to the number of processors available to this JVM.
   */
  def executorCores: Int = {
    executorDataMap.headOption
      .map { case (_, data) => data.totalCores }
      .getOrElse(java.lang.Runtime.getRuntime.availableProcessors)
  }

  /**
   * Immutable snapshot of executor id -> executor data. Empty when the backend
   * does not manage executors (see [[executorAllocationClient]]).
   */
  def executorDataMap: Map[String, ExecutorData] = {
    executorAllocationClient match {
      case Some(eac) =>
        val backend = eac.asInstanceOf[CoarseGrainedSchedulerBackend]
        // Cast to the common supertype of mutable and immutable maps: Spark keeps this
        // field in a mutable map, which is not a `Predef.Map`, so a direct cast to the
        // immutable type would fail at runtime. `toMap` then copies it into a snapshot.
        ReflectHelper.field(backend, "executorDataMap")
          .asInstanceOf[scala.collection.Map[String, ExecutorData]]
          .toMap
      case None => Map[String, ExecutorData]()
    }
  }

  /** Executor memory as configured on the SparkContext. */
  def executorMemory = {
    session.sparkContext.executorMemory
  }

  /**
   * The scheduler backend as an allocation client, or `None` for the local
   * backend and any other backend that is not coarse-grained.
   */
  def executorAllocationClient: Option[ExecutorAllocationClient] = {
    session.sparkContext.schedulerBackend match {
      case sb: CoarseGrainedSchedulerBackend => Some(sb)
      // LocalSchedulerBackend and every other backend: nothing to allocate.
      // A wildcard avoids a MatchError on backends not listed here.
      case _ => None
    }
  }

  /** Totals computed from a single snapshot, so the three figures are consistent. */
  def status: ResourceStatus = {
    val executors = executorDataMap
    val executorNum = executors.size
    val totalCores = executors.values.map(_.totalCores).sum
    // Every executor contributes the same configured memory.
    val totalMemory = executorMemory.toLong * executorNum
    ResourceStatus(totalCores, totalMemory, executorNum)
  }
}

// Summary produced by SparkInnerExecutors.status: total cores over the registered executors,
// the configured executor memory counted once per executor, and the number of executors.
// NOTE(review): totalMemory uses the unit of SparkContext.executorMemory — presumably MB; confirm.
case class ResourceStatus(totalCores: Int, totalMemory: Long, executorNum: Int)

/**
 * Adds or removes executors through the SparkContext and waits until the
 * executor count observed via [[SparkInnerExecutors]] reflects the change.
 */
class SparkDynamicControlExecutors(session: SparkSession) {
  private[this] val sparkInnerExecutors = new SparkInnerExecutors(session)

  /**
   * Runs `f` and polls the executor count until it has moved by `num`.
   *
   * @param num     signed change expected in the executor count
   *                (positive when adding, negative when removing)
   * @param timeout maximum time to wait, in milliseconds
   * @param f       the action that triggers the change
   * @throws RuntimeException when the target count is not reached within `timeout`
   */
  private def changeExecutors(num: Int, timeout: Long, f: () => Unit): Unit = {
    val targetSize = sparkInnerExecutors.executorDataMap.size + num

    // Other parties may change the executor count as well, so treat overshooting as success.
    def reached(size: Int): Boolean =
      if (num >= 0) size >= targetSize else size <= targetSize

    f()

    val deadline = System.currentTimeMillis() + timeout

    // Re-reads the live executor count on every round and returns the last count seen.
    @scala.annotation.tailrec
    def awaitTarget(): Int = {
      val size = sparkInnerExecutors.executorDataMap.size
      val remaining = deadline - System.currentTimeMillis()
      if (reached(size) || remaining <= 0) {
        size
      } else {
        Thread.sleep(math.min(1000L, remaining))
        awaitTarget()
      }
    }

    val currentSize = awaitTarget()
    if (!reached(currentSize)) {
      throw new RuntimeException(
        s"""
           |Resource Info:
           |
           |current_executor_num: ${currentSize}
           |target_executor_num: ${targetSize}
           |
           |Please check the status manually, maybe the cluster is too busy and we can not
           |allocate/deallocate executors.
        """.stripMargin)
    }
  }

  /**
   * Requests `num` additional executors and waits for them to register.
   *
   * @param timeout maximum time to wait, in milliseconds
   */
  def requestExecutors(num: Int, timeout: Long): Unit = {
    changeExecutors(num, timeout, () => {
      session.sparkContext.requestExecutors(num)
    })
  }

  /**
   * Kills up to `num` executors (fewer when fewer are registered) and waits
   * for them to disappear.
   *
   * @param timeout maximum time to wait, in milliseconds
   */
  def killExecutors(num: Int, timeout: Long): Unit = {
    val victims = sparkInnerExecutors.executorDataMap.keys.take(num).toSeq
    // The expected change is negative and bounded by the executors actually selected.
    changeExecutors(-victims.size, timeout, () => {
      session.sparkContext.killExecutors(victims)
    })
  }
}

0 comments on commit 94f974c

Please sign in to comment.