Skip to content

Commit

Permalink
[SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by d…
Browse files Browse the repository at this point in the history
…efault

Using batching on the driver for the WriteAheadLog should be an improvement for all environments and use cases. Users will be able to scale to a much higher number of receivers with the BatchedWriteAheadLog. Therefore we should turn it on by default, and QA it in the QA period.

I've also added some tests to make sure the default configurations are correct regarding recent additions:
 - batching on by default
 - closeFileAfterWrite off by default
 - parallelRecovery off by default

Author: Burak Yavuz <[email protected]>

Closes #9695 from brkyvz/enable-batch-wal.
  • Loading branch information
brkyvz authored and tdas committed Nov 16, 2015
1 parent b0c3fd3 commit de5e531
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging {
}

def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = {
isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = false)
isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void close() {
public void testCustomWAL() {
SparkConf conf = new SparkConf();
conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName());
conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false");
WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null);

String data1 = "data1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite
: Seq[ReceivedBlockTrackerLogEvent] = {
logFiles.flatMap {
file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq
}.map { byteBuffer =>
Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array)
}.flatMap { byteBuffer =>
val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) {
Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap)
} else {
Array(byteBuffer)
}
validBuffer.map(b => Utils.deserialize[ReceivedBlockTrackerLogEvent](b.array()))
}.toList
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.io._
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor}
import java.util.concurrent.{RejectedExecutionException, TimeUnit, CountDownLatch, ThreadPoolExecutor}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -190,6 +190,28 @@ abstract class CommonWriteAheadLogTests(
}
assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
}

// Checks whether the iterator returned by readAll() can still be materialized after wal.close():
// a RejectedExecutionException is expected when closeFileAfterWrite is set, and a full,
// in-order read of the written data is expected otherwise.
test(testPrefix + "parallel recovery not enabled if closeFileAfterWrite = false") {
// write some data
val writtenData = (1 to 10).map { i =>
val data = generateRandomData()
// one log file per iteration, named log-<i>-<i> under testDir
val file = testDir + s"/log-$i-$i"
writeDataManually(data, file, allowBatching)
data
}.flatten

val wal = createWriteAheadLog(testDir, closeFileAfterWrite, allowBatching)
// create iterator but don't materialize it
val readData = wal.readAll().asScala.map(byteBufferToString)
// close before consuming, so anything deferred until materialization runs against a closed log
wal.close()
if (closeFileAfterWrite) {
// the threadpool is shutdown by the wal.close call above, therefore we shouldn't be able
// to materialize the iterator with parallel recovery
intercept[RejectedExecutionException](readData.toArray)
} else {
// NOTE(review): presumably this path reads without the thread pool, which is why
// materializing after close still works - confirm against FileBasedWriteAheadLog.readAll
assert(readData.toSeq === writtenData)
}
}
}

class FileBasedWriteAheadLogSuite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite {
test("log selection and creation") {

val emptyConf = new SparkConf() // no log configuration
assertDriverLogClass[FileBasedWriteAheadLog](emptyConf)
assertDriverLogClass[FileBasedWriteAheadLog](emptyConf, isBatched = true)
assertReceiverLogClass[FileBasedWriteAheadLog](emptyConf)

// Verify setting driver WAL class
val driverWALConf = new SparkConf().set("spark.streaming.driver.writeAheadLog.class",
classOf[MockWriteAheadLog0].getName())
assertDriverLogClass[MockWriteAheadLog0](driverWALConf)
assertDriverLogClass[MockWriteAheadLog0](driverWALConf, isBatched = true)
assertReceiverLogClass[FileBasedWriteAheadLog](driverWALConf)

// Verify setting receiver WAL class
val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
classOf[MockWriteAheadLog0].getName())
assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf)
assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true)
assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)

// Verify setting receiver WAL class with 1-arg constructor
Expand Down Expand Up @@ -104,6 +104,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite {
assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true)
assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
}

// Pins the default of the driver-side batching flag: with nothing set by this test,
// isBatchingEnabled must be true for the driver and false for receivers.
test("batching is enabled by default in WriteAheadLog") {
// no WAL keys are set explicitly here, so the assertions exercise the built-in default
val conf = new SparkConf()
assert(WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true))
// batching is not valid for receiver WALs
assert(!WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = false))
}

// Pins the default of closeFileAfterWrite: with nothing set by this test,
// shouldCloseFileAfterWrite must be false for both the driver and the receiver.
test("closeFileAfterWrite is disabled by default in WriteAheadLog") {
// no WAL keys are set explicitly here, so the assertions exercise the built-in default
val conf = new SparkConf()
assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = true))
assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = false))
}
}

object WriteAheadLogUtilsSuite {
Expand Down

0 comments on commit de5e531

Please sign in to comment.