Skip to content

Commit

Permalink
[LIVY-714][SERVER] Fix cannot remove the app in leakedAppTags when ti…
Browse files Browse the repository at this point in the history
…meout

## What changes were proposed in this pull request?
1.`var isRemoved = false` should be in `while(iter.hasNext),` otherwise if there are two apps, the first app will be killApplication and the second app will timeout in this loop, and after removing the first app,` isRemoved = true`, and the second app cannot pass the` if(!isRemoved)` and only will be deleted in the next loop.

2.`entry.getValue - now` is negative, and never greater than `sessionLeakageCheckTimeout`.

![image](https://user-images.githubusercontent.com/51938049/69202431-99a81080-0b7c-11ea-8084-9801af5a75bd.png)

## How was this patch tested?

Existed IT and UT.

Author: runzhiwang <[email protected]>

Closes apache#259 from runzhiwang/leakapp.
  • Loading branch information
runzhiwang authored and jerryshao committed Nov 25, 2019
1 parent 8f1e898 commit cccba94
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 7 deletions.
27 changes: 20 additions & 7 deletions server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@ import org.apache.livy.{LivyConf, Logging, Utils}

object SparkYarnApp extends Logging {

def init(livyConf: LivyConf): Unit = {
def init(livyConf: LivyConf, client: Option[YarnClient] = None): Unit = {
mockYarnClient = client
sessionLeakageCheckInterval = livyConf.getTimeAsMs(LivyConf.YARN_APP_LEAKAGE_CHECK_INTERVAL)
sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.YARN_APP_LEAKAGE_CHECK_TIMEOUT)
leakedAppsGCThread.setDaemon(true)
leakedAppsGCThread.setName("LeakedAppsGCThread")
leakedAppsGCThread.start()
}

private var mockYarnClient: Option[YarnClient] = None

// YarnClient is thread safe. Create once, share it across threads.
lazy val yarnClient = {
val c = YarnClient.createYarnClient()
Expand All @@ -59,34 +62,44 @@ object SparkYarnApp extends Logging {
private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration =
livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL) milliseconds

private val appType = Set("SPARK").asJava
private[utils] val appType = Set("SPARK").asJava

private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
private[utils] val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()

private var sessionLeakageCheckTimeout: Long = _

private var sessionLeakageCheckInterval: Long = _

private val leakedAppsGCThread = new Thread() {
override def run(): Unit = {
val client = {
mockYarnClient match {
case Some(client) => client
case None => yarnClient
}
}

while (true) {
if (!leakedAppTags.isEmpty) {
// kill the app if found it and remove it if exceeding a threshold
val iter = leakedAppTags.entrySet().iterator()
var isRemoved = false
val now = System.currentTimeMillis()
val apps = yarnClient.getApplications(appType).asScala
val apps = client.getApplications(appType).asScala

while(iter.hasNext) {
var isRemoved = false
val entry = iter.next()

apps.find(_.getApplicationTags.contains(entry.getKey))
.foreach({ e =>
info(s"Kill leaked app ${e.getApplicationId}")
yarnClient.killApplication(e.getApplicationId)
client.killApplication(e.getApplicationId)
iter.remove()
isRemoved = true
})

if (!isRemoved) {
if ((entry.getValue - now) > sessionLeakageCheckTimeout) {
if ((now - entry.getValue) > sessionLeakageCheckTimeout) {
iter.remove()
info(s"Remove leaked yarn app tag ${entry.getKey}")
}
Expand Down
21 changes: 21 additions & 0 deletions server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.livy.utils

import java.util.ArrayList
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

Expand Down Expand Up @@ -461,5 +462,25 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
}
}
}

it("should delete leak app when timeout") {
Clock.withSleepMethod(mockSleep) {
livyConf.set(LivyConf.YARN_APP_LEAKAGE_CHECK_INTERVAL, "100ms")
livyConf.set(LivyConf.YARN_APP_LEAKAGE_CHECK_TIMEOUT, "1000ms")

val client = mock[YarnClient]
when(client.getApplications(SparkYarnApp.appType)).
thenReturn(new ArrayList[ApplicationReport]())

SparkYarnApp.init(livyConf, Some(client))

SparkYarnApp.leakedAppTags.clear()
SparkYarnApp.leakedAppTags.put("leakApp", System.currentTimeMillis())

Eventually.eventually(Eventually.timeout(TEST_TIMEOUT), Eventually.interval(100 millis)) {
assert(SparkYarnApp.leakedAppTags.size() == 0)
}
}
}
}
}

0 comments on commit cccba94

Please sign in to comment.