From 6f112f7b1a50a2b8a59952c69f67dd5f80ab6633 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Sun, 3 Dec 2023 22:08:20 -0800 Subject: [PATCH] [SPARK-46182][CORE] Track `lastTaskFinishTime` using the exact task finished event ### What changes were proposed in this pull request? We found a race condition between lastTaskRunningTime and lastShuffleMigrationTime that could lead to a decommissioned executor exit before all the shuffle blocks have been discovered. The issue could lead to immediate task retry right after an executor exit, thus longer query execution time. To fix the issue, we choose to update the lastTaskRunningTime only when a task updates its status to finished through the StatusUpdate event. This is better than the current approach (which use a thread to check for number of running tasks every second), because in this way we clearly know whether the shuffle block refresh happened after all tasks finished running or not, thus resolved the race condition mentioned above. ### Why are the changes needed? To fix a race condition that could lead to shuffle data lost, thus longer query execution time. ### How was this patch tested? This is a very subtle race condition that is hard to write a unit test using current unit test framework. And we are confident the change is low risk. Thus only verify by passing all the existing tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes #44090 from jiangxb1987/SPARK-46182. Authored-by: Xingbo Jiang Signed-off-by: Dongjoon Hyun --- .../executor/CoarseGrainedExecutorBackend.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index f1a9aa353e76d..4bf4929c1339f 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -21,7 +21,7 @@ import java.net.URL import java.nio.ByteBuffer import java.util.Locale import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import scala.util.{Failure, Success} import scala.util.control.NonFatal @@ -77,6 +77,10 @@ private[spark] class CoarseGrainedExecutorBackend( private var decommissioned = false + // Track the last time in ns that at least one task is running. If no task is running and all + // shuffle/RDD data migration are done, the decommissioned executor should exit. + private var lastTaskFinishTime = new AtomicLong(System.nanoTime()) + override def onStart(): Unit = { if (env.conf.get(DECOMMISSION_ENABLED)) { val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL) @@ -273,6 +277,7 @@ private[spark] class CoarseGrainedExecutorBackend( val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources) if (TaskState.isFinished(state)) { taskResources.remove(taskId) + lastTaskFinishTime.set(System.nanoTime()) } driver match { case Some(driverRef) => driverRef.send(msg) @@ -345,7 +350,6 @@ private[spark] class CoarseGrainedExecutorBackend( val shutdownThread = new Thread("wait-for-blocks-to-migrate") { override def run(): Unit = { - var lastTaskRunningTime = System.nanoTime() val sleep_time = 1000 // 1s // This config is internal and only used by unit tests to force an executor // to hang around for longer when decommissioned. @@ -362,7 +366,7 @@ private[spark] class CoarseGrainedExecutorBackend( val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo() // We can only trust allBlocksMigrated boolean value if there were no tasks running // since the start of computing it. - if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) { + if (allBlocksMigrated && (migrationTime > lastTaskFinishTime.get())) { logInfo("No running tasks, all blocks migrated, stopping.") exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true) } else { @@ -374,12 +378,6 @@ private[spark] class CoarseGrainedExecutorBackend( } } else { logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} running tasks") - // If there is a running task it could store blocks, so make sure we wait for a - // migration loop to complete after the last task is done. - // Note: this is only advanced if there is a running task, if there - // is no running task but the blocks are not done migrating this does not - // move forward. - lastTaskRunningTime = System.nanoTime() } Thread.sleep(sleep_time) }