[SPARK-43380][SQL] Revert Fix Avro data type conversion issues
### What changes were proposed in this pull request?
Revert my previous PR apache#41052, which caused an Avro read performance regression because it restructured the conversion code.

### Why are the changes needed?
Removes the Avro read performance regression introduced by apache#41052.

### How was this patch tested?
Unit test

Closes apache#42458 from zeruibao/revert-avro-change.

Authored-by: zeruibao <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
zeruibao authored and gengliangwang committed Aug 15, 2023
1 parent 7e52169 commit 46580ab
Showing 7 changed files with 189 additions and 495 deletions.
10 changes: 0 additions & 10 deletions common/utils/src/main/resources/error/error-classes.json
@@ -75,16 +75,6 @@
}
}
},
"AVRO_INCORRECT_TYPE" : {
"message" : [
"Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which would lead to an incorrect answer. To allow reading this field, enable the SQL configuration: <key>."
]
},
"AVRO_LOWER_PRECISION" : {
"message" : [
"Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which leads to data being read as null. Please provide a wider decimal type to get the correct result. To allow reading null to this field, enable the SQL configuration: <key>."
]
},
"BATCH_METADATA_NOT_FOUND" : {
"message" : [
"Unable to find batch <batchMetadataFile>."
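For context, a minimal sketch of the pre-revert behavior these removed error classes guarded, with an illustrative output path (not taken from this commit):

```scala
// Sketch only: Spark 3.5 pre-revert behavior removed by this commit.
// Write a decimal(12,10) value, then read it back with a narrower schema.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sql("SELECT 13.1234567890 AS a").write.format("avro").save("/tmp/avro-demo")

// Before the revert, this raised AVRO_LOWER_PRECISION (unless the legacy
// flag was enabled); after the revert it silently reads null again.
spark.read.schema("a DECIMAL(4, 3)").format("avro").load("/tmp/avro-demo").show()
```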

Large diffs are not rendered by default.

@@ -32,7 +32,6 @@ import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.exception.ExceptionUtils

import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException}
import org.apache.spark.TestUtils.assertExceptionMsg
@@ -815,166 +814,6 @@ abstract class AvroSuite
}
}

test("SPARK-43380: Fix Avro data type conversion" +
" of decimal type to avoid producing incorrect results") {
withTempPath { path =>
val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
// With the flag disabled, we will throw an exception if there is a mismatch
withSQLConf(confKey -> "false") {
val e = intercept[SparkException] {
spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
}
ExceptionUtils.getRootCause(e) match {
case ex: AnalysisException =>
checkError(
exception = ex,
errorClass = "AVRO_LOWER_PRECISION",
parameters = Map("avroPath" -> "field 'a'",
"sqlPath" -> "field 'a'",
"avroType" -> "decimal\\(12,10\\)",
"sqlType" -> "\"DECIMAL\\(4,3\\)\"",
"key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
matchPVals = true
)
case other =>
fail(s"Received unexpected exception", other)
}
}
// The following used to work, so it should still work with the flag enabled
checkAnswer(
spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
Row(new java.math.BigDecimal("13.123"))
)
withSQLConf(confKey -> "true") {
// With the flag enabled, we return a null silently, which isn't great
checkAnswer(
spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString),
Row(null)
)
checkAnswer(
spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
Row(new java.math.BigDecimal("13.123"))
)
}
}
}

test("SPARK-43380: Fix Avro data type conversion" +
" of DayTimeIntervalType to avoid producing incorrect results") {
withTempPath { path =>
val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
val schema = StructType(Array(StructField("a", DayTimeIntervalType(), false)))
val data = Seq(Row(java.time.Duration.ofDays(1).plusSeconds(1)))

val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
df.write.format("avro").save(path.getCanonicalPath)

withSQLConf(confKey -> "false") {
Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
val e = intercept[SparkException] {
spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
}

ExceptionUtils.getRootCause(e) match {
case ex: AnalysisException =>
checkError(
exception = ex,
errorClass = "AVRO_INCORRECT_TYPE",
parameters = Map("avroPath" -> "field 'a'",
"sqlPath" -> "field 'a'",
"avroType" -> "interval day to second",
"sqlType" -> s""""$sqlType"""",
"key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
matchPVals = true
)
case other =>
fail(s"Received unexpected exception", other)
}
}
}

withSQLConf(confKey -> "true") {
// Allow conversion and do not need to check result
spark.read.schema("a Date").format("avro").load(path.toString)
spark.read.schema("a timestamp").format("avro").load(path.toString)
spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
}
}
}

test("SPARK-43380: Fix Avro data type conversion" +
" of YearMonthIntervalType to avoid producing incorrect results") {
withTempPath { path =>
val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
val schema = StructType(Array(StructField("a", YearMonthIntervalType(), false)))
val data = Seq(Row(java.time.Period.of(1, 1, 0)))

val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
df.write.format("avro").save(path.getCanonicalPath)

withSQLConf(confKey -> "false") {
Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
val e = intercept[SparkException] {
spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
}

ExceptionUtils.getRootCause(e) match {
case ex: AnalysisException =>
checkError(
exception = ex,
errorClass = "AVRO_INCORRECT_TYPE",
parameters = Map("avroPath" -> "field 'a'",
"sqlPath" -> "field 'a'",
"avroType" -> "interval year to month",
"sqlType" -> s""""$sqlType"""",
"key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
matchPVals = true
)
case other =>
fail(s"Received unexpected exception", other)
}
}
}

withSQLConf(confKey -> "true") {
// Allow conversion and do not need to check result
spark.read.schema("a Date").format("avro").load(path.toString)
spark.read.schema("a timestamp").format("avro").load(path.toString)
spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
}
}
}

Seq(
"time-millis",
"time-micros",
"timestamp-micros",
"timestamp-millis",
"local-timestamp-millis",
"local-timestamp-micros"
).foreach { timeLogicalType =>
test(s"converting $timeLogicalType type to long in avro") {
withTempPath { path =>
val df = Seq(100L)
.toDF("dt")
val avroSchema =
s"""
|{
| "type" : "record",
| "name" : "test_schema",
| "fields" : [
| {"name": "dt", "type": {"type": "long", "logicalType": "$timeLogicalType"}}
| ]
|}""".stripMargin
df.write.format("avro").option("avroSchema", avroSchema).save(path.getCanonicalPath)
checkAnswer(
spark.read.schema(s"dt long").format("avro").load(path.toString),
Row(100L))
}
}
}

test("converting some specific sparkSQL types to avro") {
withTempPath { tempDir =>
val testSchema = StructType(Seq(
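The removed interval tests above exercised the same guard; a condensed sketch of that scenario, again with an illustrative path:

```scala
// Sketch: write a day-time interval to Avro, then read it back as a
// timestamp, the case the removed AVRO_INCORRECT_TYPE check rejected.
import java.time.Duration
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DayTimeIntervalType, StructField, StructType}

val schema = StructType(Seq(StructField("a", DayTimeIntervalType(), nullable = false)))
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(Duration.ofDays(1).plusSeconds(1)))), schema)
df.write.format("avro").save("/tmp/avro-interval")

// Pre-revert: AnalysisException (AVRO_INCORRECT_TYPE) unless the legacy
// flag was set; post-revert the read is permitted again.
spark.read.schema("a TIMESTAMP").format("avro").load("/tmp/avro-interval").collect()
```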
14 changes: 0 additions & 14 deletions docs/sql-error-conditions.md
@@ -93,18 +93,6 @@ Invalid as-of join.

For more details see [AS_OF_JOIN](sql-error-conditions-as-of-join-error-class.html)

### AVRO_INCORRECT_TYPE

SQLSTATE: none assigned

Cannot convert Avro `<avroPath>` to SQL `<sqlPath>` because the original encoded data type is `<avroType>`, however you're trying to read the field as `<sqlType>`, which would lead to an incorrect answer. To allow reading this field, enable the SQL configuration: `<key>`.

### AVRO_LOWER_PRECISION

SQLSTATE: none assigned

Cannot convert Avro `<avroPath>` to SQL `<sqlPath>` because the original encoded data type is `<avroType>`, however you're trying to read the field as `<sqlType>`, which leads to data being read as null. Please provide a wider decimal type to get the correct result. To allow reading null to this field, enable the SQL configuration: `<key>`.

### BATCH_METADATA_NOT_FOUND

[SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -2186,5 +2174,3 @@ The operation `<operation>` requires a `<requiredType>`. But `<objectName>` is a
The `<functionName>` requires `<expectedNum>` parameters but the actual number is `<actualNum>`.

For more details see [WRONG_NUM_ARGS](sql-error-conditions-wrong-num-args-error-class.html)


1 change: 0 additions & 1 deletion docs/sql-migration-guide.md
@@ -26,7 +26,6 @@ license: |

- Since Spark 3.5, the JDBC options related to DS V2 pushdown are `true` by default. These options include: `pushDownAggregate`, `pushDownLimit`, `pushDownOffset` and `pushDownTableSample`. To restore the legacy behavior, please set them to `false`. e.g. set `spark.sql.catalog.your_catalog_name.pushDownAggregate` to `false`.
- Since Spark 3.5, Spark thrift server will interrupt task when canceling a running statement. To restore the previous behavior, set `spark.sql.thriftServer.interruptOnCancel` to `false`.
- Since Spark 3.5, Avro will throw `AnalysisException` when reading Interval types as Date or Timestamp types, or when reading Decimal types with lower precision. To restore the legacy behavior, set `spark.sql.legacy.avro.allowIncompatibleSchema` to `true`.
- Since Spark 3.5, Row's json and prettyJson methods are moved to `ToJsonUtil`.
- Since Spark 3.5, the `plan` field is moved from `AnalysisException` to `EnhancedAnalysisException`.

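The removed migration-guide entry above pointed users at an escape hatch; pre-revert, restoring the legacy behavior per session looked like this (sketch):

```scala
// Sketch: enabling the now-removed legacy flag in Spark 3.5 pre-revert.
spark.conf.set("spark.sql.legacy.avro.allowIncompatibleSchema", "true")
```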
@@ -3720,36 +3720,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
)
}

def avroIncorrectTypeError(
avroPath: String, sqlPath: String, avroType: String,
sqlType: String, key: String): Throwable = {
new AnalysisException(
errorClass = "AVRO_INCORRECT_TYPE",
messageParameters = Map(
"avroPath" -> avroPath,
"sqlPath" -> sqlPath,
"avroType" -> avroType,
"sqlType" -> toSQLType(sqlType),
"key" -> key
)
)
}

def avroLowerPrecisionError(
avroPath: String, sqlPath: String, avroType: String,
sqlType: String, key: String): Throwable = {
new AnalysisException(
errorClass = "AVRO_LOWER_PRECISION",
messageParameters = Map(
"avroPath" -> avroPath,
"sqlPath" -> sqlPath,
"avroType" -> avroType,
"sqlType" -> toSQLType(sqlType),
"key" -> key
)
)
}

def optionMustBeLiteralString(key: String): Throwable = {
new AnalysisException(
errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
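These removed builders were thrown from the Avro schema-matching path; the sketch below shows a hypothetical call site for one of them. The helper name and field paths are illustrative, not Spark's actual code:

```scala
// Hypothetical call-site sketch for the removed error builder.
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf

def rejectNarrowerDecimal(
    avroPrecision: Int, avroScale: Int,
    sqlPrecision: Int, sqlScale: Int,
    fieldPath: String): Unit = {
  // Reading a wider Avro decimal into a narrower SQL decimal loses data,
  // so the pre-revert code raised AVRO_LOWER_PRECISION here.
  if (sqlPrecision < avroPrecision) {
    throw QueryCompilationErrors.avroLowerPrecisionError(
      avroPath = s"field '$fieldPath'",
      sqlPath = s"field '$fieldPath'",
      avroType = s"decimal($avroPrecision,$avroScale)",
      sqlType = s"DECIMAL($sqlPrecision,$sqlScale)",
      key = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key)
  }
}
```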
@@ -4288,18 +4288,6 @@
.booleanConf
.createWithDefault(false)

val LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA =
buildConf("spark.sql.legacy.avro.allowIncompatibleSchema")
.internal()
.doc("When set to false, if types in Avro are encoded in the same format, but " +
"the type in the Avro schema explicitly says that the data types are different, " +
"reject reading the data type in the format to avoid returning incorrect results. " +
"When set to true, it restores the legacy behavior of allow reading the data in the" +
" format, which may return incorrect results.")
.version("3.5.0")
.booleanConf
.createWithDefault(false)

val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME =
buildConf("spark.sql.legacy.v1IdentifierNoCatalog")
.internal()
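For completeness, the removed config entry was consulted through the standard `SQLConf` accessor; a sketch of how such a flag is read:

```scala
// Sketch: how the removed flag was consulted pre-revert.
import org.apache.spark.sql.internal.SQLConf

val allowIncompatible: Boolean =
  SQLConf.get.getConf(SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA)
```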
