Skip to content

Commit

Permalink
[SPARK-20441][SPARK-20432][SS] Within the same streaming query, one StreamingRelation should only be transformed to one StreamingExecutionRelation
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

Within the same streaming query, when one `StreamingRelation` is referred multiple times – e.g. `df.union(df)` – we should transform it only to one `StreamingExecutionRelation`, instead of two or more different `StreamingExecutionRelation`s (each of which would have a separate set of source, source logs, ...).

## How was this patch tested?

Added two test cases, each of which would fail without this patch.

Author: Liwei Lin <[email protected]>

Closes apache#17735 from lw-lin/SPARK-20441.
  • Loading branch information
lw-lin authored and brkyvz committed May 3, 2017
1 parent 7f96f2d commit 27f543b
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock

import scala.collection.mutable.{Map => MutableMap}
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

Expand Down Expand Up @@ -148,15 +149,18 @@ class StreamExecution(
"logicalPlan must be initialized in StreamExecutionThread " +
s"but the current thread was ${Thread.currentThread}")
var nextSourceId = 0L
val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
val _logicalPlan = analyzedPlan.transform {
case StreamingRelation(dataSource, _, output) =>
// Materialize source to avoid creating it in every batch
val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
val source = dataSource.createSource(metadataPath)
nextSourceId += 1
// We still need to use the previous `output` instead of `source.schema` as attributes in
// "df.logicalPlan" has already used attributes of the previous `output`.
StreamingExecutionRelation(source, output)
case streamingRelation@StreamingRelation(dataSource, _, output) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
val source = dataSource.createSource(metadataPath)
nextSourceId += 1
// We still need to use the previous `output` instead of `source.schema` as attributes in
// "df.logicalPlan" has already used attributes of the previous `output`.
StreamingExecutionRelation(source, output)
})
}
sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
uniqueSources = sources.distinct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ class StreamSuite extends StreamTest {
CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
}

test("SPARK-20432: union one stream with itself") {
  // A streaming DataFrame unioned with itself: both legs of the union must be
  // backed by the same source, and every input value should appear twice in the sink.
  val singleStream = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
  val selfUnion = singleStream.union(singleStream)
  withTempDir { outputDir =>
    withTempDir { checkpointDir =>
      val query = selfUnion
        .writeStream.format("parquet")
        .option("checkpointLocation", checkpointDir.getAbsolutePath)
        .start(outputDir.getAbsolutePath)
      try {
        query.processAllAvailable()
        val written = spark.read.parquet(outputDir.getAbsolutePath).as[Long]
        // FakeDefaultSource produces 0..10; the self-union doubles each value.
        val expected = ((0L to 10L) ++ (0L to 10L)).toArray
        checkDatasetUnorderly[Long](written, expected: _*)
      } finally {
        query.stop()
      }
    }
  }
}

test("union two streams") {
val inputData1 = MemoryStream[Int]
val inputData2 = MemoryStream[Int]
Expand Down Expand Up @@ -122,6 +143,33 @@ class StreamSuite extends StreamTest {
assertDF(df)
}

test("Within the same streaming query, one StreamingRelation should only be transformed to one " +
  "StreamingExecutionRelation") {
  // Referencing the same streaming df twice in one query (df.union(df)) must not
  // create two independent sources: both leaves should be the same relation.
  val streamDf = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
  var execution: StreamExecution = null
  try {
    execution = streamDf.union(streamDf)
      .writeStream
      .format("memory")
      .queryName("memory")
      .start()
      .asInstanceOf[StreamingQueryWrapper]
      .streamingQuery
    execution.awaitInitialization(streamingTimeout.toMillis)
    val relations = execution.logicalPlan.collect {
      case r: StreamingExecutionRelation => r
    }
    // Two occurrences in the analyzed plan, but they must be one and the same instance.
    assert(relations.size === 2)
    assert(relations.distinct.size === 1)
  } finally {
    if (execution != null) {
      execution.stop()
    }
  }
}

test("unsupported queries") {
val streamInput = MemoryStream[Int]
val batchInput = Seq(1, 2, 3).toDS()
Expand Down

0 comments on commit 27f543b

Please sign in to comment.