Skip to content

Commit

Permalink
[CELEBORN-1501] Introduce application dimension resource consumption …
Browse files Browse the repository at this point in the history
…metrics of Worker

### What changes were proposed in this pull request?

Introduce application dimension resource consumption metrics of Worker for `ResourceConsumptionSource`.

### Why are the changes needed?

`ResourceConsumption` namespace metrics are generated for each user and they are identified using a metric tag at present. It's recommended to introduce application dimension resource consumption metrics that expose application dimension resource consumption of Worker. By monitoring resource consumption in the application dimension, you can observe each application's actual resource consumption.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

```
curl http://celeborn-worker:9096/metrics|grep applicationId|grep disk|head -20
metrics_diskFileCount_Value{applicationId="application_1720756171504_197094_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 42 1721132143020
metrics_diskBytesWritten_Value{applicationId="application_1720756171504_197094_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 27157332949 1721132143020
metrics_diskFileCount_Value{applicationId="application_1718714878734_1549139_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 47 1721132143020
metrics_diskBytesWritten_Value{applicationId="application_1718714878734_1549139_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 48304559082 1721132143020
metrics_diskFileCount_Value{applicationId="application_1688369676084_19713351_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 20 1721132143020
metrics_diskBytesWritten_Value{applicationId="application_1688369676084_19713351_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 13112170199 1721132143020
metrics_diskFileCount_Value{applicationId="application_1718714878734_1552645_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 45 1721132143020
metrics_diskBytesWritten_Value{applicationId="application_1718714878734_1552645_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 35335034306 1721132143020
metrics_diskFileCount_Value{applicationId="application_1718714878734_1552665_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 59 1721132143020
metrics_diskBytesWritten_Value{applicationId="application_1718714878734_1552665_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 47637375731 1721132143020
metrics_diskFileCount_Value{applicationId="application_1720756171504_199529_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 59 1721132143020
metrics_diskBytesWritten_Value{applicationId="application_1720756171504_199529_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 54106810966 1721132143020
metrics_diskFileCount_Value{applicationId="application_1720756171504_199536_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 19 1721132143020
metrics_diskBytesWritten_Value{applicationId="application_1720756171504_199536_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 9215818606 1721132143020
metrics_diskFileCount_Value{applicationId="application_1650016801129_34416161_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 26 1721132143020
metrics_diskBytesWritten_Value{applicationId="application_1650016801129_34416161_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 23650636804 1721132143020
metrics_diskFileCount_Value{applicationId="application_1716712852097_2884119_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 12 1721132143020
metrics_diskBytesWritten_Value{applicationId="application_1716712852097_2884119_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 650314937 1721132143020
metrics_diskFileCount_Value{applicationId="application_1718714878734_1563526_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 16 1721132143020
metrics_diskBytesWritten_Value{applicationId="application_1718714878734_1563526_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"} 1555862722 1721132143020
```
<img width="1351" alt="image" src="https://github.com/user-attachments/assets/3e007e80-7329-467b-bf74-cfe502b62ae5">
<img width="1351" alt="image" src="https://github.com/user-attachments/assets/d93ee335-c078-46b8-b682-3b1a04f8a614">
<img width="1351" alt="image" src="https://github.com/user-attachments/assets/62790378-38aa-480f-b959-6fdbad617808">
<img width="1352" alt="image" src="https://github.com/user-attachments/assets/b6717316-0b44-4a7b-a55b-4ffa844ded66">

Closes apache#2630 from SteNicholas/CELEBORN-1292.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
  • Loading branch information
SteNicholas authored and FMX committed Sep 6, 2024
1 parent cc26131 commit 29c0f7e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def metricsAppTopDiskUsageCount: Int = get(METRICS_APP_TOP_DISK_USAGE_COUNT)
def metricsAppTopDiskUsageWindowSize: Int = get(METRICS_APP_TOP_DISK_USAGE_WINDOW_SIZE)
def metricsAppTopDiskUsageInterval: Long = get(METRICS_APP_TOP_DISK_USAGE_INTERVAL)
// Maximum number of per-application resource consumption entries a worker exposes as metrics.
def metricsWorkerAppTopResourceConsumptionCount: Int =
get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT)
def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED)
Expand Down Expand Up @@ -4942,6 +4944,16 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("10min")

// Caps how many applications get per-app resource consumption gauges on a worker,
// bounding metric cardinality. Ranking key is diskBytesWritten + hdfsBytesWritten
// (see the doc string below; enforced by the worker's top-consumption handler).
val METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT: ConfigEntry[Int] =
buildConf("celeborn.metrics.worker.app.topResourceConsumption.count")
.categories("metrics")
.doc("Size for top items about top resource consumption applications list of worker. " +
"The top resource consumption is determined by sum of diskBytesWritten and hdfsBytesWritten. " +
"The top resource consumption count prevents the total number of metrics from exceeding the metrics capacity.")
.version("0.6.0")
.intConf
.createWithDefault(50)

val METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD: ConfigEntry[Int] =
buildConf("celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold")
.categories("metrics")
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ license: |
| celeborn.metrics.prometheus.path | /metrics/prometheus | false | URI context path of prometheus metrics HTTP server. | 0.4.0 | |
| celeborn.metrics.sample.rate | 1.0 | false | It controls if Celeborn collect timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 | |
| celeborn.metrics.timer.slidingWindow.size | 4096 | false | The sliding window size of timer metric. | 0.2.0 | |
| celeborn.metrics.worker.app.topResourceConsumption.count | 50 | false | Size for top items about top resource consumption applications list of worker. The top resource consumption is determined by sum of diskBytesWritten and hdfsBytesWritten. The top resource consumption count prevents the total number of metrics from exceeding the metrics capacity. | 0.6.0 | |
| celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold | 10 | false | Force append worker pause spent time even if worker still in pause serving state. Helps users notice worker pause spent time increasing when the worker stays in the pause state. | | |
<!--end-include-->
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ private[celeborn] class Worker(
metricsSystem.registerSource(new JVMCPUSource(conf, MetricsSystem.ROLE_WORKER))
metricsSystem.registerSource(new SystemMiscSource(conf, MetricsSystem.ROLE_WORKER))

// Upper bound on the number of applications exposed via per-app resource consumption gauges.
private val topResourceConsumptionCount = conf.metricsWorkerAppTopResourceConsumptionCount
// appId -> owning user for the gauges currently published, so they can be removed
// when the top-N ranking is refreshed or the application expires.
private val topApplicationUserIdentifiers =
JavaUtils.newConcurrentHashMap[String, UserIdentifier]()

val workerStatusManager = new WorkerStatusManager(conf)
private val authEnabled = conf.authEnabled
private val secretRegistry = new WorkerSecretRegistryImpl(conf.workerApplicationRegistryCacheSize)
Expand Down Expand Up @@ -661,55 +665,75 @@ private[celeborn] class Worker(
val resourceConsumptionSnapshot = storageManager.userResourceConsumptionSnapshot()
val userResourceConsumptions =
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava)
resourceConsumptionSnapshot.foreach { case (userIdentifier, userResourceConsumption) =>
resourceConsumptionSnapshot.foreach { case (userIdentifier, _) =>
gaugeResourceConsumption(userIdentifier)
}
handleTopResourceConsumption(userResourceConsumptions)
userResourceConsumptions
}

/**
 * Publishes per-application resource consumption gauges for the top-N applications,
 * ranked by diskBytesWritten + hdfsBytesWritten. Previously published per-app gauges
 * are removed first so the exposed set always reflects the latest snapshot and stays
 * within topResourceConsumptionCount entries.
 */
private def handleTopResourceConsumption(userResourceConsumptions: util.Map[
    UserIdentifier,
    ResourceConsumption]): Unit = {
  // Drop the gauges from the previous refresh before rebuilding the ranking.
  removeAppResourceConsumption(topApplicationUserIdentifiers.keySet().asScala)
  // Flatten user-level consumption into appId -> (user, per-app consumption).
  val appConsumptions =
    userResourceConsumptions.asScala
      .filter { case (_, consumption) =>
        CollectionUtils.isNotEmpty(consumption.subResourceConsumptions)
      }
      .flatMap { case (user, consumption) =>
        consumption.subResourceConsumptions.asScala.map {
          case (appId, appConsumption) => (appId, (user, appConsumption))
        }
      }
  appConsumptions.toSeq
    .sortBy { case (_, (_, appConsumption)) =>
      appConsumption.diskBytesWritten + appConsumption.hdfsBytesWritten
    }
    .reverse
    .take(topResourceConsumptionCount)
    .foreach { case (appId, (user, appConsumption)) =>
      topApplicationUserIdentifiers.put(appId, user)
      gaugeResourceConsumption(user, appId, appConsumption)
    }
}

/**
 * Registers the four resource consumption gauges (disk/HDFS file counts and bytes written)
 * for the given user, optionally tagged with an application id.
 *
 * NOTE(review): this span of the diff rendering interleaved the pre-change and post-change
 * lines; this body keeps only the post-change version so the method is valid Scala.
 *
 * @param userIdentifier the user/tenant the consumption belongs to; its map form supplies
 *                       the base metric labels.
 * @param applicationId when non-null, an applicationId label is added so the gauges expose
 *                      application-dimension consumption.
 * @param resourceConsumption when non-null, a pinned snapshot used as the gauge value;
 *                            when null the value is resolved at read time (see
 *                            computeResourceConsumption).
 */
private def gaugeResourceConsumption(
    userIdentifier: UserIdentifier,
    applicationId: String = null,
    resourceConsumption: ResourceConsumption = null): Unit = {
  var resourceConsumptionLabel = userIdentifier.toMap
  if (applicationId != null)
    resourceConsumptionLabel += (resourceConsumptionSource.applicationLabel -> applicationId)
  resourceConsumptionSource.addGauge(
    ResourceConsumptionSource.DISK_FILE_COUNT,
    resourceConsumptionLabel) { () =>
    computeResourceConsumption(userIdentifier, resourceConsumption).diskFileCount
  }
  resourceConsumptionSource.addGauge(
    ResourceConsumptionSource.DISK_BYTES_WRITTEN,
    resourceConsumptionLabel) { () =>
    computeResourceConsumption(userIdentifier, resourceConsumption).diskBytesWritten
  }
  resourceConsumptionSource.addGauge(
    ResourceConsumptionSource.HDFS_FILE_COUNT,
    resourceConsumptionLabel) { () =>
    computeResourceConsumption(userIdentifier, resourceConsumption).hdfsFileCount
  }
  resourceConsumptionSource.addGauge(
    ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
    resourceConsumptionLabel) { () =>
    computeResourceConsumption(userIdentifier, resourceConsumption).hdfsBytesWritten
  }
}

/**
 * Resolves the ResourceConsumption a gauge should report.
 *
 * NOTE(review): this span of the diff rendering interleaved the pre-change body (which
 * looked up by applicationId) with the post-change body; this body keeps only the
 * post-change version so the method is valid Scala.
 *
 * @param userIdentifier user whose live consumption is read when no snapshot is pinned.
 * @param resourceConsumption when non-null, returned as-is (the pinned per-application
 *                            snapshot); when null, the user's current consumption is read
 *                            from workerInfo, defaulting to all zeros if absent.
 */
private def computeResourceConsumption(
    userIdentifier: UserIdentifier,
    resourceConsumption: ResourceConsumption = null): ResourceConsumption = {
  if (resourceConsumption == null) {
    workerInfo.userResourceConsumption.getOrDefault(
      userIdentifier,
      ResourceConsumption(0, 0, 0, 0))
  } else {
    resourceConsumption
  }
}

@VisibleForTesting
Expand All @@ -734,6 +758,7 @@ private[celeborn] class Worker(
fetchHandler.cleanupExpiredShuffleKey(expiredShuffleKeys)
threadPool.execute(new Runnable {
override def run(): Unit = {
removeAppResourceConsumption(expiredApplicationIds.asScala)
removeAppActiveConnection(expiredApplicationIds)
workerSource.sample(
WorkerSource.CLEAN_EXPIRED_SHUFFLE_KEYS_TIME,
Expand All @@ -744,6 +769,25 @@ private[celeborn] class Worker(
})
}

/** Removes the per-application resource consumption gauges for every given application. */
private def removeAppResourceConsumption(applicationIds: Iterable[String]): Unit = {
  for (appId <- applicationIds) {
    removeAppResourceConsumption(appId)
  }
}

/**
 * Removes the per-application gauges for one application, if it was among the tracked
 * top-consumption applications. ConcurrentHashMap.remove returns null for untracked
 * apps, in which case nothing was published and there is nothing to remove.
 */
private def removeAppResourceConsumption(applicationId: String): Unit = {
  Option(topApplicationUserIdentifiers.remove(applicationId)).foreach { owner =>
    removeAppResourceConsumption(
      owner.toMap + (resourceConsumptionSource.applicationLabel -> applicationId))
  }
}

/**
 * Removes the four resource consumption gauges identified by the given label set.
 *
 * Fix: these gauges are registered on resourceConsumptionSource (see
 * gaugeResourceConsumption's addGauge calls), so they must be removed from the same
 * source. Removing them from workerSource was a no-op, leaking stale per-application
 * gauges after the app expired or dropped out of the top-N ranking.
 */
private def removeAppResourceConsumption(resourceConsumptionLabel: Map[String, String]): Unit = {
  resourceConsumptionSource.removeGauge(
    ResourceConsumptionSource.DISK_FILE_COUNT,
    resourceConsumptionLabel)
  resourceConsumptionSource.removeGauge(
    ResourceConsumptionSource.DISK_BYTES_WRITTEN,
    resourceConsumptionLabel)
  resourceConsumptionSource.removeGauge(
    ResourceConsumptionSource.HDFS_FILE_COUNT,
    resourceConsumptionLabel)
  resourceConsumptionSource.removeGauge(
    ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
    resourceConsumptionLabel)
}

// Delegates to workerSource to drop active-connection metrics for the expired applications.
private def removeAppActiveConnection(applicationIds: JHashSet[String]): Unit = {
workerSource.removeAppActiveConnection(applicationIds)
}
Expand Down

0 comments on commit 29c0f7e

Please sign in to comment.