Skip to content

Commit

Permalink
[SPARK-21223] Change fileToAppInfo in FsHistoryProvider to fix concur…
Browse files Browse the repository at this point in the history
…rent issue.

# What issue does this PR address?
Jira: https://issues.apache.org/jira/browse/SPARK-21223
This fixes a thread-safety issue in FsHistoryProvider.
Currently, the Spark HistoryServer uses a HashMap named fileToAppInfo in the FsHistoryProvider class to store the mapping from event log path to attempt info.
When a thread pool is used to replay the log files in the list and merge the list of old applications with new ones, multiple threads may update fileToAppInfo at the same time, which can cause thread-safety issues, such as falling into an infinite loop due to a concurrent call to the hash table's resize function.

Author: 曾林西 <[email protected]>

Closes apache#18430 from zenglinxi0615/master.
  • Loading branch information
曾林西 authored and srowen committed Jun 30, 2017
1 parent 528c928 commit 1fe08d6
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.deploy.history

import java.io.{FileNotFoundException, IOException, OutputStream}
import java.util.UUID
import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService, Future, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.mutable
Expand Down Expand Up @@ -122,7 +122,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
= new mutable.LinkedHashMap()

val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]()
val fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]()

// List of application logs to be deleted by event log cleaner.
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
Expand Down Expand Up @@ -321,7 +321,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// scan for modified applications, replay and merge them
val logInfos: Seq[FileStatus] = statusList
.filter { entry =>
val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
val fileInfo = fileToAppInfo.get(entry.getPath())
val prevFileSize = if (fileInfo != null) fileInfo.fileSize else 0L
!entry.isDirectory() &&
// FsHistoryProvider generates a hidden file which can't be read. Accidentally
// reading a garbage file is safe, but we would log an error which can be scary to
Expand Down Expand Up @@ -475,7 +476,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
fileStatus.getLen(),
appListener.appSparkVersion.getOrElse("")
)
fileToAppInfo(logPath) = attemptInfo
fileToAppInfo.put(logPath, attemptInfo)
logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")
Some(attemptInfo)
} else {
Expand Down

0 comments on commit 1fe08d6

Please sign in to comment.