[SPARK-42968][SS] Add option to skip commit coordinator as part of StreamingWrite API for DSv2 sources/sinks

### What changes were proposed in this pull request?
Add an option to skip the commit coordinator as part of the StreamingWrite API for DSv2 sources/sinks. This option was already present in the BatchWrite API.
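
For illustration, here is a minimal sketch (not part of this PR) of a custom at-least-once streaming sink opting out of the commit coordinator via the new method. The class name `MyAtLeastOnceWrite` and its constructor are hypothetical; the method signatures follow the `StreamingWrite` interface change in the diff below.
```
import org.apache.spark.sql.connector.write.{PhysicalWriteInfo, WriterCommitMessage}
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}

// Hypothetical at-least-once sink: duplicate commits on replay are tolerable,
// so driver-side commit coordination adds no value and is skipped.
class MyAtLeastOnceWrite(factory: StreamingDataWriterFactory) extends StreamingWrite {

  override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
    factory

  // New in this PR: the interface default is true; returning false skips the coordinator.
  override def useCommitCoordinator(): Boolean = false

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
}
```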

### Why are the changes needed?
Sinks such as the following provide at-least-once semantics, so there is no need to go through the commit coordinator on the driver to ensure that at most one task per partition commits (a simplified sketch of the commit path this flag controls follows the list below). Coordination is even less useful for streaming use cases, where batches can be replayed from the checkpoint directory.

- memory sink
- console sink
- no-op sink
- Kafka v2 sink
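
For context, a simplified, hypothetical sketch of the executor-side commit path that the flag controls (this is not Spark's actual implementation; `commitPartition` and `canCommit` are illustrative stand-ins for the task commit logic and the driver-side coordinator check):
```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

object CommitPathSketch {
  // With the coordinator, only the attempt the driver authorizes commits;
  // without it, every successful attempt commits directly, which is acceptable
  // for at-least-once sinks because the worst case is duplicate output.
  def commitPartition(
      writer: DataWriter[InternalRow],
      useCommitCoordinator: Boolean,
      canCommit: () => Boolean): WriterCommitMessage = {
    if (useCommitCoordinator && !canCommit()) {
      writer.abort()
      throw new IllegalStateException("commit denied by the driver-side coordinator")
    }
    writer.commit()
  }
}
```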

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a unit test for the change:
```
[info] ReportSinkMetricsSuite:
22:23:01.276 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22:23:03.139 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] - test ReportSinkMetrics with useCommitCoordinator=true (2 seconds, 709 milliseconds)
22:23:04.522 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] - test ReportSinkMetrics with useCommitCoordinator=false (373 milliseconds)
22:23:04.941 WARN org.apache.spark.sql.streaming.ReportSinkMetricsSuite:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.ReportSinkMetricsSuite, threads: ForkJoinPool.commonPool-worker-19 (daemon=true), rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
[info] Run completed in 4 seconds, 934 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Closes apache#40600 from anishshri-db/task/SPARK-42968.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
anishshri-db authored and HeartSaVioR committed Mar 30, 2023
1 parent f4af6a0 commit 122a88c
Showing 7 changed files with 67 additions and 39 deletions.
@@ -45,6 +45,8 @@ private[kafka010] class KafkaStreamingWrite(
info: PhysicalWriteInfo): KafkaStreamWriterFactory =
KafkaStreamWriterFactory(topic, producerParams, schema)

override def useCommitCoordinator(): Boolean = false

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
}
@@ -58,6 +58,16 @@ public interface StreamingWrite {
*/
StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info);

/**
* Returns whether Spark should use the commit coordinator to ensure that at most one task for
* each partition commits.
*
* @return true if commit coordinator should be used, false otherwise.
*/
default boolean useCommitCoordinator() {
return true;
}

/**
* Commits this writing job for the specified epoch with a list of commit messages. The commit
* messages are collected from successful data writers and are produced by
@@ -83,6 +83,7 @@ private[noop] object NoopWriter extends DataWriter[InternalRow] {
private[noop] object NoopStreamingWrite extends StreamingWrite {
override def createStreamingWriterFactory(
info: PhysicalWriteInfo): StreamingDataWriterFactory = NoopStreamingDataWriterFactory
override def useCommitCoordinator(): Boolean = false
override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
}
@@ -41,6 +41,8 @@ class ConsoleWrite(schema: StructType, options: CaseInsensitiveStringMap)
def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
PackedRowWriterFactory

override def useCommitCoordinator(): Boolean = false

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
// We have to print a "Batch" label for the epoch for compatibility with the pre-data source V2
// behavior.
@@ -31,6 +31,10 @@ class MicroBatchWrite(epochId: Long, val writeSupport: StreamingWrite) extends B
s"MicroBatchWrite[epoch: $epochId, writer: $writeSupport]"
}

override def useCommitCoordinator(): Boolean = {
writeSupport.useCommitCoordinator()
}

override def commit(messages: Array[WriterCommitMessage]): Unit = {
writeSupport.commit(epochId, messages)
}
@@ -143,6 +143,8 @@ class MemoryStreamingWrite(
MemoryWriterFactory(schema)
}

override def useCommitCoordinator(): Boolean = false

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
val newRows = messages.flatMap {
case message: MemoryWriterCommitMessage => message.data
@@ -36,53 +36,56 @@ class ReportSinkMetricsSuite extends StreamTest {

import testImplicits._

test("test ReportSinkMetrics") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
var query: StreamingQuery = null
Seq("true", "false").foreach { useCommitCoordinator =>
test(s"test ReportSinkMetrics with useCommitCoordinator=$useCommitCoordinator") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
var query: StreamingQuery = null

var metricsMap: java.util.Map[String, String] = null
var metricsMap: java.util.Map[String, String] = null

val listener = new StreamingQueryListener {
val listener = new StreamingQueryListener {

override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
metricsMap = event.progress.sink.metrics
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
metricsMap = event.progress.sink.metrics
}

override def onQueryTerminated(
event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}

override def onQueryTerminated(
event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}
spark.streams.addListener(listener)

spark.streams.addListener(listener)
withTempDir { dir =>
try {
query =
df.writeStream
.outputMode("append")
.format("org.apache.spark.sql.streaming.TestSinkProvider")
.option("useCommitCoordinator", useCommitCoordinator)
.option("checkPointLocation", dir.toString)
.start()

withTempDir { dir =>
try {
query =
df.writeStream
.outputMode("append")
.format("org.apache.spark.sql.streaming.TestSinkProvider")
.option("checkPointLocation", dir.toString)
.start()
inputData.addData(1, 2, 3)

inputData.addData(1, 2, 3)
failAfter(streamingTimeout) {
query.processAllAvailable()
}

failAfter(streamingTimeout) {
query.processAllAvailable()
}
spark.sparkContext.listenerBus.waitUntilEmpty()

spark.sparkContext.listenerBus.waitUntilEmpty()
assertResult(metricsMap) {
Map("metrics-1" -> "value-1", "metrics-2" -> "value-2").asJava
}
} finally {
if (query != null) {
query.stop()
}

assertResult(metricsMap) {
Map("metrics-1" -> "value-1", "metrics-2" -> "value-2").asJava
}
} finally {
if (query != null) {
query.stop()
spark.streams.removeListener(listener)
}

spark.streams.removeListener(listener)
}
}
}
@@ -98,7 +101,8 @@ class ReportSinkMetricsSuite extends StreamTest {
with CreatableRelationProvider with Logging {

override def getTable(options: CaseInsensitiveStringMap): Table = {
val useCommitCoordinator = options.getBoolean("useCommitCoordinator", false)
new TestSinkTable(useCommitCoordinator)
}

def createRelation(
@@ -113,7 +117,8 @@ class ReportSinkMetricsSuite extends StreamTest {
def shortName(): String = "test"
}

class TestSinkTable(useCommitCoordinator: Boolean)
extends Table with SupportsWrite with ReportsSinkMetrics with Logging {

override def name(): String = "test"

@@ -131,7 +136,7 @@ class ReportSinkMetricsSuite extends StreamTest {
override def build(): Write = {
new Write {
override def toStreaming: StreamingWrite = {
new TestSinkWrite(useCommitCoordinator)
}
}
}
@@ -143,13 +148,15 @@ class ReportSinkMetricsSuite extends StreamTest {
}
}

class TestSinkWrite(useCommitCoordinator: Boolean)
extends StreamingWrite with Logging with Serializable {

def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
PackedRowWriterFactory

override def useCommitCoordinator(): Boolean = useCommitCoordinator

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}

def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
}
}
