[SPARK-17372][SQL][STREAMING] Avoid serialization issues by using Arrays to save file names in FileStreamSource

## What changes were proposed in this pull request?

When we create a file stream on a directory that has partitioned subdirs (i.e. dir/x=y/), ListingFileCatalog.allFiles returns the files in the dir as a Seq[String] which internally is a Stream[String]. This is because of this [line](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L93), where LinkedHashSet.values.toSeq returns a Stream. Then, when [FileStreamSource](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L79) filters this Stream[String] to remove already-seen files, it creates a new Stream[String] whose filter function holds a $outer reference to the FileStreamSource (in Scala 2.10). Trying to serialize this Stream[String] causes a NotSerializableException. This happens even if there is just one file in the dir.

It's important to note that this behavior is different in Scala 2.11: there is no $outer reference to FileStreamSource, so it does not throw NotSerializableException. However, with a large sequence of files (tested with 10000 files), it throws StackOverflowError instead. This is because of how the Stream class is implemented: it is essentially a linked list, and serializing a long Stream requires *recursively* walking that list, which overflows the stack.

In short, across both Scala 2.10 and 2.11, serialization fails when both of the following conditions are true (a minimal sketch of the failure follows the list):
- file stream defined on a partitioned directory
- directory has 10k+ files
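
To make the failure mode concrete, here is a minimal, hypothetical sketch (not code from this PR) that serializes a long Stream[String] with plain Java serialization; the names are illustrative only:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object StreamSerializationSketch {
  def main(args: Array[String]): Unit = {
    // A long, fully forced Stream[String], roughly the shape that
    // ListingFileCatalog.allFiles returns for a partitioned directory.
    val files: Seq[String] = (1 to 10000).map(i => s"/dir/x=y/file-$i").toStream
    files.length // force the lazy tail so only the cons-cell chain remains

    val out = new ObjectOutputStream(new ByteArrayOutputStream())

    // Serializing the Stream walks its cons cells recursively; with enough
    // elements this can overflow the stack, and in Scala 2.10 a filter over
    // the Stream can additionally capture a non-serializable enclosing
    // object, failing earlier with NotSerializableException.
    // out.writeObject(files) // expected to fail for large inputs

    // Materializing into an Array gives a flat structure that serializes fine.
    out.writeObject(files.toArray)
  }
}
```

The same contrast motivates the fix below: hand the metadata log a materialized Array instead of a lazily built Seq.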

The right solution is to convert the Seq to an Array before writing to the log. This PR implements the fix in two ways (a sketch of the type guard follows the list):
- Changing all uses of HDFSMetadataLog to ensure Array is used instead of Seq
- Adding a `require` in HDFSMetadataLog so that it is never used with type Seq
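
The second point uses a ClassTag-based construction-time check. Here is a minimal sketch of that guard pattern under an illustrative class name (TypedMetadataLog is a stand-in, not Spark's HDFSMetadataLog):

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for HDFSMetadataLog, showing only the type guard.
class TypedMetadataLog[T: ClassTag] {
  require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
    "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")
}

object TypedMetadataLogDemo extends App {
  new TypedMetadataLog[Array[String]]  // OK: Array serializes as a flat structure
  // new TypedMetadataLog[Seq[String]] // fails the require with IllegalArgumentException
}
```

Because the check compares the erased runtime class for equality, it guards against declaring the log with type Seq itself (the pattern that caused the bug), not against every Seq subtype.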

## How was this patch tested?

Added a unit test that ensures the file stream source can handle 10000 files. Without the fix, this test fails in both Scala 2.10 and 2.11, with the different failures described above.

Author: Tathagata Das <[email protected]>

Closes apache#14987 from tdas/SPARK-17372.
tdas committed Sep 7, 2016
1 parent d6eede9 commit eb1ab88
Showing 6 changed files with 65 additions and 18 deletions.
FileStreamSinkLog.scala
@@ -80,7 +80,7 @@ object SinkFileStatus {
* (drops the deleted files).
*/
class FileStreamSinkLog(sparkSession: SparkSession, path: String)
extends HDFSMetadataLog[Seq[SinkFileStatus]](sparkSession, path) {
extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {

import FileStreamSinkLog._

@@ -123,11 +123,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
}
}

override def serialize(logData: Seq[SinkFileStatus]): Array[Byte] = {
override def serialize(logData: Array[SinkFileStatus]): Array[Byte] = {
(VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8)
}

override def deserialize(bytes: Array[Byte]): Seq[SinkFileStatus] = {
override def deserialize(bytes: Array[Byte]): Array[SinkFileStatus] = {
val lines = new String(bytes, UTF_8).split("\n")
if (lines.length == 0) {
throw new IllegalStateException("Incomplete log file")
@@ -136,10 +136,10 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
if (version != VERSION) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}
lines.toSeq.slice(1, lines.length).map(read[SinkFileStatus](_))
lines.slice(1, lines.length).map(read[SinkFileStatus](_))
}

override def add(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
override def add(batchId: Long, logs: Array[SinkFileStatus]): Boolean = {
if (isCompactionBatch(batchId, compactInterval)) {
compact(batchId, logs)
} else {
@@ -186,7 +186,7 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs
if (super.add(batchId, compactLogs(allLogs))) {
if (super.add(batchId, compactLogs(allLogs).toArray)) {
if (isDeletingExpiredLog) {
deleteExpiredLog(batchId)
}
FileStreamSource.scala
@@ -49,7 +49,7 @@ class FileStreamSource(
fs.makeQualified(new Path(path)) // can contains glob patterns
}

private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath)
private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath)

private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

@@ -98,7 +98,7 @@ class FileStreamSource(

if (batchFiles.nonEmpty) {
maxBatchId += 1
metadataLog.add(maxBatchId, batchFiles)
metadataLog.add(maxBatchId, batchFiles.toArray)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}

HDFSMetadataLog.scala
@@ -49,6 +49,10 @@ import org.apache.spark.util.UninterruptibleThread
class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
extends MetadataLog[T] with Logging {

// Avoid serializing generic sequences, see SPARK-17372
require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
"Should not create a log with type Seq, use Arrays instead - see SPARK-17372")

import HDFSMetadataLog._

val metadataPath = new Path(path)
StreamExecution.scala
@@ -407,6 +407,9 @@ class StreamExecution(
awaitBatchLock.lock()
try {
awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
if (streamDeathCause != null) {
throw streamDeathCause
}
} finally {
awaitBatchLock.unlock()
}
FileStreamSinkLogSuite.scala
@@ -98,7 +98,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {

test("serialize") {
withFileStreamSinkLog { sinkLog =>
val logs = Seq(
val logs = Array(
SinkFileStatus(
path = "/a/b/x",
size = 100L,
@@ -132,7 +132,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
// scalastyle:on
assert(expected === new String(sinkLog.serialize(logs), UTF_8))

assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Nil), UTF_8))
assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Array()), UTF_8))
}
}

@@ -196,7 +196,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
for (batchId <- 0 to 10) {
sinkLog.add(
batchId,
Seq(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
Array(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
val expectedFiles = (0 to batchId).map {
id => newFakeSinkFileStatus("/a/b/" + id, FileStreamSinkLog.ADD_ACTION)
}
@@ -230,17 +230,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}.toSet
}

sinkLog.add(0, Seq(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
sinkLog.add(1, Seq(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0", "1") === listBatchFiles())
sinkLog.add(2, Seq(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact") === listBatchFiles())
sinkLog.add(3, Seq(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact", "3") === listBatchFiles())
sinkLog.add(4, Seq(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact", "3", "4") === listBatchFiles())
sinkLog.add(5, Seq(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
assert(Set("5.compact") === listBatchFiles())
}
}
FileStreamSourceSuite.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.streaming

import java.io.File
import java.util.UUID

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
@@ -142,6 +144,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

import testImplicits._

override val streamingTimeout = 20.seconds

/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
private def createFileStreamSource(
format: String,
@@ -761,6 +765,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
}

test("SPARK-17372 - write file names to WAL as Array[String]") {
// Note: If this test takes longer than the timeout, then its likely that this is actually
// running a Spark job with 10000 tasks. This test tries to avoid that by
// 1. Setting the threshold for parallel file listing to very high
// 2. Using a query that should use constant folding to eliminate reading of the files

val numFiles = 10000

// This is to avoid running a spark job to list of files in parallel
// by the ListingFileCatalog.
spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)

withTempDirs { case (root, tmp) =>
val src = new File(root, "a=1")
src.mkdirs()

(1 to numFiles).map { _.toString }.foreach { i =>
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
val finalFile = new File(src, tempFile.getName)
stringToFile(finalFile, i)
}
assert(src.listFiles().size === numFiles)

val files = spark.readStream.text(root.getCanonicalPath).as[String]

// Note this query will use constant folding to eliminate the file scan.
// This is to avoid actually running a Spark job with 10000 tasks
val df = files.filter("1 == 0").groupBy().count()

testStream(df, InternalOutputModes.Complete)(
AddTextFileData("0", src, tmp),
CheckAnswer(0)
)
}
}
}

class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
