Skip to content

Commit

Permalink
[SPARK-43043][CORE] Improve the performance of MapOutputTracker.updat…
Browse files Browse the repository at this point in the history
…eMapOutput

### What changes were proposed in this pull request?

This PR changes the implementation of MapOutputTracker.updateMapOutput() to look up the MapStatus through a mapping from mapId to mapIndex. Previously it performed a linear search, which could become a performance bottleneck when a large proportion of the blocks in the map are migrated.

### Why are the changes needed?

To avoid performance bottleneck when block decommission is enabled and a lot of blocks are migrated within a short time window.

### Does this PR introduce _any_ user-facing change?

No, it's pure performance improvement.

### How was this patch tested?

Tested manually.

Closes apache#40690 from jiangxb1987/SPARK-43043.

Lead-authored-by: Xingbo Jiang <[email protected]>
Co-authored-by: Jiang Xingbo <[email protected]>
Signed-off-by: Xingbo Jiang <[email protected]>
  • Loading branch information
jiangxb1987 and jiangxb1987 committed May 16, 2023
1 parent 530dea6 commit 66a2eb8
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 6 deletions.
26 changes: 20 additions & 6 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.spark.scheduler.{MapStatus, MergeStatus, ShuffleOutputStatus}
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId}
import org.apache.spark.util._
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}

/**
Expand Down Expand Up @@ -147,6 +148,12 @@ private class ShuffleStatus(

private[this] var shufflePushMergerLocations: Seq[BlockManagerId] = Seq.empty

/**
* Mapping from a mapId to its mapIndex. It is required to reduce the search overhead within
* the function updateMapOutput(mapId, bmAddress).
*/
private[this] val mapIdToMapIndex = new OpenHashMap[Long, Int]()

/**
* Register a map output. If there is already a registered location for the map output then it
* will be replaced by the new location.
Expand All @@ -157,22 +164,31 @@ private class ShuffleStatus(
invalidateSerializedMapOutputStatusCache()
}
mapStatuses(mapIndex) = status
mapIdToMapIndex(status.mapId) = mapIndex
}

/**
 * Get the map output that corresponds to a given mapId.
 *
 * Returns None both when the mapId has no registered mapIndex and when the slot it maps to
 * currently holds null, so callers never receive Some(null).
 */
def getMapStatus(mapId: Long): Option[MapStatus] = withReadLock {
  // Option(...) turns a null slot into None instead of wrapping it as Some(null).
  mapIdToMapIndex.get(mapId).flatMap(mapIndex => Option(mapStatuses(mapIndex)))
}

/**
* Update the map output location (e.g. during migration).
*/
def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock {
try {
val mapStatusOpt = mapStatuses.find(x => x != null && x.mapId == mapId)
val mapIndex = mapIdToMapIndex.get(mapId)
val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
mapStatusOpt match {
case Some(mapStatus) =>
logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
mapStatus.updateLocation(bmAddress)
invalidateSerializedMapOutputStatusCache()
case None =>
val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId == mapId)
if (index >= 0 && mapStatuses(index) == null) {
if (mapIndex.map(mapStatusesDeleted).exists(_.mapId == mapId)) {
val index = mapIndex.get
val mapStatus = mapStatusesDeleted(index)
mapStatus.updateLocation(bmAddress)
mapStatuses(index) = mapStatus
Expand Down Expand Up @@ -1137,9 +1153,7 @@ private[spark] class MapOutputTrackerMaster(
*/
def getMapOutputLocation(shuffleId: Int, mapId: Long): Option[BlockManagerId] = {
  shuffleStatuses.get(shuffleId).flatMap { shuffleStatus =>
    // Indexed lookup by mapId; the former linear scan over all map statuses is not needed.
    // A null status is dropped defensively before its location is dereferenced.
    shuffleStatus.getMapStatus(mapId).flatMap(Option(_)).map(_.location)
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ class OpenHashMap[K : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
}
}

/**
 * Look up the value stored for `k`.
 *
 * @return Some(value) if the key is present (including a stored null key), None otherwise
 */
def get(k: K): Option[V] = {
  if (k != null) {
    // Regular keys live in the key set; a negative position means the key is absent.
    val slot = _keySet.getPos(k)
    if (slot >= 0) Some(_values(slot)) else None
  } else if (haveNullValue) {
    // The null key is tracked separately from the key set.
    Some(nullValue)
  } else {
    None
  }
}

/** Set the value for a key */
def update(k: K, v: V): Unit = {
if (k == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,22 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers {
assert(map2("b") === 0.0)
assert(map2("c") === null)
}

test("get") {
  val strMap = new OpenHashMap[String, String]()

  // Present, absent and null keys; the null key is only found once it has been stored.
  strMap.update("1", "1")
  assert(strMap.get("1") === Some("1"))
  assert(strMap.get("2") === None)
  assert(strMap.get(null) === None)
  strMap.update(null, "hello")
  assert(strMap.get(null) === Some("hello"))

  // A stored null value is reported as Some(null), distinct from a missing key.
  strMap.update("1", null)
  assert(strMap.get("1") === Some(null))
  strMap.update(null, null)
  assert(strMap.get(null) === Some(null))
}
}

0 comments on commit 66a2eb8

Please sign in to comment.