From 94f974c9d5b025337605d3298dd936cc60b0b664 Mon Sep 17 00:00:00 2001
From: WilliamZhu
Date: Fri, 26 Apr 2019 12:12:04 +0800
Subject: [PATCH] add dynamic resource configuration

---
 .../java/streaming/dsl/auth/Protocal.scala    |   1 +
 .../dsl/mmlib/algs/param/WowParams.scala      |  25 ++++
 .../tech/mlsql/dsl/CommandCollection.scala    |   5 +-
 .../java/tech/mlsql/ets/EngineResource.scala  | 119 ++++++++++++++++++
 .../tech/mlsql/ets/register/ETRegister.scala  |   3 +-
 .../java/org/apache/spark/MLSQLResource.scala |   1 -
 .../apache/spark/SparkInstanceService.scala   |   1 +
 .../cluster/SparkInnerExecutors.scala         |  95 ++++++++++++++
 8 files changed, 246 insertions(+), 4 deletions(-)
 create mode 100644 streamingpro-mlsql/src/main/java/tech/mlsql/ets/EngineResource.scala
 create mode 100644 streamingpro-spark-common/src/main/java/org/apache/spark/scheduler/cluster/SparkInnerExecutors.scala

diff --git a/streamingpro-api/src/main/java/streaming/dsl/auth/Protocal.scala b/streamingpro-api/src/main/java/streaming/dsl/auth/Protocal.scala
index 0ff79a222..c1f919049 100644
--- a/streamingpro-api/src/main/java/streaming/dsl/auth/Protocal.scala
+++ b/streamingpro-api/src/main/java/streaming/dsl/auth/Protocal.scala
@@ -75,6 +75,7 @@ object OperateType extends Enumeration {
   val CREATE = Value("create")
   val DROP = Value("drop")
   val INSERT = Value("insert")
+  //val UPDATE = Value("update")
   val SELECT = Value("select")
   val SET = Value("set")
   val EMPTY = Value("empty")
diff --git a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/param/WowParams.scala b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/param/WowParams.scala
index 5533ccb14..aaf9a3e01 100644
--- a/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/param/WowParams.scala
+++ b/streamingpro-mlsql/src/main/java/streaming/dsl/mmlib/algs/param/WowParams.scala
@@ -19,6 +19,7 @@ package streaming.dsl.mmlib.algs.param
 
 import org.apache.spark.ml.param.{Param, ParamMap, Params}
+import org.apache.spark.sql.mlsql.session.MLSQLException
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SparkSession}
 
@@ -45,5 +46,29 @@ trait WowParams extends Params {
     sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(rfcParams2, 1), StructType(Seq(StructField("param", StringType), StructField("description", StringType))))
   }
 
+  def fetchParam[T](params: Map[String, String], param: Param[T], convert: (String) => T,
+                    callback: Param[T] => Unit): T = {
+    params.get(param.name).map { item =>
+      set(param, convert(item))
+    }.getOrElse {
+      callback(param)
+    }
+    $(param)
+  }
+
+  object ParamDefaultOption {
+    def required[T](param: Param[T]): Unit = {
+      throw new MLSQLException(s"${param.name} is required")
+    }
+  }
+
+  object ParamConvertOption {
+    def toInt(a: String): Int = {
+      a.toInt
+    }
+
+    def nothing(a: String) = a
+  }
+
 }
diff --git a/streamingpro-mlsql/src/main/java/tech/mlsql/dsl/CommandCollection.scala b/streamingpro-mlsql/src/main/java/tech/mlsql/dsl/CommandCollection.scala
index 9ee641b1d..ab02ec5f1 100644
--- a/streamingpro-mlsql/src/main/java/tech/mlsql/dsl/CommandCollection.scala
+++ b/streamingpro-mlsql/src/main/java/tech/mlsql/dsl/CommandCollection.scala
@@ -20,8 +20,9 @@ object CommandCollection {
     context.addEnv("cache", """ run {} as CacheExt.`` where lifeTime="{}" """)
     context.addEnv("unCache", """ run {} as CacheExt.`` where execute="uncache" """)
     context.addEnv("uncache", """ run {} as CacheExt.`` where execute="uncache" """)
-    context.addEnv("createPythonEnv", """ run command as PythonEnvExt.`{}` where condaYamlFilePath="${HOME}/{}" and command="create"; """)
-    context.addEnv("removePythonEnv", """ run command as PythonEnvExt.`{}` where condaYamlFilePath="${HOME}/{}" and command="remove"; """)
+    context.addEnv("createPythonEnv", """ run command as PythonEnvExt.`{}` where condaYamlFilePath="${HOME}/{}" and command="create" """)
+    context.addEnv("removePythonEnv", """ run command as PythonEnvExt.`{}` where condaYamlFilePath="${HOME}/{}" and command="remove" """)
+    context.addEnv("resource", """ run command as EngineResource.`` where action="{0}" and cpus="{1}" """)
     context.addEnv("show", """
diff --git a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/EngineResource.scala b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/EngineResource.scala
new file mode 100644
index 000000000..7332984c8
--- /dev/null
+++ b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/EngineResource.scala
@@ -0,0 +1,119 @@
package tech.mlsql.ets

import org.apache.spark.SparkConf
import org.apache.spark.ml.param.Param
import org.apache.spark.scheduler.cluster.{ResourceStatus, SparkDynamicControlExecutors, SparkInnerExecutors}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.mlsql.session.MLSQLException
import org.apache.spark.sql.{DataFrame, SparkSession}
import streaming.dsl.ScriptSQLExec
import streaming.dsl.auth._
import streaming.dsl.mmlib.SQLAlg
import streaming.dsl.mmlib.algs.Functions
import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}


/**
 * 2019-04-26 WilliamZhu(allwefantasy@gmail.com)
 */
class EngineResource(override val uid: String) extends SQLAlg with Functions with WowParams {
  def this() = this(BaseParams.randomUID())

  override def batchPredict(df: DataFrame, path: String, params: Map[String, String]): DataFrame = train(df, path, params)

  override def train(df: DataFrame, path: String, params: Map[String, String]): DataFrame = {
    val spark = df.sparkSession

    val executorInfo = new SparkInnerExecutors(spark)
    val resourceControl = new SparkDynamicControlExecutors(spark)

    def isLocalMaster(conf: SparkConf): Boolean = {
      // val master = MLSQLConf.MLSQL_MASTER.readFrom(configReader).getOrElse("")
      val master = conf.get("spark.master", "")
      master == "local" || master.startsWith("local[")
    }

    if (isLocalMaster(spark.sparkContext.getConf)) {
      throw new MLSQLException("Local mode does not support this action")
    }

    // when no action is specified, just report the current resource status
    if (!params.contains(action.name) || params(action.name).isEmpty) {
      import spark.implicits._
      return spark.createDataset[ResourceStatus](Seq(executorInfo.status)).toDF()
    }

    val _action = fetchParam(params, action, ParamConvertOption.nothing, ParamDefaultOption.required[String])
    val _cpus = parseCores(fetchParam[String](params, cpus, ParamConvertOption.nothing, ParamDefaultOption.required[String]))
    val _timeout = fetchParam[Int](params, timeout, ParamConvertOption.toInt, (_) => {
      set(timeout, 30 * 1000)
    })

    if (_cpus > 20) {
      throw new MLSQLException("Too many cpus requested at one time. Please split the request into several smaller steps.")
    }


    val context = ScriptSQLExec.contextGetOrForTest()
    context.execListener.getTableAuth match {
      case Some(tableAuth) =>
        val vtable = MLSQLTable(
          Option(DB_DEFAULT.MLSQL_SYSTEM.toString),
          Option("__resource_allocate__"),
          OperateType.INSERT,
          Option("_mlsql_"),
          TableType.SYSTEM)
        tableAuth.auth(List(vtable))
      case None =>
    }

    val executorsShouldAddOrRemove = Math.floor(_cpus / executorInfo.executorCores).toInt

    parseAction(_action) match {
      case Action.+ | Action.ADD =>
        resourceControl.requestExecutors(executorsShouldAddOrRemove, _timeout)

      case Action.- | Action.REMOVE =>
        resourceControl.killExecutors(executorsShouldAddOrRemove, _timeout)
    }

    import spark.implicits._
    spark.createDataset[ResourceStatus](Seq(executorInfo.status)).toDF()

  }

  def parseCores(str: String) = {
    if (str.toLowerCase.endsWith("c")) {
      str.toLowerCase.stripSuffix("c").toInt
    } else {
      str.toInt
    }
  }

  def parseAction(str: String) = {
    Action.withName(str)
  }

  override def skipPathPrefix: Boolean = true

  override def load(sparkSession: SparkSession, path: String, params: Map[String, String]): Any = {
    throw new MLSQLException(s"${getClass.getName} does not support register")
  }

  override def predict(sparkSession: SparkSession, _model: Any, name: String, params: Map[String, String]): UserDefinedFunction = {
    throw new MLSQLException(s"${getClass.getName} does not support register")
  }

  object Action extends Enumeration {
    type action = Value
    val ADD = Value("add")
    val REMOVE = Value("remove")
    val + = Value("+")
    val - = Value("-")
  }


  final val action: Param[String] = new Param[String](this, "action", "add|remove (or +|-): whether to request or release executors")
  final val cpus: Param[String] = new Param[String](this, "cpus", "number of cores to add or remove, e.g. 8 or 8c")
  final val timeout: Param[Int] = new Param[Int](this, "timeout", "milliseconds to wait for the cluster to reach the target executor number, default 30000")

}
diff --git a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/register/ETRegister.scala b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/register/ETRegister.scala
index 79a03630e..51e38ad5c 100644
--- a/streamingpro-mlsql/src/main/java/tech/mlsql/ets/register/ETRegister.scala
+++ b/streamingpro-mlsql/src/main/java/tech/mlsql/ets/register/ETRegister.scala
@@ -7,7 +7,8 @@ object ETRegister {
 
   private def wow(name: String) = name -> ("tech.mlsql.ets." + name)
 
   val mapping = Map[String, String](
-    wow("ShowCommand")
+    wow("ShowCommand"),
+    wow("EngineResource")
   )
 }
diff --git a/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/MLSQLResource.scala b/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/MLSQLResource.scala
index 1c8f3f5cb..781560ab7 100644
--- a/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/MLSQLResource.scala
+++ b/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/MLSQLResource.scala
@@ -11,7 +11,6 @@ import scala.collection.mutable.{Buffer, ListBuffer}
  */
 class MLSQLResource(spark: SparkSession, owner: String, getGroupId: String => String) {
 
-
   def resourceSummary(jobGroupId: String) = {
     val store = MLSQLUtils.getAppStatusStore(spark)
     val executorList = store.executorList(true)
diff --git a/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/SparkInstanceService.scala b/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/SparkInstanceService.scala
index cc92b1880..80b6e0e84 100644
--- a/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/SparkInstanceService.scala
+++ b/streamingpro-spark-2.4.0-adaptor/src/main/java/org/apache/spark/SparkInstanceService.scala
@@ -49,6 +49,7 @@ class SparkInstanceService(session: SparkSession) {
     }
     SparkInstanceResource(totalCores.toLong, totalTasks, totalUsedMemory, totalMemory)
   }
+
 }
 
 case class SparkInstanceResource(totalCores: Long, totalTasks: Long, totalUsedMemory: Long, totalMemory: Long)
diff --git a/streamingpro-spark-common/src/main/java/org/apache/spark/scheduler/cluster/SparkInnerExecutors.scala b/streamingpro-spark-common/src/main/java/org/apache/spark/scheduler/cluster/SparkInnerExecutors.scala
new file mode 100644
index 000000000..6cf1e4972
--- /dev/null
+++ b/streamingpro-spark-common/src/main/java/org/apache/spark/scheduler/cluster/SparkInnerExecutors.scala
@@ -0,0 +1,95 @@
package org.apache.spark.scheduler.cluster

import net.csdn.common.reflect.ReflectHelper
import org.apache.spark.ExecutorAllocationClient
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.sql.SparkSession

/**
 * 2019-04-26 WilliamZhu(allwefantasy@gmail.com)
 */
class SparkInnerExecutors(session: SparkSession) {
  def executorCores = {
    val items = executorDataMap
    if (items.size > 0) {
      items.head._2.totalCores
    } else {
      java.lang.Runtime.getRuntime.availableProcessors
    }
  }

  def executorDataMap = {
    executorAllocationClient match {
      case Some(eac) =>
        val item = eac.asInstanceOf[CoarseGrainedSchedulerBackend]
        val executors = ReflectHelper.field(item, "executorDataMap").asInstanceOf[Map[String, ExecutorData]]
        executors
      case None => Map[String, ExecutorData]()
    }
  }

  def executorMemory = {
    session.sparkContext.executorMemory
  }

  def executorAllocationClient = {
    session.sparkContext.schedulerBackend match {
      case sb if sb.isInstanceOf[CoarseGrainedSchedulerBackend] =>
        Option(sb.asInstanceOf[ExecutorAllocationClient])
      case sb if sb.isInstanceOf[LocalSchedulerBackend] =>
        None
    }
  }

  def status: ResourceStatus = {
    val totalCores = executorDataMap.map(f => f._2.totalCores).sum
    val totalMemory = executorDataMap.map(f => executorMemory).sum
    val executorNum = executorDataMap.size
    ResourceStatus(totalCores, totalMemory, executorNum)
  }
}

case class ResourceStatus(totalCores: Int, totalMemory: Long, executorNum: Int)

class SparkDynamicControlExecutors(session: SparkSession) {
  private[this] val sparkInnerExecutors = new SparkInnerExecutors(session)

  private def changeExecutors(num: Int, timeout: Long, f: () => Unit) = {
    val currentSize = sparkInnerExecutors.executorDataMap.size
    val targetSize = currentSize + num
    f()
    var count = 0
    var notSuccess = true
    while (notSuccess && count < timeout / 1000) {
      // re-read the executor map each round so we actually see executors joining or leaving
      notSuccess = (sparkInnerExecutors.executorDataMap.size != targetSize)
      Thread.sleep(1000)
      count += 1
    }
    if (notSuccess) {
      throw new RuntimeException(
        s"""
           |Resource Info:
           |
           |current_executor_num: ${currentSize}
           |target_executor_num: ${targetSize}
           |
           |Please check the status manually, maybe the cluster is too busy and we can not
           |allocate/deallocate executors.
         """.stripMargin)
    }
  }

  def requestExecutors(num: Int, timeout: Long) = {
    changeExecutors(num, timeout, () => {
      session.sparkContext.requestExecutors(num)
    })
  }

  def killExecutors(num: Int, timeout: Long) = {
    val items = sparkInnerExecutors.executorDataMap.keys.take(num)
    // killing executors shrinks the cluster, so the target size is currentSize - num
    changeExecutors(-num, timeout, () => {
      session.sparkContext.killExecutors(items.toSeq)
    })
  }
}
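
Usage sketch for reviewers (values are illustrative and not part of this patch; the `!resource` shorthand assumes the usual expansion of the command template registered in CommandCollection above):

-- with no action, the ET just reports the current totalCores / totalMemory / executorNum
run command as EngineResource.``;

-- request roughly 8 cores worth of extra executors, waiting up to 60s for them to register
run command as EngineResource.`` where action="add" and cpus="8c" and timeout="60000";

-- release them again
run command as EngineResource.`` where action="remove" and cpus="8c";

-- shorthand through the registered command template
!resource add 8c;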