Skip to content

Commit

Permalink
[SPARK-29434][CORE] Improve the MapStatuses Serialization Performance
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Instead of using GZIP for compressing the serialized `MapStatuses`, ZStd provides better compression rate and faster compression time.

The original approach is serializing and writing data directly into `GZIPOutputStream` as one step; however, the compression time is faster if a bigger chuck of the data is processed by the codec at once. As a result, in this PR, the serialized data is written into an uncompressed byte array first, and then the data is compressed. For smaller `MapStatues`, we find it's 2x faster.

Here is the benchmark result.

#### 20k map outputs, and each has 500 blocks
1. ZStd two steps in this PR: 0.402 ops/ms, 89,066 bytes
2. ZStd one step as the original approach: 0.370 ops/ms, 89,069 bytes
3. GZip: 0.092 ops/ms, 217,345 bytes

#### 20k map outputs, and each has 5 blocks
1. ZStd two steps in this PR: 0.9 ops/ms, 75,449 bytes
2. ZStd one step as the original approach: 0.38 ops/ms, 75,452 bytes
3. GZip: 0.21 ops/ms, 160,094 bytes

### Why are the changes needed?
Decrease the time for serializing the `MapStatuses` in large scale job.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes apache#26085 from dbtsai/mapStatus.

Lead-authored-by: DB Tsai <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
dbtsai and dongjoon-hyun committed Oct 20, 2019
1 parent 0f65b49 commit f4d5aa4
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 55 deletions.
42 changes: 21 additions & 21 deletions core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,65 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 609 631 22 0.3 3043.8 1.0X
Deserialization 840 897 67 0.2 4201.2 0.7X
Serialization 205 213 13 1.0 1023.6 1.0X
Deserialization 908 939 27 0.2 4540.2 0.2X

Compressed Serialized MapStatus sizes: 393 bytes
Compressed Serialized Broadcast MapStatus sizes: 3 MB
Compressed Serialized MapStatus sizes: 400 bytes
Compressed Serialized Broadcast MapStatus sizes: 2 MB


OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 591 599 8 0.3 2955.3 1.0X
Deserialization 878 913 31 0.2 4392.2 0.7X
Serialization 195 204 24 1.0 976.9 1.0X
Deserialization 913 940 33 0.2 4566.7 0.2X

Compressed Serialized MapStatus sizes: 3 MB
Compressed Serialized MapStatus sizes: 2 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes


OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 1776 1778 2 0.1 8880.5 1.0X
Deserialization 1086 1086 0 0.2 5427.9 1.6X
Serialization 616 619 3 0.3 3079.1 1.0X
Deserialization 936 954 22 0.2 4680.5 0.7X

Compressed Serialized MapStatus sizes: 411 bytes
Compressed Serialized Broadcast MapStatus sizes: 15 MB
Compressed Serialized MapStatus sizes: 418 bytes
Compressed Serialized Broadcast MapStatus sizes: 14 MB


OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 1725 1726 1 0.1 8624.9 1.0X
Deserialization 1093 1094 2 0.2 5463.6 1.6X
Serialization 586 588 3 0.3 2928.8 1.0X
Deserialization 929 933 4 0.2 4647.0 0.6X

Compressed Serialized MapStatus sizes: 15 MB
Compressed Serialized MapStatus sizes: 14 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes


OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 12421 12522 142 0.0 62104.4 1.0X
Deserialization 3020 3043 32 0.1 15102.0 4.1X
Serialization 4740 4916 249 0.0 23698.5 1.0X
Deserialization 1578 1597 27 0.1 7890.6 3.0X

Compressed Serialized MapStatus sizes: 544 bytes
Compressed Serialized Broadcast MapStatus sizes: 131 MB
Compressed Serialized MapStatus sizes: 546 bytes
Compressed Serialized Broadcast MapStatus sizes: 123 MB


OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 11719 11737 26 0.0 58595.3 1.0X
Deserialization 3018 3051 46 0.1 15091.7 3.9X
Serialization 4492 4573 115 0.0 22458.3 1.0X
Deserialization 1533 1547 20 0.1 7664.8 2.9X

Compressed Serialized MapStatus sizes: 131 MB
Compressed Serialized MapStatus sizes: 123 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes


42 changes: 21 additions & 21 deletions core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,65 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 625 639 9 0.3 3127.2 1.0X
Deserialization 875 931 49 0.2 4376.2 0.7X
Serialization 236 245 18 0.8 1179.1 1.0X
Deserialization 842 885 37 0.2 4211.4 0.3X

Compressed Serialized MapStatus sizes: 393 bytes
Compressed Serialized Broadcast MapStatus sizes: 3 MB
Compressed Serialized MapStatus sizes: 400 bytes
Compressed Serialized Broadcast MapStatus sizes: 2 MB


OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 604 640 71 0.3 3018.4 1.0X
Deserialization 889 903 17 0.2 4443.8 0.7X
Serialization 213 219 8 0.9 1065.1 1.0X
Deserialization 846 870 33 0.2 4228.6 0.3X

