Skip to content

Commit

Permalink
[SPARK-28084][SQL] Resolving the partition column name based on the r…
Browse files Browse the repository at this point in the history
…esolver in sql load command

### What changes were proposed in this pull request?

LOAD DATA command resolves the partition column name as case sensitive manner,
where as in insert commandthe partition column name will be resolved using
the SQLConf resolver where the names will be resolved based on `spark.sql.caseSensitive` property. Same logic can be applied for resolving the partition column names in LOAD COMMAND.

### Why are the changes needed?

It's to handle the partition column name correctly according to the configuration.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing UT and manual testing.

Closes apache#24903 from sujith71955/master_paritionColName.

Lead-authored-by: s71955 <[email protected]>
Co-authored-by: sujith71955 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
sujith71955 authored and dongjoon-hyun committed Oct 3, 2019
1 parent 40485f4 commit ee66890
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ case class LoadDataCommand(
val catalog = sparkSession.sessionState.catalog
val targetTable = catalog.getTableMetadata(table)
val tableIdentwithDB = targetTable.identifier.quotedString
val normalizedSpec = partition.map { spec =>
PartitioningUtils.normalizePartitionSpec(
spec,
targetTable.partitionColumnNames,
tableIdentwithDB,
sparkSession.sessionState.conf.resolver)
}

if (targetTable.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $tableIdentwithDB")
Expand All @@ -297,13 +304,6 @@ case class LoadDataCommand(
s"do not match number of partitioned columns in table " +
s"(${targetTable.partitionColumnNames.size})")
}
partition.get.keys.foreach { colName =>
if (!targetTable.partitionColumnNames.contains(colName)) {
throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
s"but the specified partition spec refers to a column that is not partitioned: " +
s"'$colName'")
}
}
} else {
if (partition.nonEmpty) {
throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is not " +
Expand Down Expand Up @@ -353,7 +353,7 @@ case class LoadDataCommand(
catalog.loadPartition(
targetTable.identifier,
loadPath.toString,
partition.get,
normalizedSpec.get,
isOverwrite,
inheritTableSpecs = true,
isSrcLocal = isLocal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.execution.command.LoadDataCommand
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -289,7 +290,29 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
}
checkAnswer(
sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '1'"),
sql("SELECT * FROM non_part_table").collect())
sql("SELECT * FROM non_part_table"))
}
}

test("SPARK-28084 case insensitive names of static partitioning in INSERT commands") {
withTable("part_table") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql("CREATE TABLE part_table (price int, qty int) partitioned by (year int, month int)")
sql("INSERT INTO part_table PARTITION(YEar = 2015, month = 1) SELECT 1, 1")
checkAnswer(sql("SELECT * FROM part_table"), Row(1, 1, 2015, 1))
}
}
}

test("SPARK-28084 case insensitive names of dynamic partitioning in INSERT commands") {
withTable("part_table") {
withSQLConf(
SQLConf.CASE_SENSITIVE.key -> "false",
"hive.exec.dynamic.partition.mode" -> "nonstrict") {
sql("CREATE TABLE part_table (price int) partitioned by (year int)")
sql("INSERT INTO part_table PARTITION(YEar) SELECT 1, 2019")
checkAnswer(sql("SELECT * FROM part_table"), Row(1, 2019))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2012,6 +2012,26 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}

test("SPARK-28084 check for case insensitive property of partition column name in load command") {
withTempDir { dir =>
val path = dir.toURI.toString.stripSuffix("/")
val dirPath = dir.getAbsoluteFile
Files.append("1", new File(dirPath, "part-r-000011"), StandardCharsets.UTF_8)
withTable("part_table") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(
"""
|CREATE TABLE part_table (c STRING)
|PARTITIONED BY (d STRING)
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$path/part-r-000011' " +
"INTO TABLE part_table PARTITION(D ='1')")
checkAnswer(sql("SELECT * FROM part_table"), Seq(Row("1", "1")))
}
}
}
}

test("SPARK-25738: defaultFs can have a port") {
val defaultURI = new URI("hdfs://fizz.buzz.com:8020")
val r = LoadDataCommand.makeQualified(defaultURI, new Path("/foo/bar"), new Path("/flim/flam"))
Expand Down

0 comments on commit ee66890

Please sign in to comment.