[SPARK-16344][SQL] Decoding Parquet array of struct with a single field named "element"

## What changes were proposed in this pull request?

For backward-compatibility reasons, the following Parquet schema is ambiguous:

```
optional group f (LIST) {
  repeated group list {
    optional group element {
      optional int32 element;
    }
  }
}
```

According to the parquet-format spec, when interpreted as a standard 3-level layout, this type is equivalent to the following SQL type:

```
ARRAY<STRUCT<element: INT>>
```

However, when interpreted as a legacy 2-level layout, it's equivalent to

```
ARRAY<STRUCT<element: STRUCT<element: INT>>>
```
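
To make the two readings concrete, here is a small sketch of the corresponding Catalyst types written with Spark SQL's type API (illustration only; the variable names are made up, the field names mirror the example above):

```
import org.apache.spark.sql.types._

// Reading 1: standard 3-level layout -> ARRAY<STRUCT<element: INT>>
val threeLevelType = ArrayType(
  StructType(Seq(StructField("element", IntegerType))),
  containsNull = true)

// Reading 2: legacy 2-level layout -> ARRAY<STRUCT<element: STRUCT<element: INT>>>
val twoLevelType = ArrayType(
  StructType(Seq(StructField("element",
    StructType(Seq(StructField("element", IntegerType)))))),
  containsNull = true)
```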

Historically, to disambiguate these cases, we employed two methods:

- `ParquetSchemaConverter.isElementType()`

  Used to disambiguate the above cases while converting Parquet types to Spark types.

- `ParquetRowConverter.isElementType()`

  Used to disambiguate the above cases while instantiating row converters that convert Parquet records to Spark rows.

Unfortunately, these two methods make different decisions about the above problematic Parquet type, which caused SPARK-16344.

`ParquetRowConverter.isElementType()` is necessary for Spark 1.4 and earlier versions because, in those versions, Parquet requested schemata are converted directly from Spark schemata. The converted Parquet schemata may be incompatible with the actual schemata of the underlying physical files when those files were written by a system/library whose schema conversion scheme for Parquet LIST and MAP fields differs from Spark's.

In Spark 1.5, Parquet requested schemata are always properly tailored from the schemata of the physical files to be read, so `ParquetRowConverter.isElementType()` is no longer necessary. This PR replaces that method with a simple yet accurate scheme: whenever an ambiguous Parquet type is hit, convert the type in question back to a Spark type using `ParquetSchemaConverter` and check whether it matches the corresponding Spark type.
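
Condensed to its core, the new check looks roughly like the following (a sketch of the logic added to `ParquetRowConverter` in the diff below; the helper name `repeatedFieldIsElement` is made up for illustration, and the involved APIs are package-private inside Spark SQL):

```
import org.apache.parquet.schema.Type
import org.apache.spark.sql.types.DataType

// True if the repeated field itself maps to the expected Catalyst element type
// (legacy 2-level layout); false if it is only the syntactic `list` group of the
// standard 3-level layout, whose single child is the real element type.
def repeatedFieldIsElement(
    schemaConverter: ParquetSchemaConverter,
    repeatedType: Type,
    elementType: DataType): Boolean = {
  val guessedElementType = schemaConverter.convertField(repeatedType)
  DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)
}
```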

## How was this patch tested?

New test cases added in `ParquetHiveCompatibilitySuite` and `ParquetQuerySuite`.
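
For a user-level reproduction of the original problem (not part of this patch; assumes a `SparkSession` named `spark` and a scratch path), the round trip looks roughly like this:

```
import spark.implicits._

case class SingleElement(element: Long)

val path = "/tmp/spark-16344-repro"
Seq(Tuple1(Array(SingleElement(42)))).toDF("f").write.parquet(path)

// Before this patch, decoding the array of single-field "element" structs
// misbehaved (SPARK-16344); with the fix, the result is equivalent to
// Row(Array(Row(42))).
spark.read.parquet(path).collect()
```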

Author: Cheng Lian <[email protected]>

Closes apache#14014 from liancheng/spark-16344-for-master-and-2.0.
liancheng authored and yhuai committed Jul 20, 2016
1 parent e3cd5b3 commit e651900
Showing 6 changed files with 73 additions and 47 deletions.
ParquetReadSupport.scala
@@ -96,7 +96,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with

new ParquetRecordMaterializer(
parquetRequestedSchema,
ParquetReadSupport.expandUDT(catalystRequestedSchema))
ParquetReadSupport.expandUDT(catalystRequestedSchema),
new ParquetSchemaConverter(conf))
}
}

ParquetRecordMaterializer.scala
@@ -28,12 +28,14 @@ import org.apache.spark.sql.types.StructType
*
* @param parquetSchema Parquet schema of the records to be read
* @param catalystSchema Catalyst schema of the rows to be constructed
* @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
*/
private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType, catalystSchema: StructType)
parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
extends RecordMaterializer[InternalRow] {

private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater)
private val rootConverter =
new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)

override def getCurrentRecord: InternalRow = rootConverter.currentRecord

ParquetRowConverter.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
import org.apache.parquet.schema.{GroupType, MessageType, Type}
import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}

@@ -113,12 +113,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
* When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
* any "parent" container.
*
* @param schemaConverter A utility converter used to convert Parquet types to Catalyst types.
* @param parquetType Parquet schema of Parquet records
* @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
* types should have been expanded.
* @param updater An updater which propagates converted field values to the parent container
*/
private[parquet] class ParquetRowConverter(
schemaConverter: ParquetSchemaConverter,
parquetType: GroupType,
catalystType: StructType,
updater: ParentContainerUpdater)
@@ -292,9 +294,10 @@ private[parquet] class ParquetRowConverter(
new ParquetMapConverter(parquetType.asGroupType(), t, updater)

case t: StructType =>
new ParquetRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
})
new ParquetRowConverter(
schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater {
override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
})

case t =>
throw new RuntimeException(
@@ -442,13 +445,46 @@ private[parquet] class ParquetRowConverter(
private val elementConverter: Converter = {
val repeatedType = parquetSchema.getType(0)
val elementType = catalystSchema.elementType
val parentName = parquetSchema.getName

if (isElementType(repeatedType, elementType, parentName)) {
// At this stage, we're not sure whether the repeated field maps to the element type or is
// just the syntactic repeated group of the 3-level standard LIST layout. Take the following
// Parquet LIST-annotated group type as an example:
//
// optional group f (LIST) {
// repeated group list {
// optional group element {
// optional int32 element;
// }
// }
// }
//
// This type is ambiguous:
//
// 1. When interpreted as a standard 3-level layout, the `list` field is just the syntactic
// group, and the entire type should be translated to:
//
// ARRAY<STRUCT<element: INT>>
//
// 2. On the other hand, when interpreted as a non-standard 2-level layout, the `list` field
// represents the element type, and the entire type should be translated to:
//
// ARRAY<STRUCT<element: STRUCT<element: INT>>>
//
// Here we try to convert field `list` into a Catalyst type to see whether the converted type
// matches the Catalyst array element type. If it doesn't match, then it's case 1; otherwise,
// it's case 2.
val guessedElementType = schemaConverter.convertField(repeatedType)

if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)) {
// If the repeated field corresponds to the element type, creates a new converter using the
// type of the repeated field.
newConverter(repeatedType, elementType, new ParentContainerUpdater {
override def set(value: Any): Unit = currentArray += value
})
} else {
// If the repeated field corresponds to the syntactic group in the standard 3-level Parquet
// LIST layout, creates a new converter using the only child field of the repeated field.
assert(!repeatedType.isPrimitive && repeatedType.asGroupType().getFieldCount == 1)
new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
}
}
@@ -462,37 +498,6 @@
// in row cells.
override def start(): Unit = currentArray = ArrayBuffer.empty[Any]

// scalastyle:off
/**
* Returns whether the given type is the element type of a list or is a syntactic group with
* one field that is the element type. This is determined by checking whether the type can be
* a syntactic group and by checking whether a potential syntactic group matches the expected
* schema.
* {{{
* <list-repetition> group <name> (LIST) {
* repeated group list { <-- repeatedType points here
* <element-repetition> <element-type> element;
* }
* }
* }}}
* In short, here we handle Parquet list backwards-compatibility rules on the read path. This
* method is based on `AvroIndexedRecordConverter.isElementType`.
*
* @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
*/
// scalastyle:on
private def isElementType(
parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = {
(parquetRepeatedType, catalystElementType) match {
case (t: PrimitiveType, _) => true
case (t: GroupType, _) if t.getFieldCount > 1 => true
case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true
case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true
case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
case _ => false
}
}

/** Array element converter */
private final class ElementConverter(parquetType: Type, catalystType: DataType)
extends GroupConverter {
ParquetSchemaConverter.scala
@@ -260,7 +260,7 @@ private[parquet] class ParquetSchemaConverter(
{
// For legacy 2-level list types with primitive element type, e.g.:
//
// // List<Integer> (nullable list, non-null elements)
// // ARRAY<INT> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated int32 element;
// }
@@ -270,7 +270,7 @@
// For legacy 2-level list types whose element type is a group type with 2 or more fields,
// e.g.:
//
// // List<Tuple<String, Integer>> (nullable list, non-null elements)
// // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group element {
// required binary str (UTF8);
Expand All @@ -282,7 +282,7 @@ private[parquet] class ParquetSchemaConverter(
} || {
// For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
//
// // List<OneTuple<String>> (nullable list, non-null elements)
// // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group array {
// required binary str (UTF8);
Expand All @@ -293,7 +293,7 @@ private[parquet] class ParquetSchemaConverter(
} || {
// For Parquet data generated by parquet-thrift, e.g.:
//
// // List<OneTuple<String>> (nullable list, non-null elements)
// // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
// optional group my_list (LIST) {
// repeated group my_list_tuple {
// required binary str (UTF8);
ParquetQuerySuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.execution.BatchedDataSourceScanExec
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -668,9 +668,23 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}

test("SPARK-16344: array of struct with a single field named 'element'") {
withTempPath { dir =>
val path = dir.getCanonicalPath
Seq(Tuple1(Array(SingleElement(42)))).toDF("f").write.parquet(path)

checkAnswer(
sqlContext.read.parquet(path),
Row(Array(Row(42)))
)
}
}
}

object TestingUDT {
case class SingleElement(element: Long)

@SQLUserDefinedType(udt = classOf[NestedStructUDT])
case class NestedStruct(a: Integer, b: Long, c: Double)

ParquetHiveCompatibilitySuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive

import java.sql.Timestamp

import org.apache.hadoop.hive.conf.HiveConf

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -137,4 +135,10 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
Row(Row(1, Seq("foo", "bar", null))),
"STRUCT<f0: INT, f1: ARRAY<STRING>>")
}

test("SPARK-16344: array of struct with a single field named 'array_element'") {
testParquetHiveCompatibility(
Row(Seq(Row(1))),
"ARRAY<STRUCT<array_element: INT>>")
}
}
