Skip to content

Commit

Permalink
[SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for …
Browse files Browse the repository at this point in the history
…streaming aggregation and deduplication.

## What changes were proposed in this pull request?

This PR enables the MicroBatchExecution to run no-data batches if some SparkPlan requires running another batch to output results based on updated watermark / processing time. In this PR, I have enabled streaming aggregations and streaming deduplicates to automatically run addition batch even if new data is available. See https://issues.apache.org/jira/browse/SPARK-24156 for more context.

Major changes/refactoring done in this PR.
- Refactoring MicroBatchExecution - A major point of confusion in MicroBatchExecution control flow was always (at least to me) was that `populateStartOffsets` internally called `constructNextBatch` which was not obvious from just the name "populateStartOffsets" and made the control flow from the main trigger execution loop very confusing (main loop in `runActivatedStream` called `constructNextBatch` but only if `populateStartOffsets` hadn't already called it). Instead, the refactoring makes it cleaner.
    - `populateStartOffsets` only the updates `availableOffsets` and `committedOffsets`. Does not call `constructNextBatch`.
    - Main loop in `runActivatedStream` calls `constructNextBatch` which returns true or false reflecting whether the next batch is ready for executing. This method is now idempotent; if a batch has already been constructed, then it will always return true until the batch has been executed.
    - If next batch is ready then we call `runBatch` or sleep.
    - That's it.

- Refactoring watermark management logic - This has been refactored out from `MicroBatchExecution` in a separate class to simplify `MicroBatchExecution`.

- New method `shouldRunAnotherBatch` in `IncrementalExecution` - This returns true if there is any stateful operation in the last execution plan that requires another batch for state cleanup, etc. This is used to decide whether to construct a batch or not in `constructNextBatch`.

- Changes to stream testing framework - Many tests used CheckLastBatch to validate answers. This assumed that there will be no more batches after the last set of input has been processed, so the last batch is the one that has output corresponding to the last input. This is not true anymore. To account for that, I made two changes.
    - `CheckNewAnswer` is a new test action that verifies the new rows generated since the last time the answer was checked by `CheckAnswer`, `CheckNewAnswer` or `CheckLastBatch`. This is agnostic to how many batches occurred between the last check and now. To do make this easier, I added a common trait between MemorySink and MemorySinkV2 to abstract out some common methods.
    - `assertNumStateRows` has been updated in the same way to be agnostic to batches while checking what the total rows and how many state rows were updated (sums up updates since the last check).

## How was this patch tested?
- Changes made to existing tests - Tests have been changed in one of the following patterns.
    - Tests where the last input was given again to force another batch to be executed and state cleaned up / output generated, they were simplified by removing the extra input.
    - Tests using aggregation+watermark where CheckLastBatch were replaced with CheckNewAnswer to make them batch agnostic.
- New tests added to check whether the flag works for streaming aggregation and deduplication

Author: Tathagata Das <[email protected]>

Closes apache#21220 from tdas/SPARK-24157.
  • Loading branch information
tdas committed May 4, 2018
1 parent af4dc50 commit 47b5b68
Show file tree
Hide file tree
Showing 15 changed files with 450 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,14 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(10000L)

val STREAMING_NO_DATA_MICRO_BATCHES_ENABLED =
buildConf("spark.sql.streaming.noDataMicroBatchesEnabled")
.doc(
"Whether streaming micro-batch engine will execute batches without data " +
"for eager state management for stateful streaming queries.")
.booleanConf
.createWithDefault(true)

val STREAMING_METRICS_ENABLED =
buildConf("spark.sql.streaming.metricsEnabled")
.doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
Expand Down Expand Up @@ -1313,6 +1321,9 @@ class SQLConf extends Serializable with Logging {
def streamingNoDataProgressEventInterval: Long =
getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL)

def streamingNoDataMicroBatchesEnabled: Boolean =
getConf(STREAMING_NO_DATA_MICRO_BATCHES_ENABLED)

def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)

def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,14 @@ class IncrementalExecution(

/** No need assert supported, as this check has already been done */
override def assertSupported(): Unit = { }

/**
* Should the MicroBatchExecution run another batch based on this execution and the current
* updated metadata.
*/
def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
executedPlan.collect {
case p: StateStoreWriter => p.shouldRunAnotherBatch(newMetadata)
}.exists(_ == true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class MicroBatchExecution(
case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}

private val watermarkTracker = new WatermarkTracker()

override lazy val logicalPlan: LogicalPlan = {
assert(queryExecutionThread eq Thread.currentThread,
"logicalPlan must be initialized in QueryExecutionThread " +
Expand Down Expand Up @@ -128,40 +130,55 @@ class MicroBatchExecution(
* Repeatedly attempts to run batches as data arrives.
*/
protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
triggerExecutor.execute(() => {
startTrigger()

val noDataBatchesEnabled =
sparkSessionForStream.sessionState.conf.streamingNoDataMicroBatchesEnabled

triggerExecutor.execute(() => {
if (isActive) {
var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
var currentBatchHasNewData = false // Whether the current batch had new data

startTrigger()

reportTimeTaken("triggerExecution") {
// We'll do this initialization only once every start / restart
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets(sparkSessionForStream)
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
constructNextBatch()
logInfo(s"Stream started from $committedOffsets")
}
if (dataAvailable) {
currentStatus = currentStatus.copy(isDataAvailable = true)
updateStatusMessage("Processing new data")

// Set this before calling constructNextBatch() so any Spark jobs executed by sources
// while getting new data have the correct description
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)

// Try to construct the next batch. This will return true only if the next batch is
// ready and runnable. Note that the current batch may be runnable even without
// new data to process as `constructNextBatch` may decide to run a batch for
// state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
// is available or not.
currentBatchIsRunnable = constructNextBatch(noDataBatchesEnabled)

// Remember whether the current batch has data or not. This will be required later
// for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
// to false as the batch would have already processed the available data.
currentBatchHasNewData = isNewDataAvailable

currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
if (currentBatchIsRunnable) {
if (currentBatchHasNewData) updateStatusMessage("Processing new data")
else updateStatusMessage("No new data but cleaning up state")
runBatch(sparkSessionForStream)
} else {
updateStatusMessage("Waiting for data to arrive")
}
}
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
// Update committed offsets.
commitLog.add(currentBatchId)
committedOffsets ++= availableOffsets
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
Thread.sleep(pollingDelayMs)
}

finishTrigger(currentBatchHasNewData) // Must be outside reportTimeTaken so it is recorded

// If the current batch has been executed, then increment the batch id, else there was
// no data to execute the batch
if (currentBatchIsRunnable) currentBatchId += 1 else Thread.sleep(pollingDelayMs)
}
updateStatusMessage("Waiting for next trigger")
isActive
Expand Down Expand Up @@ -211,6 +228,7 @@ class MicroBatchExecution(
OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.conf)
offsetSeqMetadata = OffsetSeqMetadata(
metadata.batchWatermarkMs, metadata.batchTimestampMs, sparkSessionToRunBatches.conf)
watermarkTracker.setWatermark(metadata.batchWatermarkMs)
}

/* identify the current batch id: if commit log indicates we successfully processed the
Expand All @@ -235,27 +253,25 @@ class MicroBatchExecution(
currentBatchId = latestCommittedBatchId + 1
committedOffsets ++= availableOffsets
// Construct a new batch be recomputing availableOffsets
constructNextBatch()
} else if (latestCommittedBatchId < latestBatchId - 1) {
logWarning(s"Batch completion log latest batch id is " +
s"${latestCommittedBatchId}, which is not trailing " +
s"batchid $latestBatchId by one")
}
case None => logInfo("no commit log present")
}
logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
logInfo(s"Resuming at batch $currentBatchId with committed offsets " +
s"$committedOffsets and available offsets $availableOffsets")
case None => // We are starting this stream for the first time.
logInfo(s"Starting new streaming query.")
currentBatchId = 0
constructNextBatch()
}
}

/**
* Returns true if there is any new data available to be processed.
*/
private def dataAvailable: Boolean = {
private def isNewDataAvailable: Boolean = {
availableOffsets.exists {
case (source, available) =>
committedOffsets
Expand All @@ -266,93 +282,63 @@ class MicroBatchExecution(
}

/**
* Queries all of the sources to see if any new data is available. When there is new data the
* batchId counter is incremented and a new log entry is written with the newest offsets.
* Attempts to construct a batch according to:
* - Availability of new data
* - Need for timeouts and state cleanups in stateful operators
*
* Returns true only if the next batch should be executed.
*
* Here is the high-level logic on how this constructs the next batch.
* - Check each source whether new data is available
* - Updated the query's metadata and check using the last execution whether there is any need
* to run another batch (for state clean up, etc.)
* - If either of the above is true, then construct the next batch by committing to the offset
* log that range of offsets that the next batch will process.
*/
private def constructNextBatch(): Unit = {
// Check to see what new data is available.
val hasNewData = {
awaitProgressLock.lock()
try {
// Generate a map from each unique source to the next available offset.
val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
case s: Source =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("getOffset") {
(s, s.getOffset)
}
case s: MicroBatchReader =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("setOffsetRange") {
// Once v1 streaming source execution is gone, we can refactor this away.
// For now, we set the range here to get the source to infer the available end offset,
// get that offset, and then set the range again when we later execute.
s.setOffsetRange(
toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
Optional.empty())
}

val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
(s, Option(currentOffset))
}.toMap
availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)

if (dataAvailable) {
true
} else {
noNewData = true
false
private def constructNextBatch(noDataBatchesEnables: Boolean): Boolean = withProgressLocked {
// If new data is already available that means this method has already been called before
// and it must have already committed the offset range of next batch to the offset log.
// Hence do nothing, just return true.
if (isNewDataAvailable) return true

// Generate a map from each unique source to the next available offset.
val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
case s: Source =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("getOffset") {
(s, s.getOffset)
}
} finally {
awaitProgressLock.unlock()
}
}
if (hasNewData) {
var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
// Update the eventTime watermarks if we find any in the plan.
if (lastExecution != null) {
lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec => e
}.zipWithIndex.foreach {
case (e, index) if e.eventTimeStats.value.count > 0 =>
logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
val prevWatermarkMs = watermarkMsMap.get(index)
if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
watermarkMsMap.put(index, newWatermarkMs)
}

// Populate 0 if we haven't seen any data yet for this watermark node.
case (_, index) =>
if (!watermarkMsMap.isDefinedAt(index)) {
watermarkMsMap.put(index, 0)
}
case s: MicroBatchReader =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("setOffsetRange") {
// Once v1 streaming source execution is gone, we can refactor this away.
// For now, we set the range here to get the source to infer the available end offset,
// get that offset, and then set the range again when we later execute.
s.setOffsetRange(
toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
Optional.empty())
}

// Update the global watermark to the minimum of all watermark nodes.
// This is the safest option, because only the global watermark is fault-tolerant. Making
// it the minimum of all individual watermarks guarantees it will never advance past where
// any individual watermark operator would be if it were in a plan by itself.
if(!watermarkMsMap.isEmpty) {
val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
if (newWatermarkMs > batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
batchWatermarkMs = newWatermarkMs
} else {
logDebug(
s"Event time didn't move: $newWatermarkMs < " +
s"$batchWatermarkMs")
}
}
}
offsetSeqMetadata = offsetSeqMetadata.copy(
batchWatermarkMs = batchWatermarkMs,
batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds
val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
(s, Option(currentOffset))
}.toMap
availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)

// Update the query metadata
offsetSeqMetadata = offsetSeqMetadata.copy(
batchWatermarkMs = watermarkTracker.currentWatermark,
batchTimestampMs = triggerClock.getTimeMillis())

// Check whether next batch should be constructed
val lastExecutionRequiresAnotherBatch = noDataBatchesEnables &&
Option(lastExecution).exists(_.shouldRunAnotherBatch(offsetSeqMetadata))
val shouldConstructNextBatch = isNewDataAvailable || lastExecutionRequiresAnotherBatch

if (shouldConstructNextBatch) {
// Commit the next batch offset range to the offset log
updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
assert(offsetLog.add(
currentBatchId,
assert(offsetLog.add(currentBatchId,
availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId. " +
Expand All @@ -373,7 +359,7 @@ class MicroBatchExecution(
reader.commit(reader.deserializeOffset(off.json))
}
} else {
throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
throw new IllegalStateException(s"batch ${currentBatchId - 1} doesn't exist")
}
}

Expand All @@ -384,22 +370,21 @@ class MicroBatchExecution(
commitLog.purge(currentBatchId - minLogEntriesToMaintain)
}
}
noNewData = false
} else {
awaitProgressLock.lock()
try {
// Wake up any threads that are waiting for the stream to progress.
awaitProgressLockCondition.signalAll()
} finally {
awaitProgressLock.unlock()
}
noNewData = true
awaitProgressLockCondition.signalAll()
}
shouldConstructNextBatch
}

/**
* Processes any data available between `availableOffsets` and `committedOffsets`.
* @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
*/
private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
logDebug(s"Running batch $currentBatchId")

// Request unprocessed data from all sources.
newData = reportTimeTaken("getBatch") {
availableOffsets.flatMap {
Expand Down Expand Up @@ -513,17 +498,17 @@ class MicroBatchExecution(
}
}

awaitProgressLock.lock()
try {
// Wake up any threads that are waiting for the stream to progress.
withProgressLocked {
commitLog.add(currentBatchId)
committedOffsets ++= availableOffsets
awaitProgressLockCondition.signalAll()
} finally {
awaitProgressLock.unlock()
}
watermarkTracker.updateWatermark(lastExecution.executedPlan)
logDebug(s"Completed batch ${currentBatchId}")
}

/** Execute a function while locking the stream from making an progress */
private[sql] def withProgressLocked(f: => Unit): Unit = {
private[sql] def withProgressLocked[T](f: => T): T = {
awaitProgressLock.lock()
try {
f
Expand Down
Loading

0 comments on commit 47b5b68

Please sign in to comment.