Skip to content

Commit

Permalink
add FreeCoreBackendStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
allwefantasy committed Dec 4, 2018
1 parent 06a903d commit 511b756
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ class APIController extends ApplicationController {
@At(path = Array("/run/script"), types = Array(GET, POST))
def runScript = {
val tags = param("tags", "")
//FreeCoreBackendStrategy|TaskLessBackendStrategy
val proxyStrategy = param("proxyStrategy", "TaskLessBackendStrategy")
val res = BackendService.execute(instance => {
instance.runScript(params().asScala.toMap)
}, tags)
}, tags, proxyStrategy)
if (!res.isDefined) {
render(500, map("msg", s"There are no backend with tags [${tags}]"))
}
Expand All @@ -69,9 +71,10 @@ class APIController extends ApplicationController {
@At(path = Array("/run/sql"), types = Array(GET, POST))
def runSQL = {
val tags = param("tags", "")
val proxyStrategy = param("proxyStrategy", "FreeCoreBackendStrategy")
val res = BackendService.execute(instance => {
instance.runSQL(params().asScala.toMap)
}, param("tags", ""))
}, param("tags", ""), proxyStrategy)
if (!res.isDefined) {
render(500, map("msg", s"There are no backend with tags [${tags}]"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ trait BackendService {

@At(path = Array("/run/sql"), types = Array(GET, POST))
def runSQL(params: Map[String, String]): HttpTransportService.SResponse

@At(path = Array("/instance/resource"), types = Array(GET, POST))
def instanceResource: HttpTransportService.SResponse
}

case class BackendCache(meta: Backend, instance: BackendService)
Expand All @@ -46,6 +49,10 @@ object BackendService {
items -- active_task_meta.keySet()
}

def backends = {
backendMetaCache.get(backend_meta_key)
}

def find(backend: Option[Backend]) = {
backend match {
case Some(i) => backendMetaCache.get(backend_meta_key).filter(f => f.meta == i).headOption
Expand All @@ -66,10 +73,14 @@ object BackendService {
})


def execute(f: BackendService => HttpTransportService.SResponse, tags: String) = {
def execute(f: BackendService => HttpTransportService.SResponse, tags: String, proxyStrategy: String) = {
val items = backendMetaCache.get(backend_meta_key)

val chooseProxy = new TaskLessBackendStrategy(tags)
val chooseProxy = proxyStrategy match {
case "FreeCoreBackendStrategy" => new TaskLessBackendStrategy(tags)
case "TaskLessBackendStrategy" => new FreeCoreBackendStrategy(tags)
case _ => new TaskLessBackendStrategy(tags)
}
val backendCache = chooseProxy.invoke(items)
backendCache match {
case Some(ins) =>
Expand All @@ -94,10 +105,10 @@ object BackendService {

class SResponseEnhance(x: java.util.List[HttpTransportService.SResponse]) {

def toBean[T](res: String)(implicit manifest: Manifest[T]): Option[T] = {
def toBean[T]()(implicit manifest: Manifest[T]): Option[T] = {
if (validate) {
implicit val formats = SJSon.DefaultFormats
Option(SJSon.parse(res).extract[T])
Option(SJSon.parse(x(0).getContent).extract[T])
} else None
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package tech.mlsql.cluster.service

import tech.mlsql.cluster.service.BackendService.mapSResponseToObject

/**
* 2018-12-04 WilliamZhu([email protected])
*/
Expand Down Expand Up @@ -40,3 +42,24 @@ class TaskLessBackendStrategy(tags: String) extends BackendStrategy {
BackendService.find(backend)
}
}

class FreeCoreBackendStrategy(tags: String) extends BackendStrategy {
override def invoke(backends: Seq[BackendCache]): Option[BackendCache] = {

val tagSet = tags.split(",").toSet

var backends = BackendService.backends
if (!tags.isEmpty) {
backends = backends.filter(f => tagSet.intersect(f.meta.getTag.split(",").toSet).size > 0)
}
val backend = backends.seq.map { b =>
val res = b.instance.instanceResource
val resource = res.toBean[CSparkInstanceResource]().head
(resource.totalCores - resource.totalTasks, b)
}.sortBy(f => f._1).reverse.headOption.map(f => f._2.meta)

BackendService.find(backend)
}
}

case class CSparkInstanceResource(totalCores: Long, totalTasks: Long, totalUsedMemory: Long, totalMemory: Long)
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import net.csdn.common.path.Url
import net.csdn.modules.http.RestRequest.Method._
import net.csdn.modules.http.{ApplicationController, ViewType}
import net.csdn.modules.transport.HttpTransportService
import org.apache.spark.SparkInstanceService
import org.apache.spark.ps.cluster.Message
import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession}
import org.joda.time.format.ISODateTimeFormat
Expand Down Expand Up @@ -472,6 +473,13 @@ class RestController extends ApplicationController {
render("{}")
}

@At(path = Array("/instance/resource"), types = Array(GET, POST))
def instanceResource = {
val session = runtime.asInstanceOf[SparkRuntime].sparkSession
val resource = new SparkInstanceService(session).resources
render(toJsonString(resource))
}

//end -------------------------------------------


Expand Down

0 comments on commit 511b756

Please sign in to comment.