[SPARK-13709][SQL] Initialize deserializer with both table and partition properties when reading partitioned tables

## What changes were proposed in this pull request?

When reading partitions of a partitioned Hive SerDe table, we only initialize the deserializer using partition properties. However, for SerDes like `AvroSerDe`, essential properties (e.g. Avro schema information) may be defined in table properties. We should merge table properties and partition properties before initializing the deserializer.

Note that an individual partition may have properties that differ from those defined at the table level (e.g. partitions within a table can use different SerDes). Thus, for any property key defined in both partition and table properties, the value set in the partition properties wins.
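To make the merge semantics concrete, here is a minimal, self-contained sketch (not part of the patch; the object and property values are hypothetical) that layers partition properties on top of table properties using `java.util.Properties` defaults, the same construction the fix uses:

```scala
import java.util.Properties

import scala.collection.JavaConverters._

// Hypothetical standalone sketch of the merge semantics: table properties
// serve as the default layer, and any key also present in the partition
// properties shadows the table-level value.
object MergePropertiesSketch {
  def merge(tableProps: Properties, partProps: Properties): Properties = {
    // Table properties become the defaults of the merged Properties object.
    val merged = new Properties(tableProps)
    // Partition properties are copied in on top, so they win on conflicts.
    partProps.asScala.foreach { case (k, v) => merged.setProperty(k, v) }
    merged
  }

  def main(args: Array[String]): Unit = {
    val tableProps = new Properties()
    tableProps.setProperty("avro.schema.literal", """{"type": "int"}""")
    tableProps.setProperty("serialization.format", "1")

    val partProps = new Properties()
    partProps.setProperty("serialization.format", "2") // partition-level override

    val merged = merge(tableProps, partProps)
    assert(merged.getProperty("avro.schema.literal") == """{"type": "int"}""") // from table
    assert(merged.getProperty("serialization.format") == "2")                  // partition wins
  }
}
```

Using the constructor's defaults mechanism avoids copying every table property while still letting partition-level entries shadow table-level ones.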

## How was this patch tested?

New test case added in `QueryPartitionSuite`.

Author: Cheng Lian <[email protected]>

Closes apache#13865 from liancheng/spark-13709-partitioned-avro-table.
liancheng authored and yhuai committed Jun 24, 2016
1 parent cc6778e commit 2d2f607
Showing 2 changed files with 97 additions and 1 deletion.
sql/hive/src/main/scala/org/apache/spark/sql/hive/HadoopTableReader.scala

@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.hive
 
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
@@ -230,10 +234,21 @@ class HadoopTableReader(
       // Fill all partition keys to the given MutableRow object
       fillPartitionKeys(partValues, mutableRow)
 
+      val tableProperties = relation.tableDesc.getProperties
+
       createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
         val hconf = broadcastedHiveConf.value.value
         val deserializer = localDeserializer.newInstance()
-        deserializer.initialize(hconf, partProps)
+        // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema
+        // information) may be defined in table properties. Here we should merge table properties
+        // and partition properties before initializing the deserializer. Note that partition
+        // properties take a higher priority here. For example, a partition may have a different
+        // SerDe from the one defined in the table properties.
+        val props = new Properties(tableProperties)
+        partProps.asScala.foreach {
+          case (key, value) => props.setProperty(key, value)
+        }
+        deserializer.initialize(hconf, props)
         // get the table deserializer
         val tableSerDe = tableDesc.getDeserializerClass.newInstance()
         tableSerDe.initialize(hconf, tableDesc.getProperties)
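One subtlety of the `new Properties(tableProperties)` construction above (an editorial observation, not part of the patch): values supplied as constructor defaults are visible through lookup calls like `getProperty` and `stringPropertyNames`, but not through the raw `Hashtable` view (`keySet`/`entrySet`). SerDes such as `AvroSerDe` look up their configuration with `getProperty`, so the merged view reaches them. A small hypothetical check:

```scala
import java.util.Properties

// Hypothetical check (not part of the patch): constructor defaults answer
// getProperty/stringPropertyNames lookups but stay out of the Hashtable view.
object PropertiesDefaultsCheck {
  def main(args: Array[String]): Unit = {
    val tableProps = new Properties()
    tableProps.setProperty("avro.schema.literal", """{"type": "int"}""")

    val merged = new Properties(tableProps) // tableProps acts as the default layer
    merged.setProperty("serialization.format", "1")

    assert(merged.getProperty("avro.schema.literal") != null)           // default is found
    assert(merged.stringPropertyNames.contains("avro.schema.literal"))  // and enumerated here
    assert(!merged.keySet.contains("avro.schema.literal"))              // but not stored directly
  }
}
```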
sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala

@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+
 import com.google.common.io.Files
+import org.apache.hadoop.fs.FileSystem
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -65,4 +68,82 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       sql("DROP TABLE IF EXISTS createAndInsertTest")
     }
   }
+
+  test("SPARK-13709: reading partitioned Avro table with nested schema") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val tableName = "spark_13709"
+      val tempTableName = "spark_13709_temp"
+
+      new File(path, tableName).mkdir()
+      new File(path, tempTableName).mkdir()
+
+      val avroSchema =
+        """{
+          |  "name": "test_record",
+          |  "type": "record",
+          |  "fields": [ {
+          |    "name": "f0",
+          |    "type": "int"
+          |  }, {
+          |    "name": "f1",
+          |    "type": {
+          |      "type": "record",
+          |      "name": "inner",
+          |      "fields": [ {
+          |        "name": "f10",
+          |        "type": "int"
+          |      }, {
+          |        "name": "f11",
+          |        "type": "double"
+          |      } ]
+          |    }
+          |  } ]
+          |}
+        """.stripMargin
+
+      withTable(tableName, tempTableName) {
+        // Creates the external partitioned Avro table to be tested.
+        sql(
+          s"""CREATE EXTERNAL TABLE $tableName
+             |PARTITIONED BY (ds STRING)
+             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+             |STORED AS
+             |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+             |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+             |LOCATION '$path/$tableName'
+             |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+           """.stripMargin
+        )
+
+        // Creates a temporary Avro table used to prepare the testing Avro data file.
+        sql(
+          s"""CREATE EXTERNAL TABLE $tempTableName
+             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+             |STORED AS
+             |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+             |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+             |LOCATION '$path/$tempTableName'
+             |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
+           """.stripMargin
+        )
+
+        // Generates Avro data.
+        sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)")
+
+        // Adds the generated Avro data as a new partition to the testing table.
+        sql(s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'")
+
+        // The following query fails before SPARK-13709 is fixed. This is because when reading
+        // data from table partitions, the Avro deserializer needs the Avro schema, which is
+        // defined in the table property "avro.schema.literal". However, the deserializer was
+        // only initialized with partition properties, which do not include the wanted property
+        // entry. Merging the two sets of properties solves the problem.
+        checkAnswer(
+          sql(s"SELECT * FROM $tableName"),
+          Row(1, Row(2, 2.5D), "foo")
+        )
+      }
+    }
+  }
 }
