[SPARK-26660] Add warning logs when broadcasting large task binary
## What changes were proposed in this pull request?

Currently, some ML libraries may generate large models that end up referenced in the task closure. The driver then broadcasts a large task binary, and executors may fail to deserialize it and hit OOM failures (for instance, when executor memory is insufficient). This problem is not limited to applications using ML libraries: any user-specified closure or function that references large data can run into it as well.

To make it easier to debug memory problems caused by broadcasting a large taskBinary, we can add warning logs for it.

This PR adds a warning log on the driver side when broadcasting a large task binary; it also includes minor improvements to the logs emitted when reading a broadcast variable.
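
For illustration only (not part of this patch): a minimal sketch, assuming a local SparkContext and a stand-in `largeModel` array, of how a large driver-side object captured in a task closure inflates the serialized task binary that the driver broadcasts, which is exactly the case the new warning is meant to surface.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LargeClosureExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("large-closure").setMaster("local[2]"))

    // Stand-in for a large ML model or lookup table living on the driver (~80 MB).
    val largeModel: Array[Double] = Array.fill(10 * 1024 * 1024)(1.0)

    // The map closure references largeModel, so it is serialized into the task
    // binary that DAGScheduler broadcasts -- far above the 100 KiB warning threshold.
    val scored = sc.parallelize(1 to 1000).map(i => largeModel(i % largeModel.length) * i)
    scored.count()

    sc.stop()
  }
}
```

In practice the usual fix is to broadcast such data explicitly (e.g. `sc.broadcast(largeModel)`) and reference the broadcast variable inside the closure; the new warning makes it easier to notice when that has not been done.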

## How was this patch tested?
N/A. These are log-only changes.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes apache#23580 from liupc/Add-warning-logs-for-large-taskBinary-size.

Authored-by: Liupengcheng <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
Liupengcheng authored and srowen committed Jan 23, 2019
1 parent d008e23 commit 0446363
Showing 2 changed files with 7 additions and 1 deletion.
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -236,7 +236,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
             throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
           }
         case None =>
-          logInfo("Started reading broadcast variable " + id)
+          val estimatedTotalSize = Utils.bytesToString(numBlocks * blockSize)
+          logInfo(s"Started reading broadcast variable $id with $numBlocks pieces " +
+            s"(estimated total size $estimatedTotalSize)")
           val startTimeMs = System.currentTimeMillis()
           val blocks = readBlocks()
           logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1162,6 +1162,10 @@ private[spark] class DAGScheduler(
           partitions = stage.rdd.partitions
         }

+        if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
+          logWarning(s"Broadcasting large task binary with size " +
+            s"${Utils.bytesToString(taskBinaryBytes.length)}")
+        }
         taskBinary = sc.broadcast(taskBinaryBytes)
       } catch {
         // In the case of a failure during serialization, abort the stage.
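
As a units sanity check, here is a standalone, illustrative restatement of the driver-side condition above, assuming `TaskSetManager.TASK_SIZE_TO_WARN_KB` is 100 (the existing 100 KiB warn threshold); it is not part of the patch.

```scala
// Illustrative only: the warning threshold is expressed in KiB, while
// taskBinaryBytes.length is in bytes, hence the * 1024 in the comparison.
val TASK_SIZE_TO_WARN_KB = 100  // assumed value of TaskSetManager.TASK_SIZE_TO_WARN_KB

def shouldWarnAboutTaskBinary(taskBinaryLengthInBytes: Long): Boolean =
  taskBinaryLengthInBytes > TASK_SIZE_TO_WARN_KB * 1024L

assert(!shouldWarnAboutTaskBinary(50L * 1024))       // 50 KiB: below threshold, no warning
assert(shouldWarnAboutTaskBinary(2L * 1024 * 1024))  // 2 MiB: warning would be logged
```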
