diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 4254df44c97a6..7520163522027 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -80,7 +80,7 @@ object SinkFileStatus {
  * (drops the deleted files).
  */
 class FileStreamSinkLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[Seq[SinkFileStatus]](sparkSession, path) {
+  extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {
 
   import FileStreamSinkLog._
 
@@ -123,11 +123,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
     }
   }
 
-  override def serialize(logData: Seq[SinkFileStatus]): Array[Byte] = {
+  override def serialize(logData: Array[SinkFileStatus]): Array[Byte] = {
     (VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8)
   }
 
-  override def deserialize(bytes: Array[Byte]): Seq[SinkFileStatus] = {
+  override def deserialize(bytes: Array[Byte]): Array[SinkFileStatus] = {
     val lines = new String(bytes, UTF_8).split("\n")
     if (lines.length == 0) {
       throw new IllegalStateException("Incomplete log file")
@@ -136,10 +136,10 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
     if (version != VERSION) {
       throw new IllegalStateException(s"Unknown log version: ${version}")
     }
-    lines.toSeq.slice(1, lines.length).map(read[SinkFileStatus](_))
+    lines.slice(1, lines.length).map(read[SinkFileStatus](_))
   }
 
-  override def add(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
+  override def add(batchId: Long, logs: Array[SinkFileStatus]): Boolean = {
     if (isCompactionBatch(batchId, compactInterval)) {
       compact(batchId, logs)
     } else {
@@ -186,7 +186,7 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
   private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
     val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
     val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs
-    if (super.add(batchId, compactLogs(allLogs))) {
+    if (super.add(batchId, compactLogs(allLogs).toArray)) {
       if (isDeletingExpiredLog) {
         deleteExpiredLog(batchId)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index e8b969b5e0a55..42fb454c2d158 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -49,7 +49,7 @@ class FileStreamSource(
     fs.makeQualified(new Path(path))  // can contains glob patterns
   }
 
-  private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath)
+  private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath)
 
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
@@ -98,7 +98,7 @@ class FileStreamSource(
 
     if (batchFiles.nonEmpty) {
       maxBatchId += 1
-      metadataLog.add(maxBatchId, batchFiles)
+      metadataLog.add(maxBatchId, batchFiles.toArray)
       logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 127ece9ab0e56..39a0f3341389c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -49,6 +49,10 @@ import org.apache.spark.util.UninterruptibleThread
 class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
   extends MetadataLog[T] with Logging {
 
+  // Avoid serializing generic sequences, see SPARK-17372
+  require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
+    "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")
+
   import HDFSMetadataLog._
 
   val metadataPath = new Path(path)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 4d05af0b60358..5e1e5eeb50936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -407,6 +407,9 @@ class StreamExecution(
       awaitBatchLock.lock()
       try {
         awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+        if (streamDeathCause != null) {
+          throw streamDeathCause
+        }
       } finally {
         awaitBatchLock.unlock()
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 39fd1f0cd37bb..26f8b98cb38a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -98,7 +98,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
 
   test("serialize") {
     withFileStreamSinkLog { sinkLog =>
-      val logs = Seq(
+      val logs = Array(
        SinkFileStatus(
          path = "/a/b/x",
          size = 100L,
@@ -132,7 +132,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
      // scalastyle:on
      assert(expected === new String(sinkLog.serialize(logs), UTF_8))
 
-      assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Nil), UTF_8))
+      assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Array()), UTF_8))
     }
   }
 
@@ -196,7 +196,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
       for (batchId <- 0 to 10) {
         sinkLog.add(
           batchId,
-          Seq(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
+          Array(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
         val expectedFiles = (0 to batchId).map { id =>
           newFakeSinkFileStatus("/a/b/" + id, FileStreamSinkLog.ADD_ACTION)
         }
@@ -230,17 +230,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
        }.toSet
      }
 
-      sinkLog.add(0, Seq(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
+      sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
       assert(Set("0") === listBatchFiles())
-      sinkLog.add(1, Seq(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
+      sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
       assert(Set("0", "1") === listBatchFiles())
-      sinkLog.add(2, Seq(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
+      sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
       assert(Set("2.compact") === listBatchFiles())
-      sinkLog.add(3, Seq(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
+      sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
       assert(Set("2.compact", "3") === listBatchFiles())
-      sinkLog.add(4, Seq(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
+      sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
       assert(Set("2.compact", "3", "4") === listBatchFiles())
-      sinkLog.add(5, Seq(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
+      sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
       assert(Set("5.compact") === listBatchFiles())
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 03222b4a49c6c..886f7be59db93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
-import java.util.UUID
+
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
@@ -142,6 +144,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   import testImplicits._
 
+  override val streamingTimeout = 20.seconds
+
   /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
   private def createFileStreamSource(
       format: String,
@@ -761,6 +765,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
     }
   }
+
+  test("SPARK-17372 - write file names to WAL as Array[String]") {
+    // Note: If this test takes longer than the timeout, then its likely that this is actually
+    // running a Spark job with 10000 tasks. This test tries to avoid that by
+    // 1. Setting the threshold for parallel file listing to very high
+    // 2. Using a query that should use constant folding to eliminate reading of the files
+
+    val numFiles = 10000
+
+    // This is to avoid running a spark job to list of files in parallel
+    // by the ListingFileCatalog.
+    spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
+
+    withTempDirs { case (root, tmp) =>
+      val src = new File(root, "a=1")
+      src.mkdirs()
+
+      (1 to numFiles).map { _.toString }.foreach { i =>
+        val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+        val finalFile = new File(src, tempFile.getName)
+        stringToFile(finalFile, i)
+      }
+      assert(src.listFiles().size === numFiles)
+
+      val files = spark.readStream.text(root.getCanonicalPath).as[String]
+
+      // Note this query will use constant folding to eliminate the file scan.
+      // This is to avoid actually running a Spark job with 10000 tasks
+      val df = files.filter("1 == 0").groupBy().count()
+
+      testStream(df, InternalOutputModes.Complete)(
+        AddTextFileData("0", src, tmp),
+        CheckAnswer(0)
+      )
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
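
Reviewer note (not part of the patch): the require added to HDFSMetadataLog uses the ClassTag to reject a log declared with element type Seq, which is the serialization hazard SPARK-17372 works around; all call sites in the patch now construct Array values instead. Below is a minimal, self-contained sketch of that guard for illustration only; MetadataLogSketch and GuardDemo are hypothetical names, not Spark classes.

import scala.reflect.ClassTag

// Standalone sketch of the ClassTag-based guard added in HDFSMetadataLog:
// reject a log parameterized with Seq, accept one parameterized with Array.
class MetadataLogSketch[T: ClassTag] {
  require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
    "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")
}

object GuardDemo extends App {
  // Passes: the runtime class is Array[String], not Seq.
  new MetadataLogSketch[Array[String]]()
  try {
    // Fails the require(...) above with an IllegalArgumentException.
    new MetadataLogSketch[Seq[String]]()
  } catch {
    case e: IllegalArgumentException => println(e.getMessage)
  }
}

Because the check compares runtime classes exactly, it only rejects a log declared directly with Seq (a List or Stream type parameter would slip past it); the broader fix is that FileStreamSource and FileStreamSinkLog now build and store Array values, as shown in the hunks above.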