Skip to content

Commit

Permalink
[SPARK-21167][SS] Decode the path generated by File sink to handle sp…
Browse files Browse the repository at this point in the history
…ecial characters

## What changes were proposed in this pull request?

Decode the path generated by File sink to handle special characters.

## How was this patch tested?

The added unit test.

Author: Shixiong Zhu <[email protected]>

Closes apache#18381 from zsxwing/SPARK-21167.
  • Loading branch information
zsxwing committed Jun 22, 2017
1 parent 5354337 commit d66b143
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.streaming

import java.net.URI

import org.apache.hadoop.fs.{FileStatus, Path}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
Expand Down Expand Up @@ -47,7 +49,8 @@ case class SinkFileStatus(
action: String) {

/**
 * Builds a Hadoop `FileStatus` from this record's fields (size, isDir, blockReplication,
 * blockSize, modificationTime and path).
 */
def toFileStatus: FileStatus = {
// Diff view, removed line: the stored path string was passed to `Path` directly.
new FileStatus(size, isDir, blockReplication, blockSize, modificationTime, new Path(path))
// Diff view, added lines: the stored path string is first parsed as a `java.net.URI`, which the
// commit message describes as decoding the path so that special characters are handled.
new FileStatus(
size, isDir, blockReplication, blockSize, modificationTime, new Path(new URI(path)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,35 @@ class FileStreamSinkSuite extends StreamTest {
}
}

// Regression test for SPARK-21167: writes a streaming parquet output partitioned by a string
// column whose value contains a space, then reads the output directory back as a batch and
// checks that the row round-trips.
test("SPARK-21167: encode and decode path correctly") {
val inputData = MemoryStream[String]
val ds = inputData.toDS()

// Separate temporary directories for the sink output and the streaming checkpoint.
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

// Each input string becomes a (value, len) row; the query is started before any data is added.
val query = ds.map(s => (s, s.length))
.toDF("value", "len")
.writeStream
.partitionBy("value")
.option("checkpointLocation", checkpointDir)
.format("parquet")
.start(outputDir)

try {
// The output is partitioned by "value", so the value will appear in the file path.
// This is to test if we handle spaces in the path correctly.
inputData.addData("hello world")
// Wait for the added data to be processed; presumably `failAfter` fails the test if this
// takes longer than `streamingTimeout`.
failAfter(streamingTimeout) {
query.processAllAvailable()
}
// Read the sink output back as a batch; rows are checked as (len, value) pairs.
val outputDf = spark.read.parquet(outputDir)
checkDatasetUnorderly(outputDf.as[(Int, String)], ("hello world".length, "hello world"))
} finally {
// Always stop the streaming query, whether or not the assertions above passed.
query.stop()
}
}

test("partitioned writing and batch reading") {
val inputData = MemoryStream[Int]
val ds = inputData.toDS()
Expand Down

0 comments on commit d66b143

Please sign in to comment.