[SPARK-5950][SQL] Insert array into a metastore table saved as parquet should work when using datasource api

This PR contains the following changes:
1. Add a new method, `DataType.equalsIgnoreCompatibleNullability`, which is the middle ground between DataType's equality check and `DataType.equalsIgnoreNullability`. For two data types `from` and `to`, it performs `equalsIgnoreNullability` and additionally checks whether the nullability of `from` is compatible with that of `to`. For example, the nullability of `ArrayType(IntegerType, containsNull = false)` is compatible with that of `ArrayType(IntegerType, containsNull = true)` (for an array without null values, we can always say it may contain null values). However, the nullability of `ArrayType(IntegerType, containsNull = true)` is incompatible with that of `ArrayType(IntegerType, containsNull = false)` (for an array that may have null values, we cannot claim it has none). A sketch of these semantics appears after this list.
2. For the `resolved` field of `InsertIntoTable`, use `equalsIgnoreCompatibleNullability` instead of a plain equality check on the data types.
3. For our data source write path, when appending data, we always use the schema of the existing table to write the data. This is important for parquet, since nullability directly impacts how values are encoded and decoded. If we did not do this, we could see corrupted values when reading from a set of parquet files generated with different nullability settings.
4. When generating a new parquet table, we always set nullable/containsNull/valueContainsNull to true, so we will never hit a situation where we cannot append data because containsNull/valueContainsNull in an Array/Map column of the existing table has already been set to `false`. This change makes the whole data pipeline more robust (see the `asNullable` sketch after this list).
5. Update the equality check of JSON relation. Since JSON does not really care about nullability, `equalsIgnoreNullability` (exposed via the new `sameType` method) seems a better choice for comparing the schemata of two JSON tables.
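
As a concrete illustration of points 1, 2, and 5, here is a minimal sketch of the new comparison semantics. Note that `equalsIgnoreCompatibleNullability` is `private[sql]` and `sameType` is `private[spark]`, so a snippet like this only compiles inside Spark's own packages (the new tests in `DataTypeSuite` exercise it the same way):

```scala
import org.apache.spark.sql.types._

val noNulls      = ArrayType(IntegerType, containsNull = false)
val mayHaveNulls = ArrayType(IntegerType, containsNull = true)

// Widening nullability is compatible: an array known to be null-free
// can always be treated as one that may contain nulls.
assert(DataType.equalsIgnoreCompatibleNullability(noNulls, mayHaveNulls))

// The reverse is not: an array that may contain nulls cannot be written
// to a column declared null-free.
assert(!DataType.equalsIgnoreCompatibleNullability(mayHaveNulls, noNulls))

// sameType (i.e. equalsIgnoreNullability) ignores nullability in both
// directions, which is the semantics the JSON relation equality check wants.
assert(noNulls.sameType(mayHaveNulls) && mayHaveNulls.sameType(noNulls))
```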
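
And a sketch of the `asNullable` relaxation behind point 4 (also `private[spark]`): when a new parquet table is created, its schema is relaxed recursively so that later appends never fail on nullability:

```scala
import org.apache.spark.sql.types._

val strict = StructType(
  StructField("xs", ArrayType(IntegerType, containsNull = false), nullable = false) :: Nil)

// asNullable sets nullable/containsNull/valueContainsNull to true at every
// level, so the stored parquet schema accepts any compatible append later.
val relaxed = strict.asNullable
// relaxed is StructType(StructField("xs", ArrayType(IntegerType, containsNull = true),
//   nullable = true) :: Nil)
```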

JIRA: https://issues.apache.org/jira/browse/SPARK-5950

Thanks viirya for the initial work in apache#4729.

cc marmbrus liancheng

Author: Yin Huai <[email protected]>

Closes apache#4826 from yhuai/insertNullabilityCheck and squashes the following commits:

3b61a04 [Yin Huai] Revert change on equals.
80e487e [Yin Huai] asNullable in UDT.
587d88b [Yin Huai] Make methods private.
0cb7ea2 [Yin Huai] marmbrus's comments.
3cec464 [Yin Huai] Cheng's comments.
486ed08 [Yin Huai] Merge remote-tracking branch 'upstream/master' into insertNullabilityCheck
d3747d1 [Yin Huai] Remove unnecessary change.
8360817 [Yin Huai] Merge remote-tracking branch 'upstream/master' into insertNullabilityCheck
8a3f237 [Yin Huai] Use equalsIgnoreNullability instead of equality check.
0eb5578 [Yin Huai] Fix tests.
f6ed813 [Yin Huai] Update old parquet path.
e4f397c [Yin Huai] Unit tests.
b2c06f8 [Yin Huai] Ignore nullability in JSON relation's equality check.
8bd008b [Yin Huai] nullable, containsNull, and valueContainsNull will be always true for parquet data.
bf50d73 [Yin Huai] When appending data, we use the schema of the existing table instead of the schema of the new data.
0a703e7 [Yin Huai] Test failed again since we cannot read correct content.
9a26611 [Yin Huai] Make InsertIntoTable happy.
8f19fe5 [Yin Huai] equalsIgnoreCompatibleNullability
4ec17fd [Yin Huai] Failed test.
yhuai authored and marmbrus committed Mar 3, 2015
1 parent 9eb22ec commit 1259994
Showing 17 changed files with 330 additions and 36 deletions.
@@ -182,6 +182,8 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
case _ => false
}
}

private[spark] override def asNullable: VectorUDT = this
}

/**
@@ -110,7 +110,7 @@ private[mllib] object Loader {
assert(loadedFields.contains(field.name), s"Unable to parse model data." +
s" Expected field with name ${field.name} was missing in loaded schema:" +
s" ${loadedFields.mkString(", ")}")
assert(loadedFields(field.name) == field.dataType,
assert(loadedFields(field.name).sameType(field.dataType),
s"Unable to parse model data. Expected field $field but found field" +
s" with different type: ${loadedFields(field.name)}")
}
@@ -120,7 +120,8 @@ case class InsertIntoTable(
override def output = child.output

override lazy val resolved = childrenResolved && child.output.zip(table.output).forall {
case (childAttr, tableAttr) => childAttr.dataType == tableAttr.dataType
case (childAttr, tableAttr) =>
DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType)
}
}

@@ -181,7 +181,7 @@ object DataType {
/**
* Compares two types, ignoring nullability of ArrayType, MapType, StructType.
*/
private[sql] def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
private[types] def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
(left, right) match {
case (ArrayType(leftElementType, _), ArrayType(rightElementType, _)) =>
equalsIgnoreNullability(leftElementType, rightElementType)
@@ -198,6 +198,43 @@ object DataType {
case (left, right) => left == right
}
}

/**
* Compares two types, ignoring compatible nullability of ArrayType, MapType, StructType.
*
* Compatible nullability is defined as follows:
* - If `from` and `to` are ArrayTypes, `from` has a compatible nullability with `to`
* if and only if `to.containsNull` is true, or both of `from.containsNull` and
* `to.containsNull` are false.
* - If `from` and `to` are MapTypes, `from` has a compatible nullability with `to`
* if and only if `to.valueContainsNull` is true, or both of `from.valueContainsNull` and
* `to.valueContainsNull` are false.
* - If `from` and `to` are StructTypes, `from` has a compatible nullability with `to`
* if and only if, for every pair of corresponding fields, `toField.nullable` is true, or both
* of `fromField.nullable` and `toField.nullable` are false.
*/
private[sql] def equalsIgnoreCompatibleNullability(from: DataType, to: DataType): Boolean = {
(from, to) match {
case (ArrayType(fromElement, fn), ArrayType(toElement, tn)) =>
(tn || !fn) && equalsIgnoreCompatibleNullability(fromElement, toElement)

case (MapType(fromKey, fromValue, fn), MapType(toKey, toValue, tn)) =>
(tn || !fn) &&
equalsIgnoreCompatibleNullability(fromKey, toKey) &&
equalsIgnoreCompatibleNullability(fromValue, toValue)

case (StructType(fromFields), StructType(toFields)) =>
fromFields.size == toFields.size &&
fromFields.zip(toFields).forall {
case (fromField, toField) =>
fromField.name == toField.name &&
(toField.nullable || !fromField.nullable) &&
equalsIgnoreCompatibleNullability(fromField.dataType, toField.dataType)
}

case (fromDataType, toDataType) => fromDataType == toDataType
}
}
}


@@ -230,6 +267,17 @@ abstract class DataType {
def prettyJson: String = pretty(render(jsonValue))

def simpleString: String = typeName

/** Check if `this` and `other` are the same data type when ignoring nullability
* (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
*/
private[spark] def sameType(other: DataType): Boolean =
DataType.equalsIgnoreNullability(this, other)

/** Returns the same data type but with all nullability fields set to true
* (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
*/
private[spark] def asNullable: DataType
}

/**
@@ -245,6 +293,8 @@ class NullType private() extends DataType {
// this type. Otherwise, the companion object would be of type "NullType$" in byte code.
// Defined with a private constructor so the companion object is the only possible instantiation.
override def defaultSize: Int = 1

private[spark] override def asNullable: NullType = this
}

case object NullType extends NullType
@@ -310,6 +360,8 @@ class StringType private() extends NativeType with PrimitiveType {
* The default size of a value of the StringType is 4096 bytes.
*/
override def defaultSize: Int = 4096

private[spark] override def asNullable: StringType = this
}

case object StringType extends StringType
@@ -344,6 +396,8 @@ class BinaryType private() extends NativeType with PrimitiveType {
* The default size of a value of the BinaryType is 4096 bytes.
*/
override def defaultSize: Int = 4096

private[spark] override def asNullable: BinaryType = this
}

case object BinaryType extends BinaryType
@@ -369,6 +423,8 @@ class BooleanType private() extends NativeType with PrimitiveType {
* The default size of a value of the BooleanType is 1 byte.
*/
override def defaultSize: Int = 1

private[spark] override def asNullable: BooleanType = this
}

case object BooleanType extends BooleanType
@@ -399,6 +455,8 @@ class TimestampType private() extends NativeType {
* The default size of a value of the TimestampType is 12 bytes.
*/
override def defaultSize: Int = 12

private[spark] override def asNullable: TimestampType = this
}

case object TimestampType extends TimestampType
@@ -427,6 +485,8 @@ class DateType private() extends NativeType {
* The default size of a value of the DateType is 4 bytes.
*/
override def defaultSize: Int = 4

private[spark] override def asNullable: DateType = this
}

case object DateType extends DateType
@@ -485,6 +545,8 @@ class LongType private() extends IntegralType {
override def defaultSize: Int = 8

override def simpleString = "bigint"

private[spark] override def asNullable: LongType = this
}

case object LongType extends LongType
@@ -514,6 +576,8 @@ class IntegerType private() extends IntegralType {
override def defaultSize: Int = 4

override def simpleString = "int"

private[spark] override def asNullable: IntegerType = this
}

case object IntegerType extends IntegerType
@@ -543,6 +607,8 @@ class ShortType private() extends IntegralType {
override def defaultSize: Int = 2

override def simpleString = "smallint"

private[spark] override def asNullable: ShortType = this
}

case object ShortType extends ShortType
@@ -572,6 +638,8 @@ class ByteType private() extends IntegralType {
override def defaultSize: Int = 1

override def simpleString = "tinyint"

private[spark] override def asNullable: ByteType = this
}

case object ByteType extends ByteType
@@ -638,6 +706,8 @@ case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalT
case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
case None => "decimal(10,0)"
}

private[spark] override def asNullable: DecimalType = this
}


@@ -696,6 +766,8 @@ class DoubleType private() extends FractionalType {
* The default size of a value of the DoubleType is 8 bytes.
*/
override def defaultSize: Int = 8

private[spark] override def asNullable: DoubleType = this
}

case object DoubleType extends DoubleType
@@ -724,6 +796,8 @@ class FloatType private() extends FractionalType {
* The default size of a value of the FloatType is 4 bytes.
*/
override def defaultSize: Int = 4

private[spark] override def asNullable: FloatType = this
}

case object FloatType extends FloatType
@@ -772,6 +846,9 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
override def defaultSize: Int = 100 * elementType.defaultSize

override def simpleString = s"array<${elementType.simpleString}>"

private[spark] override def asNullable: ArrayType =
ArrayType(elementType.asNullable, containsNull = true)
}


@@ -1017,6 +1094,15 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
*/
private[sql] def merge(that: StructType): StructType =
StructType.merge(this, that).asInstanceOf[StructType]

private[spark] override def asNullable: StructType = {
val newFields = fields.map {
case StructField(name, dataType, nullable, metadata) =>
StructField(name, dataType.asNullable, nullable = true, metadata)
}

StructType(newFields)
}
}


@@ -1069,6 +1155,9 @@ case class MapType(
override def defaultSize: Int = 100 * (keyType.defaultSize + valueType.defaultSize)

override def simpleString = s"map<${keyType.simpleString},${valueType.simpleString}>"

private[spark] override def asNullable: MapType =
MapType(keyType.asNullable, valueType.asNullable, valueContainsNull = true)
}


@@ -1122,4 +1211,10 @@ abstract class UserDefinedType[UserType] extends DataType with Serializable {
* The default size of a value of the UserDefinedType is 4096 bytes.
*/
override def defaultSize: Int = 4096

/**
* For a UDT, `asNullable` does not change the nullability of its internal sqlType; it simply
* returns the UDT itself.
*/
private[spark] override def asNullable: UserDefinedType[UserType] = this
}
@@ -115,4 +115,87 @@ class DataTypeSuite extends FunSuite {
checkDefaultSize(MapType(IntegerType, StringType, true), 410000)
checkDefaultSize(MapType(IntegerType, ArrayType(DoubleType), false), 80400)
checkDefaultSize(structType, 812)

def checkEqualsIgnoreCompatibleNullability(
from: DataType,
to: DataType,
expected: Boolean): Unit = {
val testName =
s"equalsIgnoreCompatibleNullability: (from: ${from}, to: ${to})"
test(testName) {
assert(DataType.equalsIgnoreCompatibleNullability(from, to) === expected)
}
}

checkEqualsIgnoreCompatibleNullability(
from = ArrayType(DoubleType, containsNull = true),
to = ArrayType(DoubleType, containsNull = true),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = ArrayType(DoubleType, containsNull = false),
to = ArrayType(DoubleType, containsNull = false),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = ArrayType(DoubleType, containsNull = false),
to = ArrayType(DoubleType, containsNull = true),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = ArrayType(DoubleType, containsNull = true),
to = ArrayType(DoubleType, containsNull = false),
expected = false)
checkEqualsIgnoreCompatibleNullability(
from = ArrayType(DoubleType, containsNull = false),
to = ArrayType(StringType, containsNull = false),
expected = false)

checkEqualsIgnoreCompatibleNullability(
from = MapType(StringType, DoubleType, valueContainsNull = true),
to = MapType(StringType, DoubleType, valueContainsNull = true),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = MapType(StringType, DoubleType, valueContainsNull = false),
to = MapType(StringType, DoubleType, valueContainsNull = false),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = MapType(StringType, DoubleType, valueContainsNull = false),
to = MapType(StringType, DoubleType, valueContainsNull = true),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = MapType(StringType, DoubleType, valueContainsNull = true),
to = MapType(StringType, DoubleType, valueContainsNull = false),
expected = false)
checkEqualsIgnoreCompatibleNullability(
from = MapType(StringType, ArrayType(IntegerType, true), valueContainsNull = true),
to = MapType(StringType, ArrayType(IntegerType, false), valueContainsNull = true),
expected = false)
checkEqualsIgnoreCompatibleNullability(
from = MapType(StringType, ArrayType(IntegerType, false), valueContainsNull = true),
to = MapType(StringType, ArrayType(IntegerType, true), valueContainsNull = true),
expected = true)


checkEqualsIgnoreCompatibleNullability(
from = StructType(StructField("a", StringType, nullable = true) :: Nil),
to = StructType(StructField("a", StringType, nullable = true) :: Nil),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = StructType(StructField("a", StringType, nullable = false) :: Nil),
to = StructType(StructField("a", StringType, nullable = false) :: Nil),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = StructType(StructField("a", StringType, nullable = false) :: Nil),
to = StructType(StructField("a", StringType, nullable = true) :: Nil),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = StructType(StructField("a", StringType, nullable = true) :: Nil),
to = StructType(StructField("a", StringType, nullable = false) :: Nil),
expected = false)
checkEqualsIgnoreCompatibleNullability(
from = StructType(
StructField("a", StringType, nullable = false) ::
StructField("b", StringType, nullable = true) :: Nil),
to = StructType(
StructField("a", StringType, nullable = false) ::
StructField("b", StringType, nullable = false) :: Nil),
expected = false)
}
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataType, StructType}


private[sql] class DefaultSource
@@ -131,7 +131,7 @@ private[sql] case class JSONRelation(

override def equals(other: Any): Boolean = other match {
case that: JSONRelation =>
(this.path == that.path) && (this.schema == that.schema)
(this.path == that.path) && this.schema.sameType(that.schema)
case _ => false
}
}
@@ -23,6 +23,7 @@ import java.util.logging.Level
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsAction
import org.apache.spark.sql.types.{StructType, DataType}
import parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
import parquet.hadoop.metadata.CompressionCodecName
import parquet.schema.MessageType
@@ -172,9 +173,13 @@ private[sql] object ParquetRelation {
sqlContext.conf.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED)
.name())
ParquetRelation.enableLogForwarding()
ParquetTypesConverter.writeMetaData(attributes, path, conf)
// This is a hack. We always set nullable/containsNull/valueContainsNull to true
// for the schema of parquet data.
val schema = StructType.fromAttributes(attributes).asNullable
val newAttributes = schema.toAttributes
ParquetTypesConverter.writeMetaData(newAttributes, path, conf)
new ParquetRelation(path.toString, Some(conf), sqlContext) {
override val output = attributes
override val output = newAttributes
}
}

(The remaining changed files of this commit are not shown here.)