[SPARK-11694][FOLLOW-UP] Clean up imports, use a common function for metadata and add a test for FIXED_LEN_BYTE_ARRAY

As discussed in apache/spark#9660 and apache/spark#9060, I cleaned up unused imports, added a test for reading fixed-length byte arrays, and used a common function for writing Parquet metadata.
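The shared helper itself is not part of this diff (it lives in the Parquet test support code). Below is a minimal sketch of what such a `writeMetadata` helper could look like, reconstructed from the footer-writing boilerplate the diff removes further down; the call shape `writeMetadata(parquetSchema, path, conf)` and the `"Spark"` created-by string come from that removed code, everything else is an assumption:

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.{Footer, ParquetFileWriter}
import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetMetadata}
import org.apache.parquet.schema.MessageType

// Hypothetical reconstruction: writes only a Parquet summary metadata file
// (a footer with no row groups) for the given schema, so tests can exercise
// schema conversion without writing any data.
def writeMetadata(parquetSchema: MessageType, path: Path, conf: Configuration): Unit = {
  val extraMetadata = Map.empty[String, String].asJava
  val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
  val parquetMetadata = new ParquetMetadata(fileMetadata, List.empty[BlockMetaData].asJava)
  val footers = List(new Footer(path, parquetMetadata)).asJava
  ParquetFileWriter.writeMetadataFile(conf, path, footers)
}
```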

For the fixed-length byte array test, I verified the encoding types of the test file with [parquet-tools](https://github.com/Parquet/parquet-mr/tree/master/parquet-tools).
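parquet-tools is a command-line tool, but the same check can be done programmatically. Here is a small sketch (not part of this commit) that prints the encodings recorded in a Parquet file's footer using parquet-mr APIs; the path points at the test resource this commit adds:

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

object DumpEncodings {
  def main(args: Array[String]): Unit = {
    // Read the footer and list the encodings of every column chunk, e.g. to
    // confirm the decimal column uses plain dictionary encoding.
    val footer = ParquetFileReader.readFooter(
      new Configuration(), new Path("sql/core/src/test/resources/dec-in-fixed-len.parquet"))
    for {
      block <- footer.getBlocks.asScala
      column <- block.getColumns.asScala
    } println(s"${column.getPath}: ${column.getEncodings.asScala.mkString(", ")}")
  }
}
```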

Author: hyukjinkwon <[email protected]>

Closes #9754 from HyukjinKwon/SPARK-11694-followup.
HyukjinKwon authored and liancheng committed Nov 17, 2015
1 parent fbad920 commit 75d2020
Showing 2 changed files with 15 additions and 27 deletions.
Binary file added sql/core/src/test/resources/dec-in-fixed-len.parquet
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.util.Collections
-
 import org.apache.parquet.column.{Encoding, ParquetProperties}
 
 import scala.collection.JavaConverters._
@@ -33,7 +31,7 @@ import org.apache.parquet.example.data.{Group, GroupWriter}
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
@@ -243,15 +241,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       """.stripMargin)
 
     withTempPath { location =>
-      val extraMetadata = Map.empty[String, String].asJava
-      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
       val path = new Path(location.getCanonicalPath)
-      val footer = List(
-        new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
-      ).asJava
-
-      ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)
-
+      val conf = sparkContext.hadoopConfiguration
+      writeMetadata(parquetSchema, path, conf)
       val errorMessage = intercept[Throwable] {
         sqlContext.read.parquet(path.toString).printSchema()
       }.toString
@@ -267,20 +259,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
         |}
       """.stripMargin)
 
+    val expectedSparkTypes = Seq(StringType, BinaryType)
+
     withTempPath { location =>
-      val extraMetadata = Map.empty[String, String].asJava
-      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
       val path = new Path(location.getCanonicalPath)
-      val footer = List(
-        new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
-      ).asJava
-
-      ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)
-
-      val jsonDataType = sqlContext.read.parquet(path.toString).schema(0).dataType
-      assert(jsonDataType === StringType)
-      val bsonDataType = sqlContext.read.parquet(path.toString).schema(1).dataType
-      assert(bsonDataType === BinaryType)
+      val conf = sparkContext.hadoopConfiguration
+      writeMetadata(parquetSchema, path, conf)
+      val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType)
+      assert(sparkTypes === expectedSparkTypes)
     }
   }

@@ -607,10 +593,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
   }
 
-  // TODO Adds test case for reading dictionary encoded decimals written as `FIXED_LEN_BYTE_ARRAY`
-  // The Parquet writer version Spark 1.6 and prior versions use is `PARQUET_1_0`, which doesn't
-  // provide dictionary encoding support for `FIXED_LEN_BYTE_ARRAY`. Should add a test here once
-  // we upgrade to `PARQUET_2_0`.
+  test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") {
+    checkAnswer(
+      // Decimal column in this file is encoded using plain dictionary
+      readResourceParquetFile("dec-in-fixed-len.parquet"),
+      sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
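For context on why a fixed-length array can hold these decimals: Parquet's decimal encoding stores the unscaled value as a big-endian two's-complement integer, sized to the smallest byte count whose range covers the declared precision. A quick sketch of that sizing rule (an illustration of the spec, not code from this commit):

```scala
// Minimum FIXED_LEN_BYTE_ARRAY length for a decimal of a given precision:
// n bytes of two's complement can represent magnitudes up to 2^(8n - 1) - 1,
// so we need the smallest n with 2^(8n - 1) - 1 >= 10^precision - 1.
def minBytesForPrecision(precision: Int): Int = {
  var numBytes = 1
  while (BigInt(2).pow(8 * numBytes - 1) - 1 < BigInt(10).pow(precision) - 1) {
    numBytes += 1
  }
  numBytes
}

// For the DecimalType(10, 2) used in the new test: precision 10 needs 5 bytes.
assert(minBytesForPrecision(10) == 5)
```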
