[SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-3575] [SQL] Parquet data source improvements

This PR adds three major improvements to the Parquet data source:

1.  Partition discovery

    When reading Parquet files that reside in Hive-style partition directories, `ParquetRelation2` automatically discovers the partitioning information and infers partition column types (a rough sketch of the expected behavior appears right after this list).

    This is also partial work towards [SPARK-5182] [1], which aims to provide first-class partitioning support for the data source API. Related code in this PR can easily be extracted to the data source API level in future versions.

1.  Schema merging

    When enabled, the Parquet data source collects schema information from all Parquet part-files and merges them into a single schema. An exception is thrown when incompatible schemas are detected. This feature is controlled by the data source option `parquet.mergeSchema` and is enabled by default.

1.  Metastore Parquet table conversion moved to analysis phase

    This greatly simplifies the conversion logic. The `ParquetConversion` strategy can be removed once the old Parquet implementation is dropped in the future.
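
To make item 1 above concrete, here is a rough sketch of the expected behavior (the directory layout, column names, and output are illustrative assumptions, not taken from this PR's test cases):

```scala
// Hive-style partitioned layout (hypothetical):
//
//   /data/events/year=2015/month=1/part-00000.parquet
//   /data/events/year=2015/month=2/part-00000.parquet
//
// Reading the base path with the new data source discovers `year` and `month`
// from the directory names and exposes them as typed partition columns.
val df = sqlContext.parquetFile("/data/events")
df.printSchema()
// root
//  |-- <columns stored in the Parquet part-files>
//  |-- year: integer
//  |-- month: integer
```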

This version of the Parquet data source aims to entirely replace the old Parquet implementation. However, the old version hasn't been removed yet. Users can fall back to it by turning off the SQL configuration `spark.sql.parquet.useDataSourceApi`.
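
A minimal sketch of toggling this switch at runtime (assuming an existing `sqlContext`):

```scala
// Fall back to the old Parquet implementation.
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")

// Switch back to the data source based implementation (the default).
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "true")
```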

Other JIRA tickets fixed as side effects in this PR:

- [SPARK-5509] [3]: `EqualTo` now uses a proper `Ordering` to compare binary types (see the sketch after this list).

- [SPARK-3575] [4]: Metastore schema is now preserved and passed to `ParquetRelation2` via data source option `parquet.metastoreSchema`.
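
As background for the `EqualTo` fix ([SPARK-5509] above): `==` on byte arrays is reference equality on the JVM, so structurally equal binary values used to compare as unequal. A minimal sketch:

```scala
val a = Array[Byte](1, 2, 3)
val b = Array[Byte](1, 2, 3)

a == b                         // false: arrays compare by reference
java.util.Arrays.equals(a, b)  // true: element-wise comparison

// EqualTo now compares BinaryType values through an Ordering instead of ==,
// so the two arrays above are treated as equal.
```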

TODO:

- [ ] More test cases for partition discovery
- [x] Fix write path after data source write support (apache#4294) is merged

      It turned out to be non-trivial to fall back to the old Parquet implementation on the write path when the Parquet data source is enabled. Since we're planning to include data source write support in 1.3.0, I simply ignored two test cases involving Parquet insertion for now.

- [ ] Fix outdated comments and documentations

PS: This PR looks big, but more than half of the changed lines are trivial changes to test cases. To test Parquet with and without the new data source, almost all Parquet test cases are moved into wrapper driver functions (sketched below), which introduces hundreds of lines of changes.
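
The wrapper pattern is roughly the following (a sketch under assumptions; the helper name is made up rather than taken from the test suites):

```scala
import org.apache.spark.sql.SQLContext

// Runs the given test body twice, once with the Parquet data source enabled and
// once with it disabled, restoring the original setting afterwards.
def withAndWithoutParquetDataSource(sqlContext: SQLContext)(body: => Unit): Unit = {
  val key = "spark.sql.parquet.useDataSourceApi"
  val original = sqlContext.getConf(key, "true")
  try {
    Seq("true", "false").foreach { useDataSource =>
      sqlContext.setConf(key, useDataSource)
      body
    }
  } finally {
    sqlContext.setConf(key, original)
  }
}
```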

[1]: https://issues.apache.org/jira/browse/SPARK-5182
[2]: https://issues.apache.org/jira/browse/SPARK-5528
[3]: https://issues.apache.org/jira/browse/SPARK-5509
[4]: https://issues.apache.org/jira/browse/SPARK-3575


Author: Cheng Lian <[email protected]>

Closes apache#4308 from liancheng/parquet-partition-discovery and squashes the following commits:

b6946e6 [Cheng Lian] Fixes MiMA issues, addresses comments
8232e17 [Cheng Lian] Write support for Parquet data source
a49bd28 [Cheng Lian] Fixes spelling typo in trait name "CreateableRelationProvider"
808380f [Cheng Lian] Fixes issues introduced while rebasing
50dd8d1 [Cheng Lian] Addresses @rxin's comment, fixes UDT schema merging
adf2aae [Cheng Lian] Fixes compilation error introduced while rebasing
4e0175f [Cheng Lian] Fixes Python Parquet API, we need Py4J array to call varargs method
0d8ec1d [Cheng Lian] Adds more test cases
b35c8c6 [Cheng Lian] Fixes some typos and outdated comments
dd704fd [Cheng Lian] Fixes Python Parquet API
596c312 [Cheng Lian] Uses switch to control whether use Parquet data source or not
7d0f7a2 [Cheng Lian] Fixes Metastore Parquet table conversion
a1896c7 [Cheng Lian] Fixes all existing Parquet test suites except for ParquetMetastoreSuite
5654c9d [Cheng Lian] Draft version of Parquet partition discovery and schema merging
liancheng authored and marmbrus committed Feb 5, 2015
1 parent c19152c commit a9ed511
Showing 24 changed files with 1,541 additions and 736 deletions.
@@ -21,7 +21,7 @@ import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
@@ -191,6 +191,21 @@ class SparkHadoopUtil extends Logging {
val method = context.getClass.getMethod("getConfiguration")
method.invoke(context).asInstanceOf[Configuration]
}

/**
* Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
* given path points to a file, return a single-element collection containing [[FileStatus]] of
* that file.
*/
def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
def recurse(path: Path) = {
val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
}

val baseStatus = fs.getFileStatus(basePath)
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
}
}

object SparkHadoopUtil {
9 changes: 7 additions & 2 deletions python/pyspark/sql.py
@@ -1471,7 +1471,7 @@ def registerRDDAsTable(self, rdd, tableName):
else:
raise ValueError("Can only register DataFrame as table")

def parquetFile(self, path):
def parquetFile(self, *paths):
"""Loads a Parquet file, returning the result as a L{DataFrame}.
>>> import tempfile, shutil
@@ -1483,7 +1483,12 @@ def parquetFile(self, path):
>>> sorted(df.collect()) == sorted(df2.collect())
True
"""
jdf = self._ssql_ctx.parquetFile(path)
gateway = self._sc._gateway
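# parquetFile on the JVM side is a varargs method (path: String, paths: String*),
# so the first path is passed positionally and the remaining ones are packed into
# a Java String[] built through the Py4J gateway.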
jpath = paths[0]
jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths) - 1)
for i in range(1, len(paths)):
jpaths[i - 1] = paths[i]
jdf = self._ssql_ctx.parquetFile(jpath, jpaths)
return DataFrame(jdf, self)

def jsonFile(self, path, schema=None, samplingRatio=1.0):
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.types.{BinaryType, BooleanType}

object InterpretedPredicate {
def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
@@ -175,7 +175,10 @@ case class EqualTo(left: Expression, right: Expression) extends BinaryComparison
null
} else {
val r = right.eval(input)
if (r == null) null else l == r
if (r == null) null
else if (left.dataType != BinaryType) l == r
else BinaryType.ordering.compare(
l.asInstanceOf[Array[Byte]], r.asInstanceOf[Array[Byte]]) == 0
}
}
}
@@ -19,6 +19,7 @@ package org.apache.spark.sql.types

import java.sql.Timestamp

import scala.collection.mutable.ArrayBuffer
import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag}
@@ -29,6 +30,7 @@ import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.ScalaReflectionLock
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
@@ -159,7 +161,6 @@ object DataType {
case failure: NoSuccess =>
throw new IllegalArgumentException(s"Unsupported dataType: $asString, $failure")
}

}

protected[types] def buildFormattedString(
@@ -754,6 +755,57 @@ object StructType {
def apply(fields: java.util.List[StructField]): StructType = {
StructType(fields.toArray.asInstanceOf[Array[StructField]])
}

private[sql] def merge(left: DataType, right: DataType): DataType =
(left, right) match {
case (ArrayType(leftElementType, leftContainsNull),
ArrayType(rightElementType, rightContainsNull)) =>
ArrayType(
merge(leftElementType, rightElementType),
leftContainsNull || rightContainsNull)

case (MapType(leftKeyType, leftValueType, leftContainsNull),
MapType(rightKeyType, rightValueType, rightContainsNull)) =>
MapType(
merge(leftKeyType, rightKeyType),
merge(leftValueType, rightValueType),
leftContainsNull || rightContainsNull)

case (StructType(leftFields), StructType(rightFields)) =>
val newFields = ArrayBuffer.empty[StructField]

leftFields.foreach {
case leftField @ StructField(leftName, leftType, leftNullable, _) =>
rightFields
.find(_.name == leftName)
.map { case rightField @ StructField(_, rightType, rightNullable, _) =>
leftField.copy(
dataType = merge(leftType, rightType),
nullable = leftNullable || rightNullable)
}
.orElse(Some(leftField))
.foreach(newFields += _)
}

rightFields
.filterNot(f => leftFields.map(_.name).contains(f.name))
.foreach(newFields += _)

StructType(newFields)

case (DecimalType.Fixed(leftPrecision, leftScale),
DecimalType.Fixed(rightPrecision, rightScale)) =>
DecimalType(leftPrecision.max(rightPrecision), leftScale.max(rightScale))

case (leftUdt: UserDefinedType[_], rightUdt: UserDefinedType[_])
if leftUdt.userClass == rightUdt.userClass => leftUdt

case (leftType, rightType) if leftType == rightType =>
leftType

case _ =>
throw new SparkException(s"Failed to merge incompatible data types $left and $right")
}
}


@@ -890,6 +942,20 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
s"struct<${fieldTypes.mkString(",")}>"
}

/**
* Merges with another schema (`StructType`). For a struct field A from `this` and a struct field
* B from `that`,
*
* 1. If A and B have the same name and data type, they are merged to a field C with the same name
* and data type. C is nullable if and only if either A or B is nullable.
* 2. If A doesn't exist in `that`, it's included in the result schema.
* 3. If B doesn't exist in `this`, it's also included in the result schema.
* 4. Otherwise, `this` and `that` are considered as conflicting schemas and an exception would be
* thrown.
*/
private[sql] def merge(that: StructType): StructType =
StructType.merge(this, that).asInstanceOf[StructType]
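// Illustration of the merge rules above (not part of this diff): merging
//   struct<a:int, b:string (nullable)>  with  struct<b:string, c:double>
// yields
//   struct<a:int, b:string (nullable), c:double>
// while merging struct<a:int> with struct<a:string> throws a SparkException.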
}


@@ -295,7 +295,11 @@ private[sql] class DataFrameImpl protected[sql](
}

override def saveAsParquetFile(path: String): Unit = {
sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
if (sqlContext.conf.parquetUseDataSourceApi) {
save("org.apache.spark.sql.parquet", "path" -> path)
} else {
sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
}
}

override def saveAsTable(tableName: String): Unit = {
5 changes: 5 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -37,6 +37,7 @@ private[spark] object SQLConf {
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"

val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
@@ -105,6 +106,10 @@ private[sql] class SQLConf extends Serializable {
private[spark] def parquetFilterPushDown =
getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean

/** When true, uses the Parquet implementation based on the data source API. */
private[spark] def parquetUseDataSourceApi =
getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean

/** When true the planner will use the external sort, which may spill to disk. */
private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean

20 changes: 13 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -20,14 +20,13 @@ package org.apache.spark.sql
import java.beans.Introspector
import java.util.Properties

import scala.collection.immutable
import scala.collection.JavaConversions._
import scala.collection.immutable
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.{SparkContext, Partition}
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
@@ -36,11 +35,12 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.json._
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.json._
import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation, _}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.{Partition, SparkContext}

/**
* :: AlphaComponent ::
@@ -303,8 +303,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* @group userf
*/
def parquetFile(path: String): DataFrame =
DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
@scala.annotation.varargs
def parquetFile(path: String, paths: String*): DataFrame =
if (conf.parquetUseDataSourceApi) {
baseRelationToDataFrame(parquet.ParquetRelation2(path +: paths, Map.empty)(this))
} else {
DataFrame(this, parquet.ParquetRelation(
paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
}

/**
* Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
@@ -17,18 +17,17 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.{SQLContext, Strategy, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.parquet._
import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources.{DescribeCommand => LogicalDescribeCommand}
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.{SQLContext, Strategy, execution}

private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType


private[sql] class DefaultSource
extends RelationProvider with SchemaRelationProvider with CreateableRelationProvider {
extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {

/** Returns a new base relation with the parameters. */
override def createRelation(
@@ -159,7 +159,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
val attributesSize = attributes.size
if (attributesSize > record.size) {
throw new IndexOutOfBoundsException(
s"Trying to write more fields than contained in row (${attributesSize}>${record.size})")
s"Trying to write more fields than contained in row ($attributesSize > ${record.size})")
}

var index = 0
@@ -325,7 +325,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
val attributesSize = attributes.size
if (attributesSize > record.size) {
throw new IndexOutOfBoundsException(
s"Trying to write more fields than contained in row (${attributesSize}>${record.size})")
s"Trying to write more fields than contained in row ($attributesSize > ${record.size})")
}

var index = 0
@@ -348,10 +348,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
index: Int): Unit = {
ctype match {
case StringType => writer.addBinary(
Binary.fromByteArray(
record(index).asInstanceOf[String].getBytes("utf-8")
)
)
Binary.fromByteArray(record(index).asInstanceOf[String].getBytes("utf-8")))
case BinaryType => writer.addBinary(
Binary.fromByteArray(record(index).asInstanceOf[Array[Byte]]))
case IntegerType => writer.addInteger(record.getInt(index))
@@ -19,24 +19,23 @@ package org.apache.spark.sql.parquet

import java.io.IOException

import scala.collection.mutable.ArrayBuffer
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job

import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter}
import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData}
import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import parquet.hadoop.util.ContextUtil
import parquet.schema.{Type => ParquetType, Types => ParquetTypes, PrimitiveType => ParquetPrimitiveType, MessageType}
import parquet.schema.{GroupType => ParquetGroupType, OriginalType => ParquetOriginalType, ConversionPatterns, DecimalMetadata}
import parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
import parquet.schema.Type.Repetition
import parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes}

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types._
import org.apache.spark.{Logging, SparkException}

// Implicits
import scala.collection.JavaConversions._
@@ -285,7 +284,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
ctype: DataType,
name: String,
nullable: Boolean = true,
inArray: Boolean = false,
inArray: Boolean = false,
toThriftSchemaNames: Boolean = false): ParquetType = {
val repetition =
if (inArray) {
@@ -340,7 +339,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
}
case StructType(structFields) => {
val fields = structFields.map {
field => fromDataType(field.dataType, field.name, field.nullable,
field => fromDataType(field.dataType, field.name, field.nullable,
inArray = false, toThriftSchemaNames)
}
new ParquetGroupType(repetition, name, fields.toSeq)