From d66b143eec7f604595089f72d8786edbdcd74282 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Wed, 21 Jun 2017 23:43:21 -0700
Subject: [PATCH] [SPARK-21167][SS] Decode the path generated by File sink to
 handle special characters

## What changes were proposed in this pull request?

Decode the path generated by File sink to handle special characters.

## How was this patch tested?

The added unit test.

Author: Shixiong Zhu

Closes #18381 from zsxwing/SPARK-21167.
---
 .../streaming/FileStreamSinkLog.scala       |  5 +++-
 .../sql/streaming/FileStreamSinkSuite.scala | 29 +++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 8d718b2164d22..c9939ac1db746 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.net.URI
+
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
@@ -47,7 +49,8 @@ case class SinkFileStatus(
     action: String) {
 
   def toFileStatus: FileStatus = {
-    new FileStatus(size, isDir, blockReplication, blockSize, modificationTime, new Path(path))
+    new FileStatus(
+      size, isDir, blockReplication, blockSize, modificationTime, new Path(new URI(path)))
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 1a2d3a13f3a4a..bb6a27803bb20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -64,6 +64,35 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
+  test("SPARK-21167: encode and decode path correctly") {
+    val inputData = MemoryStream[String]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    val query = ds.map(s => (s, s.length))
+      .toDF("value", "len")
+      .writeStream
+      .partitionBy("value")
+      .option("checkpointLocation", checkpointDir)
+      .format("parquet")
+      .start(outputDir)
+
+    try {
+      // The output is partitioned by "value", so the value will appear in the file path.
+      // This is to test if we handle spaces in the path correctly.
+      inputData.addData("hello world")
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }
+      val outputDf = spark.read.parquet(outputDir)
+      checkDatasetUnorderly(outputDf.as[(Int, String)], ("hello world".length, "hello world"))
+    } finally {
+      query.stop()
+    }
+  }
+
   test("partitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val ds = inputData.toDS()
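
Below is a minimal, self-contained sketch (not part of the patch) of the behavior the fix relies on: the sink log stores the file path as a URI-encoded string, so building a `Path` directly from that string keeps escape sequences such as `%20` as literal characters, while parsing the string as a `URI` first decodes them back to the real file name. The object name `PathDecodeSketch` and the example path are hypothetical; only `java.net.URI` and `org.apache.hadoop.fs.Path` from the patch are assumed.

```scala
// Hypothetical sketch illustrating Path(String) vs. Path(URI) for an encoded path.
import java.net.URI

import org.apache.hadoop.fs.Path

object PathDecodeSketch {
  def main(args: Array[String]): Unit = {
    // A URI-encoded path string, roughly what a partition directory containing a
    // space may look like once serialized into the sink log (made-up example).
    val serialized = "file:/tmp/out/value=hello%20world/part-00000.parquet"

    // Path(String) keeps "%20" as three literal characters, so the resulting path
    // no longer matches the actual "value=hello world" directory on disk.
    val undecoded = new Path(serialized)

    // Path(URI) parses "%20" as an escaped space, so the resulting path points at
    // the real directory again.
    val decoded = new Path(new URI(serialized))

    println(undecoded.toUri.getPath) // expected: /tmp/out/value=hello%20world/part-00000.parquet
    println(decoded.toUri.getPath)   // expected: /tmp/out/value=hello world/part-00000.parquet
  }
}
```

This is the same mismatch the new test exercises by partitioning on a value that contains a space.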