Skip to content

Commit

Permalink
update MLSQLResource
Browse files Browse the repository at this point in the history
  • Loading branch information
allwefantasy committed Mar 8, 2019
1 parent fadefb1 commit 56b2075
Showing 1 changed file with 35 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.apache.spark

import org.apache.spark.sql.{MLSQLUtils, SparkSession}
import org.apache.spark.status.api.v1
import tech.mlsql.render.protocal.{MLSQLResourceRender, MLSQLScriptJob, MLSQLScriptJobGroup}
import tech.mlsql.render.protocal.{MLSQLResourceRender, MLSQLScriptJob, MLSQLScriptJobGroup, MLSQLShufflePerfRender}

import scala.collection.mutable.{Buffer, ListBuffer}

Expand All @@ -15,18 +15,47 @@ class MLSQLResource(spark: SparkSession, owner: String, getGroupId: String => St
def resourceSummary(jobGroupId: String) = {
val store = MLSQLUtils.getAppStatusStore(spark)
val executorList = store.executorList(true)
val totalExecutorList = store.executorList(false)
val activeJobs = store.jobsList(null).filter(f => f.status == JobExecutionStatus.RUNNING)

val finalJobGroupId = getGroupId(jobGroupId)


// Total number of currently-active tasks across the job's active stages.
// Only the active stages are consulted; completed/failed stages are irrelevant here,
// so the unused tuple components are discarded with `_`.
def getNumActiveTaskByJob(job: v1.JobData) = {
val (activeStages, _, _) = fetchStageByJob(job)
activeStages.map(f => f.numActiveTasks).sum
}

// Bytes spilled to disk, summed over the job's active stages only.
// Completed/failed stage lists from fetchStageByJob are unused — discard with `_`.
def getDiskBytesSpilled(job: v1.JobData) = {
val (activeStages, _, _) = fetchStageByJob(job)
activeStages.map(f => f.diskBytesSpilled).sum
}

// Input records read, summed over the job's active stages only.
// Completed/failed stage lists from fetchStageByJob are unused — discard with `_`.
def getInputRecords(job: v1.JobData) = {
val (activeStages, _, _) = fetchStageByJob(job)
activeStages.map(f => f.inputRecords).sum
}

// Bytes spilled to memory, summed over the job's active stages only.
// Completed/failed stage lists from fetchStageByJob are unused — discard with `_`.
def getMemoryBytesSpilled(job: v1.JobData) = {
val (activeStages, _, _) = fetchStageByJob(job)
activeStages.map(f => f.memoryBytesSpilled).sum
}


val currentJobGroupActiveTasks = if (jobGroupId == null) activeJobs.map(getNumActiveTaskByJob).sum
else activeJobs.filter(f => f.jobGroup.get == finalJobGroupId).map(getNumActiveTaskByJob).sum

val currentDiskBytesSpilled = if (jobGroupId == null) activeJobs.map(getDiskBytesSpilled).sum
else activeJobs.filter(f => f.jobGroup.get == finalJobGroupId).map(getDiskBytesSpilled).sum

val currentInputRecords = if (jobGroupId == null) activeJobs.map(getInputRecords).sum
else activeJobs.filter(f => f.jobGroup.get == finalJobGroupId).map(getInputRecords).sum

val currentMemoryBytesSpilled = if (jobGroupId == null) activeJobs.map(getMemoryBytesSpilled).sum
else activeJobs.filter(f => f.jobGroup.get == finalJobGroupId).map(getMemoryBytesSpilled).sum

val shuffle = MLSQLShufflePerfRender(memoryBytesSpilled = currentMemoryBytesSpilled, diskBytesSpilled = currentDiskBytesSpilled, inputRecords = currentInputRecords)

MLSQLResourceRender(
currentJobGroupActiveTasks = currentJobGroupActiveTasks,
activeTasks = executorList.map(_.activeTasks).sum,
Expand All @@ -36,7 +65,11 @@ class MLSQLResource(spark: SparkSession, owner: String, getGroupId: String => St
taskTime = executorList.map(_.totalDuration).sum,
gcTime = executorList.map(_.totalGCTime).sum,
activeExecutorNum = executorList.size,
totalCores = executorList.map(_.totalCores).sum
totalExecutorNum = totalExecutorList.size,
totalCores = executorList.map(_.totalCores).sum,
usedMemory = executorList.map(_.memoryUsed).sum,
totalMemory = totalExecutorList.map(f => f.maxMemory).sum,
shuffleData = shuffle
)


Expand Down

0 comments on commit 56b2075

Please sign in to comment.