diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt index 747aae09272f9..7a6cfb7b23b94 100644 --- a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt @@ -2,21 +2,21 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 609 631 22 0.3 3043.8 1.0X -Deserialization 840 897 67 0.2 4201.2 0.7X +Serialization 205 213 13 1.0 1023.6 1.0X +Deserialization 908 939 27 0.2 4540.2 0.2X -Compressed Serialized MapStatus sizes: 393 bytes -Compressed Serialized Broadcast MapStatus sizes: 3 MB +Compressed Serialized MapStatus sizes: 400 bytes +Compressed Serialized Broadcast MapStatus sizes: 2 MB OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 591 599 8 0.3 2955.3 1.0X -Deserialization 878 913 31 0.2 4392.2 0.7X +Serialization 195 204 24 1.0 976.9 1.0X +Deserialization 913 940 33 0.2 4566.7 0.2X -Compressed Serialized MapStatus sizes: 3 MB +Compressed Serialized MapStatus sizes: 2 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -24,21 +24,21 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 1776 1778 2 0.1 8880.5 1.0X -Deserialization 1086 1086 0 0.2 5427.9 1.6X +Serialization 616 619 3 0.3 3079.1 1.0X +Deserialization 936 954 22 0.2 4680.5 0.7X -Compressed Serialized MapStatus sizes: 411 bytes -Compressed Serialized Broadcast MapStatus sizes: 15 MB +Compressed Serialized MapStatus sizes: 418 bytes +Compressed Serialized Broadcast MapStatus sizes: 14 MB OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 1725 1726 1 0.1 8624.9 1.0X -Deserialization 1093 1094 2 0.2 5463.6 1.6X +Serialization 586 588 3 0.3 2928.8 1.0X +Deserialization 929 933 4 0.2 4647.0 0.6X -Compressed Serialized MapStatus sizes: 15 MB +Compressed Serialized MapStatus sizes: 14 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -46,21 +46,21 @@ OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 12421 12522 142 0.0 62104.4 1.0X -Deserialization 3020 3043 32 0.1 15102.0 4.1X +Serialization 4740 4916 249 0.0 23698.5 1.0X +Deserialization 1578 1597 27 0.1 7890.6 3.0X -Compressed Serialized MapStatus sizes: 544 bytes -Compressed Serialized Broadcast MapStatus sizes: 131 MB +Compressed Serialized MapStatus sizes: 546 bytes +Compressed Serialized Broadcast MapStatus sizes: 123 MB OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 11719 11737 26 0.0 58595.3 1.0X -Deserialization 3018 3051 46 0.1 15091.7 3.9X +Serialization 4492 4573 115 0.0 22458.3 1.0X +Deserialization 1533 1547 20 0.1 7664.8 2.9X -Compressed Serialized MapStatus sizes: 131 MB +Compressed Serialized MapStatus sizes: 123 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt index 1f479a49d5860..0c649694f6b6e 100644 --- a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt @@ -2,21 +2,21 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 625 639 9 0.3 3127.2 1.0X -Deserialization 875 931 49 0.2 4376.2 0.7X +Serialization 236 245 18 0.8 1179.1 1.0X +Deserialization 842 885 37 0.2 4211.4 0.3X -Compressed Serialized MapStatus sizes: 393 bytes -Compressed Serialized Broadcast MapStatus sizes: 3 MB +Compressed Serialized MapStatus sizes: 400 bytes +Compressed Serialized Broadcast MapStatus sizes: 2 MB OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 604 640 71 0.3 3018.4 1.0X -Deserialization 889 903 17 0.2 4443.8 0.7X +Serialization 213 219 8 0.9 1065.1 1.0X +Deserialization 846 870 33 0.2 4228.6 0.3X -Compressed Serialized MapStatus sizes: 3 MB +Compressed Serialized MapStatus sizes: 2 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -24,21 +24,21 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 1879 1880 2 0.1 9394.9 1.0X -Deserialization 1147 1150 5 0.2 5733.8 1.6X +Serialization 624 709 167 0.3 3121.1 1.0X +Deserialization 885 908 22 0.2 4427.0 0.7X -Compressed Serialized MapStatus sizes: 411 bytes -Compressed Serialized Broadcast MapStatus sizes: 15 MB +Compressed Serialized MapStatus sizes: 418 bytes +Compressed Serialized Broadcast MapStatus sizes: 14 MB OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 1825 1826 1 0.1 9123.3 1.0X -Deserialization 1147 1281 189 0.2 5735.7 1.6X +Serialization 603 604 2 0.3 3014.9 1.0X +Deserialization 892 895 5 0.2 4458.7 0.7X -Compressed Serialized MapStatus sizes: 15 MB +Compressed Serialized MapStatus sizes: 14 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes @@ -46,21 +46,21 @@ OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15. Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 12327 12518 270 0.0 61634.3 1.0X -Deserialization 3120 3133 18 0.1 15600.8 4.0X +Serialization 4612 4945 471 0.0 23061.0 1.0X +Deserialization 1493 1495 2 0.1 7466.3 3.1X -Compressed Serialized MapStatus sizes: 544 bytes -Compressed Serialized Broadcast MapStatus sizes: 131 MB +Compressed Serialized MapStatus sizes: 546 bytes +Compressed Serialized Broadcast MapStatus sizes: 123 MB OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Serialization 11928 11986 82 0.0 59642.2 1.0X -Deserialization 3137 3138 2 0.1 15683.3 3.8X +Serialization 4452 4595 202 0.0 22261.4 1.0X +Deserialization 1464 1477 18 0.1 7321.4 3.0X -Compressed Serialized MapStatus sizes: 131 MB +Compressed Serialized MapStatus sizes: 123 MB Compressed Serialized Broadcast MapStatus sizes: 0 bytes diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index c181fac8b4d8e..6f4a6239a09ed 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -17,10 +17,9 @@ package org.apache.spark -import java.io._ +import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream} import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} import java.util.concurrent.locks.ReentrantReadWriteLock -import java.util.zip.{GZIPInputStream, GZIPOutputStream} import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, ListBuffer, Map} @@ -29,6 +28,10 @@ import scala.concurrent.duration.Duration import scala.reflect.ClassTag import scala.util.control.NonFatal +import com.github.luben.zstd.ZstdInputStream +import com.github.luben.zstd.ZstdOutputStream +import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream} + import org.apache.spark.broadcast.{Broadcast, BroadcastManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -885,13 +888,18 @@ private[spark] object MapOutputTracker extends Logging { private val BROADCAST = 1 // Serialize an array of map output locations into an efficient byte format so that we can send - // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will + // it to reduce tasks. We do this by compressing the serialized bytes using Zstd. They will // generally be pretty compressible because many map outputs will be on the same hostname. def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager, isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = { - val out = new ByteArrayOutputStream - out.write(DIRECT) - val objOut = new ObjectOutputStream(new GZIPOutputStream(out)) + // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one + // This implementation doesn't reallocate the whole memory block but allocates + // additional buffers. This way no buffers need to be garbage collected and + // the contents don't have to be copied to the new buffer. + val out = new ApacheByteArrayOutputStream() + val compressedOut = new ApacheByteArrayOutputStream() + + val objOut = new ObjectOutputStream(out) Utils.tryWithSafeFinally { // Since statuses can be modified in parallel, sync on it statuses.synchronized { @@ -900,18 +908,42 @@ private[spark] object MapOutputTracker extends Logging { } { objOut.close() } - val arr = out.toByteArray + + val arr: Array[Byte] = { + val zos = new ZstdOutputStream(compressedOut) + Utils.tryWithSafeFinally { + compressedOut.write(DIRECT) + // `out.writeTo(zos)` will write the uncompressed data from `out` to `zos` + // without copying to avoid unnecessary allocation and copy of byte[]. + out.writeTo(zos) + } { + zos.close() + } + compressedOut.toByteArray + } if (arr.length >= minBroadcastSize) { // Use broadcast instead. // Important arr(0) is the tag == DIRECT, ignore that while deserializing ! val bcast = broadcastManager.newBroadcast(arr, isLocal) // toByteArray creates copy, so we can reuse out out.reset() - out.write(BROADCAST) - val oos = new ObjectOutputStream(new GZIPOutputStream(out)) - oos.writeObject(bcast) - oos.close() - val outArr = out.toByteArray + val oos = new ObjectOutputStream(out) + Utils.tryWithSafeFinally { + oos.writeObject(bcast) + } { + oos.close() + } + val outArr = { + compressedOut.reset() + val zos = new ZstdOutputStream(compressedOut) + Utils.tryWithSafeFinally { + compressedOut.write(BROADCAST) + out.writeTo(zos) + } { + zos.close() + } + compressedOut.toByteArray + } logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length) (outArr, bcast) } else { @@ -924,7 +956,7 @@ private[spark] object MapOutputTracker extends Logging { assert (bytes.length > 0) def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = { - val objIn = new ObjectInputStream(new GZIPInputStream( + val objIn = new ObjectInputStream(new ZstdInputStream( new ByteArrayInputStream(arr, off, len))) Utils.tryWithSafeFinally { objIn.readObject()