From de5e531d337075fd849437e88846873bca8685e6 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 16 Nov 2015 11:21:17 -0800 Subject: [PATCH] [SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default Using batching on the driver for the WriteAheadLog should be an improvement for all environments and use cases. Users will be able to scale to much higher number of receivers with the BatchedWriteAheadLog. Therefore we should turn it on by default, and QA it in the QA period. I've also added some tests to make sure the default configurations are correct regarding recent additions: - batching on by default - closeFileAfterWrite off by default - parallelRecovery off by default Author: Burak Yavuz Closes #9695 from brkyvz/enable-batch-wal. --- .../streaming/util/WriteAheadLogUtils.scala | 2 +- .../streaming/JavaWriteAheadLogSuite.java | 1 + .../streaming/ReceivedBlockTrackerSuite.scala | 9 +++++-- .../streaming/util/WriteAheadLogSuite.scala | 24 ++++++++++++++++++- .../util/WriteAheadLogUtilsSuite.scala | 19 ++++++++++++--- 5 files changed, 48 insertions(+), 7 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala index 731a369fc92c0..7f9e2c9734970 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala @@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging { } def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = { - isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = false) + isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true) } /** diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java index 175b8a496b4e5..09b5f8ed03279 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java @@ -108,6 +108,7 @@ public void close() { public void testCustomWAL() { SparkConf conf = new SparkConf(); conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName()); + conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false"); WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null); String data1 = "data1"; diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 7db17abb7947c..081f5a1c93e6e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite : Seq[ReceivedBlockTrackerLogEvent] = { logFiles.flatMap { file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq - }.map { byteBuffer => - Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) + }.flatMap { byteBuffer => + val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) { + Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap) + } else { + Array(byteBuffer) + } + validBuffer.map(b => Utils.deserialize[ReceivedBlockTrackerLogEvent](b.array())) }.toList } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 4273fd7dda8be..7f80d6ecdbbb5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -20,7 +20,7 @@ import java.io._ import java.nio.ByteBuffer import java.util.{Iterator => JIterator} import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor} +import java.util.concurrent.{RejectedExecutionException, TimeUnit, CountDownLatch, ThreadPoolExecutor} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -190,6 +190,28 @@ abstract class CommonWriteAheadLogTests( } assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment") } + + test(testPrefix + "parallel recovery not enabled if closeFileAfterWrite = false") { + // write some data + val writtenData = (1 to 10).map { i => + val data = generateRandomData() + val file = testDir + s"/log-$i-$i" + writeDataManually(data, file, allowBatching) + data + }.flatten + + val wal = createWriteAheadLog(testDir, closeFileAfterWrite, allowBatching) + // create iterator but don't materialize it + val readData = wal.readAll().asScala.map(byteBufferToString) + wal.close() + if (closeFileAfterWrite) { + // the threadpool is shutdown by the wal.close call above, therefore we shouldn't be able + // to materialize the iterator with parallel recovery + intercept[RejectedExecutionException](readData.toArray) + } else { + assert(readData.toSeq === writtenData) + } + } } class FileBasedWriteAheadLogSuite diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala index 9152728191ea1..bfc5b0cf60fb1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala @@ -56,19 +56,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite { test("log selection and creation") { val emptyConf = new SparkConf() // no log configuration - assertDriverLogClass[FileBasedWriteAheadLog](emptyConf) + assertDriverLogClass[FileBasedWriteAheadLog](emptyConf, isBatched = true) assertReceiverLogClass[FileBasedWriteAheadLog](emptyConf) // Verify setting driver WAL class val driverWALConf = new SparkConf().set("spark.streaming.driver.writeAheadLog.class", classOf[MockWriteAheadLog0].getName()) - assertDriverLogClass[MockWriteAheadLog0](driverWALConf) + assertDriverLogClass[MockWriteAheadLog0](driverWALConf, isBatched = true) assertReceiverLogClass[FileBasedWriteAheadLog](driverWALConf) // Verify setting receiver WAL class val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class", classOf[MockWriteAheadLog0].getName()) - assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf) + assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true) assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf) // Verify setting receiver WAL class with 1-arg constructor @@ -104,6 +104,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite { assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true) assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf) } + + test("batching is enabled by default in WriteAheadLog") { + val conf = new SparkConf() + assert(WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) + // batching is not valid for receiver WALs + assert(!WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = false)) + } + + test("closeFileAfterWrite is disabled by default in WriteAheadLog") { + val conf = new SparkConf() + assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = true)) + assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = false)) + } } object WriteAheadLogUtilsSuite {