
[SPARK-46934][SQL][FOLLOWUP] Read/write roundtrip for struct type with special characters with HMS - a backward compatible approach

### What changes were proposed in this pull request?

A backward-compatible approach for apache#45039 that lets older versions of Spark correctly read struct-typed columns with special characters in their field names created by Spark 4.x or later.

Compared with apache#45039, only datasource tables are supported now, because we already have a dedicated way to store a Hive-incompatible schema in the table properties. Dropping the Hive-table support is safe because no release has shipped it yet.
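
As a rough illustration (not part of the patch), the roundtrip this enables looks like the following; the table name `t` and the field names are just examples:

```scala
// Spark 4.x writer: a datasource (parquet) table whose struct field names contain
// dots and spaces. The full schema is kept in Spark's own table properties, so the
// HMS column types do not need to represent it and Spark 3.x can still resolve it.
spark.sql(
  """CREATE TABLE t USING parquet AS
    |SELECT named_struct('a.b.b', array('a'), 'a b c', map(1, 'a')) AS `a.b`""".stripMargin)
spark.table("t").schema.simpleString
// e.g. struct<a.b:struct<a.b.b:array<string>,a b c:map<int,string>>>
```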

### Why are the changes needed?

Backward-compatibility improvement.

### Does this PR introduce _any_ user-facing change?
Yes. Users can store and read struct-typed columns whose field names contain special characters.

### How was this patch tested?

#### Tests provided by SPARK-22431
```scala
DDLSuite.scala:  test("SPARK-22431: table with nested type col with special char")
DDLSuite.scala:  test("SPARK-22431: view with nested type")
HiveDDLSuite.scala:  test("SPARK-22431: table with nested type") {
HiveDDLSuite.scala:  test("SPARK-22431: view with nested type") {
HiveDDLSuite.scala:  test("SPARK-22431: alter table tests with nested types") {
```
#### Tests provided by the previous PR for SPARK-46934
```scala
HiveMetastoreCatalogSuite.scala:  test("SPARK-46934: HMS columns cannot handle quoted columns")
HiveMetastoreCatalogSuite.scala:  test("SPARK-46934: Handle special characters in struct types") {
HiveMetastoreCatalogSuite.scala:  test("SPARK-46934: Handle special characters in struct types with CTAS") {
HiveMetastoreCatalogSuite.scala:  test("SPARK-46934: Handle special characters in struct types with hive DDL") {
HiveDDLSuite.scala:  test("SPARK-46934: quote element name before parsing struct") {
HiveDDLSuite.scala:  test("SPARK-46934: alter table tests with nested types") {
```

#### Manual backward-compatibility test

1. Build a distribution tarball from the current revision
2. `cd dist`
3. Use spark-sql to create test data
```
spark-sql (default)> CREATE TABLE t AS SELECT named_struct('a.b.b', array('a'), 'a b c', map(1, 'a')) AS `a.b`;
```
4. Copy the metastore metadata into the Spark 3.5.3 release directory

```
cp -r ~/spark/dist/metastore_db .
```
5. Fix the Derby version mismatch

```
rm jars/derby-10.14.2.0.jar
cp -r ~/spark/dist/jars/derby-10.16.1.1.jar ./jars
```
6. Read the data back (a verification sketch follows the output below)
```
spark-sql (default)> select version();
3.5.3 32232e9
Time taken: 0.103 seconds, Fetched 1 row(s)
spark-sql (default)> select * from t;
{"a.b.b":["a"],"a b c":{1:"a"}}
Time taken: 0.09 seconds, Fetched 1 row(s)
spark-sql (default)> desc formatted t;
a.b                 	struct<a.b.b:array<string>,a b c:map<int,string>>

# Detailed Table Information
Catalog             	spark_catalog
Database            	default
Table               	t
Owner               	hzyaoqin
Created Time        	Wed Nov 27 17:40:53 CST 2024
Last Access         	UNKNOWN
Created By          	Spark 4.0.0-SNAPSHOT
Type                	MANAGED
Provider            	parquet
Statistics          	1245 bytes
Location            	file:/Users/hzyaoqin/spark/dist/spark-warehouse/t
Serde Library       	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat         	org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat        	org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Time taken: 0.054 seconds, Fetched 17 row(s)
```
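
As an extra check on the 3.5.3 side, the roundtrip can also be asserted from a Spark shell; a hedged sketch, reusing the table `t` from step 3:

```scala
// The expected type mirrors the `desc formatted` output above (default catalogString,
// i.e. unquoted field names).
val expected = "struct<a.b.b:array<string>,a b c:map<int,string>>"
assert(spark.table("t").schema("a.b").dataType.catalogString == expected)

// Nested fields with special characters remain addressable with backtick quoting.
spark.sql("SELECT `a.b`.`a b c`[1] FROM t").show()
```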

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#48986 from yaooqinn/SPARK-46934-F.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
yaooqinn committed Dec 11, 2024
1 parent 2f5728f commit d268e0c
Showing 4 changed files with 128 additions and 52 deletions.
HiveUtils.scala
@@ -221,6 +221,15 @@ private[spark] object HiveUtils extends Logging {
.booleanConf
.createWithDefault(true)

val QUOTE_HIVE_STRUCT_FIELD_NAME =
buildConf("spark.sql.hive.quoteHiveStructFieldName")
.doc("When true, for a column defined in struct type, when it contains special characters " +
"in the field name, Spark will quote it for verification. E.g. struct<x:int,y.z:int>" +
" is read as struct<`x`:int,`y.z`:int> for verification.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

/**
* The version of the hive client that will be used to communicate with the metastore. Note that
* this does not necessarily need to be the same version of Hive that is used internally by
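
A usage sketch for the flag added above (it is off by default): it only matters on the read path, for tables whose struct schema reaches Spark through the HMS column types (e.g. tables created via Hive DDL) rather than through the datasource schema properties. The table name is illustrative:

```scala
// Re-quote unquoted HMS struct field names before parsing, so legacy type strings
// such as struct<x:int,y.z:int> can still be verified as struct<`x`:int,`y.z`:int>.
spark.conf.set("spark.sql.hive.quoteHiveStructFieldName", "true")
spark.table("hive_tbl_with_special_chars").printSchema()
```
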
HiveClientImpl.scala
@@ -56,12 +56,13 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, QuotingUtils, StringConcat}
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
import org.apache.spark.sql.hive.HiveUtils.QUOTE_HIVE_STRUCT_FIELD_NAME
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -580,7 +581,6 @@ private[hive] class HiveClientImpl(
}

override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
verifyColumnDataType(table.dataSchema)
shim.createTable(client, toHiveTable(table, Some(userName)), ignoreIfExists)
}

@@ -600,7 +600,6 @@
// these properties are still available to the others that share the same Hive metastore.
// If users explicitly alter these Hive-specific properties through ALTER TABLE DDL, we respect
// these user-specified values.
verifyColumnDataType(table.dataSchema)
val hiveTable = toHiveTable(
table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName))
// Do not use `table.qualifiedName` here because this may be a rename
@@ -624,7 +623,6 @@
newDataSchema: StructType,
schemaProps: Map[String, String]): Unit = withHiveState {
val oldTable = shim.getTable(client, dbName, tableName)
verifyColumnDataType(newDataSchema)
val hiveCols = newDataSchema.map(toHiveColumn)
oldTable.setFields(hiveCols.asJava)

@@ -1092,13 +1090,45 @@ private[hive] object HiveClientImpl extends Logging {
// When reading data in parquet, orc, or avro file format with string type for char,
// the tailing spaces may lost if we are not going to pad it.
val typeString = if (SQLConf.get.charVarcharAsString) {
c.dataType.catalogString
catalogString(c.dataType)
} else {
CharVarcharUtils.getRawTypeString(c.metadata).getOrElse(c.dataType.catalogString)
CharVarcharUtils.getRawTypeString(c.metadata).getOrElse(catalogString(c.dataType))
}
new FieldSchema(c.name, typeString, c.getComment().orNull)
}

/**
* This is a variant of `DataType.catalogString` that does the same thing in general, but
* it quotes the field names of struct types. The HMS API uses unquoted field names to
* store the schema of a struct type; that is fine on the write path, but on the read path
* the unquoted schema strings may fail to parse in the Spark SQL parser. You can see the
* tricks we play in the `getSparkSQLDataType` method to handle this. To avoid the
* flakiness of those tricks, we quote the field names, making the schema unrecognizable
* to the HMS API, and then store it in custom Spark table properties as a fallback.
*
* The reason we don't add quoting to `DataType.catalogString` directly is that we don't
* want to break the compatibility of existing query output schemas.
*/
def catalogString(dataType: DataType): String = dataType match {
case ArrayType(et, _) => s"array<${catalogString(et)}>"
case MapType(k, v, _) => s"map<${catalogString(k)},${catalogString(v)}>"
case StructType(fields) =>
val stringConcat = new StringConcat()
val len = fields.length
stringConcat.append("struct<")
var i = 0
while (i < len) {
val name = QuotingUtils.quoteIfNeeded(fields(i).name)
stringConcat.append(s"$name:${catalogString(fields(i).dataType)}")
i += 1
if (i < len) stringConcat.append(",")
}
stringConcat.append(">")
stringConcat.toString
case udt: UserDefinedType[_] => catalogString(udt.sqlType)
case _ => dataType.catalogString
}

/** Get the Spark SQL native DataType from Hive's FieldSchema. */
private def getSparkSQLDataType(hc: FieldSchema): DataType = {
// For struct types, Hive metastore API uses unquoted element names, so does the spark catalyst
@@ -1111,7 +1141,12 @@
// struct<x:int,y.z:int> -> struct<`x`:int,`y.z`:int>
// array<struct<x:int,y.z:int>> -> array<struct<`x`:int,`y.z`:int>>
// map<string,struct<x:int,y.z:int>> -> map<string,struct<`x`:int,`y.z`:int>>
val typeStr = hc.getType.replaceAll("(?<=struct<|,)([^,<:]+)(?=:)", "`$1`")
val typeStr = if (SQLConf.get.getConf(QUOTE_HIVE_STRUCT_FIELD_NAME) &&
hc.getType.indexOf('`') < 0) { // This is defensive code for possible changes in HMS
hc.getType.replaceAll("(?<=struct<|,)([^,<:]+)(?=:)", "`$1`")
} else {
hc.getType
}
try {
CatalystSqlParser.parseDataType(typeStr)
} catch {
@@ -1130,10 +1165,6 @@
Option(hc.getComment).map(field.withComment).getOrElse(field)
}

private def verifyColumnDataType(schema: StructType): Unit = {
schema.foreach(col => getSparkSQLDataType(toHiveColumn(col)))
}

private def toInputFormat(name: String) =
Utils.classForName[org.apache.hadoop.mapred.InputFormat[_, _]](name)

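To make the read-path trick above concrete, a small standalone sketch using the same regex as `getSparkSQLDataType` (the type string is illustrative):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// A legacy, unquoted HMS type string is re-quoted before parsing, so field names
// containing dots no longer confuse the Spark SQL parser.
val hmsType = "struct<x:int,y.z:int>"
val quoted = hmsType.replaceAll("(?<=struct<|,)([^,<:]+)(?=:)", "`$1`")
// quoted == "struct<`x`:int,`y.z`:int>"
val parsed = CatalystSqlParser.parseDataType(quoted)

// On the write path, the new catalogString variant quotes field names instead; the HMS
// API rejects such types, so datasource tables fall back to the schema stored in table
// properties (the default DataType.catalogString would emit the unquoted form).
```
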
HiveMetastoreCatalogSuite.scala
@@ -17,11 +17,13 @@

package org.apache.spark.sql.hive

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.hive.HiveUtils.QUOTE_HIVE_STRUCT_FIELD_NAME
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
@@ -129,6 +131,26 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton with SQLTestUtils {
assert(schema == expectedSchema)
}
}

test("SPARK-46934: HMS columns cannot handle quoted columns") {
withTable("t") {
val schema =
"a struct<" +
"`a.a`:int," +
"`a.b`:struct<" +
" `a b b`:array<string>," +
" `a b c`:map<int, string>" +
" >" +
">"
val e = intercept[AnalysisException](sql("CREATE TABLE t(" + schema + ") USING hive"))
checkError(
exception = e,
condition = "_LEGACY_ERROR_TEMP_3065",
parameters = Map(
"clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException",
"msg" -> e.getCause.getMessage))
}
}
}

class DataSourceWithHiveMetastoreCatalogSuite
@@ -441,7 +463,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
}

test("SPARK-46934: Handle special characters in struct types with hive DDL") {
withTable("t") {
try {
val schema =
"a struct<" +
"`a.a`:int," +
@@ -451,7 +473,22 @@
" >" +
">"
sparkSession.metadataHive.runSqlHive(s"CREATE TABLE t($schema)")
assert(spark.table("t").schema === CatalystSqlParser.parseTableSchema(schema))
withSQLConf(QUOTE_HIVE_STRUCT_FIELD_NAME.key -> "true") {
assert(spark.table("t").schema === CatalystSqlParser.parseTableSchema(schema))
}

withSQLConf(QUOTE_HIVE_STRUCT_FIELD_NAME.key -> "false") {
checkError(exception =
intercept[SparkException](spark.table("t")).getCause.asInstanceOf[SparkException],
condition = "CANNOT_RECOGNIZE_HIVE_TYPE",
parameters = Map(
"fieldType" ->
"\"STRUCT<A.A:INT,A.B:STRUCT<A.B.B:ARRAY<STRING>,A B C:MAP<INT,STRING>>>\"",
"fieldName" -> "`a`"
))
}
} finally {
sparkSession.metadataHive.runSqlHive("DROP TABLE IF EXISTS t")
}
}
}
HiveDDLSuite.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
@@ -167,17 +166,23 @@ class HiveDDLSuite
}

test("SPARK-46934: quote element name before parsing struct") {
withTable("t") {
sql("CREATE TABLE t USING hive AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
assert(spark.table("t").schema === CatalystSqlParser.parseTableSchema(
"q STRUCT<`$a`: STRING, b: INT>"))
}
val e = intercept[AnalysisException](
sql("CREATE TABLE t USING hive AS SELECT STRUCT('a' AS `$a`, 1 AS b) q"))
checkError(
exception = e,
condition = "_LEGACY_ERROR_TEMP_3065",
parameters = Map(
"clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException",
"msg" -> e.getCause.getMessage))

withTable("t") {
sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING hive")
assert(spark.table("t").schema === CatalystSqlParser.parseTableSchema(
"q STRUCT<`$a`:INT, col2:STRING>, i1 INT"))
}
val e1 = intercept[AnalysisException](
sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) USING hive"))
checkError(
exception = e1,
condition = "_LEGACY_ERROR_TEMP_3065",
parameters = Map(
"clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException",
"msg" -> e1.getCause.getMessage))

withView("v") {
spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
@@ -233,15 +238,29 @@
}
}

test("SPARK-46934: alter table tests with nested types") {
test("SPARK-46934: alter datasource table tests with nested types") {
withTable("t1") {
sql("CREATE TABLE t1 (q STRUCT<col1:INT, col2:STRING>, i1 INT) USING hive")
sql("CREATE TABLE t1 (q STRUCT<col1:INT, col2:STRING>, i1 INT) USING parquet")
sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, col2:Int>)")
assert(spark.table("t1").schema == CatalystSqlParser.parseTableSchema(
"q STRUCT<col1:INT, col2:STRING>, i1 INT,newcol1 STRUCT<`$col1`:STRING, col2:Int>"))
}
}

test("SPARK-46934: alter hive table tests with nested types") {
withTable("t1") {
sql("CREATE TABLE t1 (q STRUCT<col1:INT, col2:STRING>, i1 INT) USING hive")
val e = intercept[AnalysisException](
sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`$col1`:STRING, col2:Int>)"))
checkError(
exception = e,
condition = "_LEGACY_ERROR_TEMP_3065",
parameters = Map(
"clazz" -> "java.lang.IllegalArgumentException",
"msg" -> e.getCause.getMessage))
}
}

test("SPARK-26630: table with old input format and without partitioned will use HadoopRDD") {
withTable("table_old", "table_ctas_old") {
sql(
@@ -2849,38 +2868,18 @@
}

test("SPARK-47101 checks if nested column names do not include invalid characters") {
// delimiter characters
Seq(",", ":").foreach { c =>
Seq(",", ":", ";", "^", "\\", "/", "%").foreach { c =>
val typ = s"array<struct<`abc${c}xyz`:int>>"
// The regex is from HiveClientImpl.getSparkSQLDataType, please keep them in sync.
val replaced = typ.replaceAll("`", "").replaceAll("(?<=struct<|,)([^,<:]+)(?=:)", "`$1`")
withTable("t") {
checkError(
exception = intercept[SparkException] {
val e = intercept[AnalysisException] {
sql(s"CREATE TABLE t (a $typ) USING hive")
},
condition = "CANNOT_RECOGNIZE_HIVE_TYPE",
parameters = Map(
"fieldType" -> toSQLType(replaced),
"fieldName" -> "`a`")
)
}
}
// other special characters
Seq(";", "^", "\\", "/", "%").foreach { c =>
val typ = s"array<struct<`abc${c}xyz`:int>>"
val replaced = typ.replaceAll("`", "")
val msg = s"java.lang.IllegalArgumentException: Error: : expected at the position " +
s"16 of '$replaced' but '$c' is found."
withTable("t") {
}
checkError(
exception = intercept[AnalysisException] {
sql(s"CREATE TABLE t (a $typ) USING hive")
},
exception = e,
condition = "_LEGACY_ERROR_TEMP_3065",
parameters = Map(
"clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException",
"msg" -> msg)
"clazz" -> e.getCause.getClass.getName,
"msg" -> e.getCause.getMessage)
)
}
}