Compressed Serialized MapStatus sizes: 3 MB
Compressed Serialized MapStatus sizes: 2 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes


OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 1879 1880 2 0.1 9394.9 1.0X
Deserialization 1147 1150 5 0.2 5733.8 1.6X
Serialization 624 709 167 0.3 3121.1 1.0X
Deserialization 885 908 22 0.2 4427.0 0.7X

Compressed Serialized MapStatus sizes: 411 bytes
Compressed Serialized Broadcast MapStatus sizes: 15 MB
Compressed Serialized MapStatus sizes: 418 bytes
Compressed Serialized Broadcast MapStatus sizes: 14 MB


OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 1825 1826 1 0.1 9123.3 1.0X
Deserialization 1147 1281 189 0.2 5735.7 1.6X
Serialization 603 604 2 0.3 3014.9 1.0X
Deserialization 892 895 5 0.2 4458.7 0.7X

Compressed Serialized MapStatus sizes: 15 MB
Compressed Serialized MapStatus sizes: 14 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes


OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 12327 12518 270 0.0 61634.3 1.0X
Deserialization 3120 3133 18 0.1 15600.8 4.0X
Serialization 4612 4945 471 0.0 23061.0 1.0X
Deserialization 1493 1495 2 0.1 7466.3 3.1X

Compressed Serialized MapStatus sizes: 544 bytes
Compressed Serialized Broadcast MapStatus sizes: 131 MB
Compressed Serialized MapStatus sizes: 546 bytes
Compressed Serialized Broadcast MapStatus sizes: 123 MB


OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Serialization 11928 11986 82 0.0 59642.2 1.0X
Deserialization 3137 3138 2 0.1 15683.3 3.8X
Serialization 4452 4595 202 0.0 22261.4 1.0X
Deserialization 1464 1477 18 0.1 7321.4 3.0X

Compressed Serialized MapStatus sizes: 131 MB
Compressed Serialized MapStatus sizes: 123 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes


58 changes: 45 additions & 13 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package org.apache.spark

import java.io._
import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer, Map}
Expand All @@ -29,6 +28,10 @@ import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import com.github.luben.zstd.ZstdInputStream
import com.github.luben.zstd.ZstdOutputStream
import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream}

import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
Expand Down Expand Up @@ -885,13 +888,18 @@ private[spark] object MapOutputTracker extends Logging {
private val BROADCAST = 1

// Serialize an array of map output locations into an efficient byte format so that we can send
// it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
// it to reduce tasks. We do this by compressing the serialized bytes using Zstd. They will
// generally be pretty compressible because many map outputs will be on the same hostname.
def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager,
isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = {
val out = new ByteArrayOutputStream
out.write(DIRECT)
val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
// Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one
// This implementation doesn't reallocate the whole memory block but allocates
// additional buffers. This way no buffers need to be garbage collected and
// the contents don't have to be copied to the new buffer.
val out = new ApacheByteArrayOutputStream()
val compressedOut = new ApacheByteArrayOutputStream()

val objOut = new ObjectOutputStream(out)
Utils.tryWithSafeFinally {
// Since statuses can be modified in parallel, sync on it
statuses.synchronized {
Expand All @@ -900,18 +908,42 @@ private[spark] object MapOutputTracker extends Logging {
} {
objOut.close()
}
val arr = out.toByteArray

val arr: Array[Byte] = {
val zos = new ZstdOutputStream(compressedOut)
Utils.tryWithSafeFinally {
compressedOut.write(DIRECT)
// `out.writeTo(zos)` will write the uncompressed data from `out` to `zos`
// without copying to avoid unnecessary allocation and copy of byte[].
out.writeTo(zos)
} {
zos.close()
}
compressedOut.toByteArray
}
if (arr.length >= minBroadcastSize) {
// Use broadcast instead.
// Important arr(0) is the tag == DIRECT, ignore that while deserializing !
val bcast = broadcastManager.newBroadcast(arr, isLocal)
// toByteArray creates copy, so we can reuse out
out.reset()
out.write(BROADCAST)
val oos = new ObjectOutputStream(new GZIPOutputStream(out))
oos.writeObject(bcast)
oos.close()
val outArr = out.toByteArray
val oos = new ObjectOutputStream(out)
Utils.tryWithSafeFinally {
oos.writeObject(bcast)
} {
oos.close()
}
val outArr = {
compressedOut.reset()
val zos = new ZstdOutputStream(compressedOut)
Utils.tryWithSafeFinally {
compressedOut.write(BROADCAST)
out.writeTo(zos)
} {
zos.close()
}
compressedOut.toByteArray
}
logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length)
(outArr, bcast)
} else {
Expand All @@ -924,7 +956,7 @@ private[spark] object MapOutputTracker extends Logging {
assert (bytes.length > 0)

def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
val objIn = new ObjectInputStream(new GZIPInputStream(
val objIn = new ObjectInputStream(new ZstdInputStream(
new ByteArrayInputStream(arr, off, len)))
Utils.tryWithSafeFinally {
objIn.readObject()
Expand Down

0 comments on commit f4d5aa4

Please sign in to comment.