[SPARK-20831][SQL] Fix INSERT OVERWRITE data source tables with IF NOT EXISTS

### What changes were proposed in this pull request?
Currently, there is a bug when `IF NOT EXISTS` is specified in an `INSERT OVERWRITE` statement on a data source table. For example, given the query:
```SQL
INSERT OVERWRITE TABLE $tableName partition (b=2, c=3) IF NOT EXISTS SELECT 9, 10
```
we will get the following error:
```
unresolved operator 'InsertIntoTable Relation[a#425,d#426,b#427,c#428] parquet, Map(b -> Some(2), c -> Some(3)), true, true;;
'InsertIntoTable Relation[a#425,d#426,b#427,c#428] parquet, Map(b -> Some(2), c -> Some(3)), true, true
+- Project [cast(9#423 as int) AS a#429, cast(10#424 as int) AS d#430]
   +- Project [9 AS 9#423, 10 AS 10#424]
      +- OneRowRelation$
```

This PR fixes the issue so that data source tables follow the behavior of Hive serde tables:
> INSERT OVERWRITE will overwrite any existing data in the table or partition unless IF NOT EXISTS is provided for a partition
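
For illustration, a minimal, hedged sketch of the intended semantics after this patch; the table name, columns, and session setup are illustrative (mirroring the modified test), not part of the patch itself:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: a throwaway local session and a hypothetical table `t`.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("insert-overwrite-if-not-exists-demo")
  .getOrCreate()

spark.sql("CREATE TABLE t (a INT, d INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)")
spark.sql("INSERT OVERWRITE TABLE t PARTITION (b=2, c=3) SELECT 5, 6")

// Partition (b=2, c=3) now exists, so with IF NOT EXISTS this statement should
// become a no-op: previously it failed analysis with the error shown above.
spark.sql("INSERT OVERWRITE TABLE t PARTITION (b=2, c=3) IF NOT EXISTS SELECT 9, 10")
spark.sql("SELECT a, b, c, d FROM t").show() // expected: 5, 2, 3, 6
```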

### How was this patch tested?
Modified an existing test case

Author: gatorsmile <[email protected]>

Closes apache#18050 from gatorsmile/insertPartitionIfNotExists.
gatorsmile authored and cloud-fan committed May 22, 2017
1 parent 2597674 commit f3ed62a
Showing 12 changed files with 90 additions and 94 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

@@ -366,7 +366,7 @@ package object dsl {
       def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
         InsertIntoTable(
           analysis.UnresolvedRelation(TableIdentifier(tableName)),
-          Map.empty, logicalPlan, overwrite, false)
+          Map.empty, logicalPlan, overwrite, ifPartitionNotExists = false)
 
       def as(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

@@ -410,17 +410,20 @@ case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) exten
  * would have Map('a' -> Some('1'), 'b' -> None).
  * @param query the logical plan representing data to write to.
  * @param overwrite overwrite existing table or partitions.
- * @param ifNotExists If true, only write if the table or partition does not exist.
+ * @param ifPartitionNotExists If true, only write if the partition does not exist.
+ *                             Only valid for static partitions.
  */
 case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean)
+    ifPartitionNotExists: Boolean)
   extends LogicalPlan {
-  assert(overwrite || !ifNotExists)
-  assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
+  // IF NOT EXISTS is only valid in INSERT OVERWRITE
+  assert(overwrite || !ifPartitionNotExists)
+  // IF NOT EXISTS is only valid in static partitions
+  assert(partition.values.forall(_.nonEmpty) || !ifPartitionNotExists)
 
   // We don't want `table` in children as sometimes we don't want to transform it.
   override def children: Seq[LogicalPlan] = query :: Nil
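The two new assertions encode when `IF NOT EXISTS` is legal on an `InsertIntoTable` node. A hedged sketch of plans that satisfy or trip them (`t` is an arbitrary table name; the constructor shape follows the hunk above):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OneRowRelation}

val rel = UnresolvedRelation(TableIdentifier("t"))

// Fine: INSERT OVERWRITE TABLE t PARTITION (b=2) IF NOT EXISTS ...
InsertIntoTable(rel, Map("b" -> Some("2")), OneRowRelation,
  overwrite = true, ifPartitionNotExists = true)

// Trips the first assertion: IF NOT EXISTS without OVERWRITE.
InsertIntoTable(rel, Map("b" -> Some("2")), OneRowRelation,
  overwrite = false, ifPartitionNotExists = true)

// Trips the second assertion: a dynamic partition (value None) with IF NOT EXISTS.
InsertIntoTable(rel, Map("b" -> None), OneRowRelation,
  overwrite = true, ifPartitionNotExists = true)
```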
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -303,7 +303,7 @@ object SQLConf {
   val HIVE_MANAGE_FILESOURCE_PARTITIONS =
     buildConf("spark.sql.hive.manageFilesourcePartitions")
       .doc("When true, enable metastore partition management for file source tables as well. " +
-        "This includes both datasource and converted Hive tables. When partition managment " +
+        "This includes both datasource and converted Hive tables. When partition management " +
         "is enabled, datasource tables store partition in the Hive metastore, and use the " +
         "metastore to prune partitions during query planning.")
       .booleanConf
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala

@@ -176,14 +176,14 @@ class PlanParserSuite extends PlanTest {
     def insert(
         partition: Map[String, Option[String]],
         overwrite: Boolean = false,
-        ifNotExists: Boolean = false): LogicalPlan =
-      InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
+        ifPartitionNotExists: Boolean = false): LogicalPlan =
+      InsertIntoTable(table("s"), partition, plan, overwrite, ifPartitionNotExists)
 
     // Single inserts
     assertEqual(s"insert overwrite table s $sql",
       insert(Map.empty, overwrite = true))
     assertEqual(s"insert overwrite table s partition (e = 1) if not exists $sql",
-      insert(Map("e" -> Option("1")), overwrite = true, ifNotExists = true))
+      insert(Map("e" -> Option("1")), overwrite = true, ifPartitionNotExists = true))
     assertEqual(s"insert into s $sql",
       insert(Map.empty))
     assertEqual(s"insert into table s partition (c = 'd', e = 1) $sql",
@@ -193,9 +193,9 @@ class PlanParserSuite extends PlanTest {
     val plan2 = table("t").where('x > 5).select(star())
     assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
       InsertIntoTable(
-        table("s"), Map.empty, plan.limit(1), false, ifNotExists = false).union(
+        table("s"), Map.empty, plan.limit(1), false, ifPartitionNotExists = false).union(
         InsertIntoTable(
-          table("u"), Map.empty, plan2, false, ifNotExists = false)))
+          table("u"), Map.empty, plan2, false, ifPartitionNotExists = false)))
   }
 
   test ("insert with if not exists") {
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

@@ -286,7 +286,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         partition = Map.empty[String, Option[String]],
         query = df.logicalPlan,
         overwrite = mode == SaveMode.Overwrite,
-        ifNotExists = false)
+        ifPartitionNotExists = false)
     }
   }
 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

@@ -430,6 +430,7 @@ case class DataSource(
       InsertIntoHadoopFsRelationCommand(
         outputPath = outputPath,
         staticPartitions = Map.empty,
+        ifPartitionNotExists = false,
         partitionColumns = partitionAttributes,
         bucketSpec = bucketSpec,
         fileFormat = format,
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

@@ -142,8 +142,8 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         parts, query, overwrite, false) if parts.isEmpty =>
       InsertIntoDataSourceCommand(l, query, overwrite)
 
-    case InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) =>
+    case i @ InsertIntoTable(
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, _) =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
       // the user has specified static partitions, we add a Project operator on top of the query
       // to include those constant column values in the query result.
@@ -195,6 +195,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
       InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitions,
+        i.ifPartitionNotExists,
         partitionSchema,
         t.bucketSpec,
         t.fileFormat,
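One detail in the hunk above: the pattern now binds the whole node as `i @ InsertIntoTable(...)` and matches `_` for the flag, so the rule can read `i.ifPartitionNotExists` when it builds the command. A self-contained illustration of that Scala binder idiom (generic example, not Spark code):

```scala
// Generic `name @ Pattern(...)` binder demo; Box is a stand-in, not a Spark type.
case class Box(flag: Boolean, payload: String)

def describe(b: Box): String = b match {
  // `whole` is bound to the entire matched value, while `payload` is extracted.
  case whole @ Box(_, payload) => s"payload=$payload, flag=${whole.flag}"
}

println(describe(Box(flag = true, payload = "x"))) // payload=x, flag=true
```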
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

@@ -37,10 +37,13 @@ import org.apache.spark.sql.execution.command._
 *                          overwrites: when the spec is empty, all partitions are overwritten.
 *                          When it covers a prefix of the partition keys, only partitions matching
 *                          the prefix are overwritten.
+ * @param ifPartitionNotExists If true, only write if the partition does not exist.
+ *                             Only valid for static partitions.
 */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
     staticPartitions: TablePartitionSpec,
+    ifPartitionNotExists: Boolean,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
@@ -61,8 +64,8 @@ case class InsertIntoHadoopFsRelationCommand(
       val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect {
         case (x, ys) if ys.length > 1 => "\"" + x + "\""
       }.mkString(", ")
-      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
-        s"cannot save to file.")
+      throw new AnalysisException(s"Duplicate column(s): $duplicateColumns found, " +
+        "cannot save to file.")
     }
 
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
@@ -76,11 +79,12 @@ case class InsertIntoHadoopFsRelationCommand(
 
     var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
     var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
+    var matchingPartitions: Seq[CatalogTablePartition] = Seq.empty
 
     // When partitions are tracked by the catalog, compute all custom partition locations that
     // may be relevant to the insertion job.
     if (partitionsTrackedByCatalog) {
-      val matchingPartitions = sparkSession.sessionState.catalog.listPartitions(
+      matchingPartitions = sparkSession.sessionState.catalog.listPartitions(
         catalogTable.get.identifier, Some(staticPartitions))
       initialMatchingPartitions = matchingPartitions.map(_.spec)
       customPartitionLocations = getCustomPartitionLocations(
@@ -101,8 +105,12 @@ case class InsertIntoHadoopFsRelationCommand(
       case (SaveMode.ErrorIfExists, true) =>
        throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
-        deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
-        true
+        if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
+          false
+        } else {
+          deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
+          true
+        }
       case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
         true
       case (SaveMode.Ignore, exists) =>
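Distilled from the `(mode, pathExists)` match above, a standalone sketch of the write decision; this is a model for illustration, not the actual Spark method, and the `SaveMode.Ignore` branch body (collapsed in the hunk) is assumed to write only when the path is absent:

```scala
sealed trait Mode
case object Append extends Mode
case object Overwrite extends Mode
case object ErrorIfExists extends Mode
case object Ignore extends Mode

// Returns true when the insertion job should actually run.
def doInsertion(
    mode: Mode,
    pathExists: Boolean,
    ifPartitionNotExists: Boolean,
    matchingPartitionsNonEmpty: Boolean): Boolean = (mode, pathExists) match {
  case (ErrorIfExists, true) =>
    sys.error("path already exists")
  case (Overwrite, true) if ifPartitionNotExists && matchingPartitionsNonEmpty =>
    false // IF NOT EXISTS and the static partition already exists: skip the write
  case (Overwrite, true) =>
    true // delete matching partitions first, then write
  case (Ignore, exists) =>
    !exists // assumption: this branch body is collapsed in the hunk above
  case _ =>
    true // Append, or the output path does not exist yet
}
```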
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

@@ -160,9 +160,9 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  */
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case InsertIntoTable(relation: CatalogRelation, partSpec, query, overwrite, ifNotExists)
-        if DDLUtils.isHiveTable(relation.tableMeta) =>
-      InsertIntoHiveTable(relation.tableMeta, partSpec, query, overwrite, ifNotExists)
+    case InsertIntoTable(r: CatalogRelation, partSpec, query, overwrite, ifPartitionNotExists)
+        if DDLUtils.isHiveTable(r.tableMeta) =>
+      InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
 
     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
       CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
@@ -207,11 +207,11 @@ case class RelationConversions(
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan transformUp {
       // Write path
-      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
+      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
-          if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-            !r.isPartitioned && isConvertible(r) =>
-        InsertIntoTable(convert(r), partition, query, overwrite, ifNotExists)
+          if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+            !r.isPartitioned && isConvertible(r) =>
+        InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
 
       // Read path
       case relation: CatalogRelation
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala

@@ -62,7 +62,7 @@ case class CreateHiveTableAsSelectCommand(
           Map(),
           query,
           overwrite = false,
-          ifNotExists = false)).toRdd
+          ifPartitionNotExists = false)).toRdd
     } else {
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while data
@@ -78,7 +78,7 @@ case class CreateHiveTableAsSelectCommand(
             Map(),
             query,
             overwrite = true,
-            ifNotExists = false)).toRdd
+            ifPartitionNotExists = false)).toRdd
       } catch {
         case NonFatal(e) =>
           // drop the created table.
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

@@ -71,14 +71,15 @@ import org.apache.spark.SparkException
 * }}}.
 * @param query the logical plan representing data to write to.
 * @param overwrite overwrite existing table or partitions.
- * @param ifNotExists If true, only write if the table or partition does not exist.
+ * @param ifPartitionNotExists If true, only write if the partition does not exist.
+ *                             Only valid for static partitions.
 */
 case class InsertIntoHiveTable(
     table: CatalogTable,
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean) extends RunnableCommand {
+    ifPartitionNotExists: Boolean) extends RunnableCommand {
 
   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
@@ -375,7 +376,7 @@ case class InsertIntoHiveTable(
 
     var doHiveOverwrite = overwrite
 
-    if (oldPart.isEmpty || !ifNotExists) {
+    if (oldPart.isEmpty || !ifPartitionNotExists) {
       // SPARK-18107: Insert overwrite runs much slower than hive-client.
       // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
       // version and we may not want to catch up new Hive version every time. We delete the
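The Hive write path applies the same rule per static partition: overwrite unless the partition already exists and `IF NOT EXISTS` was given. As a one-line model (hypothetical helper, not the actual method):

```scala
// Equivalent, by De Morgan, to `oldPart.isEmpty || !ifPartitionNotExists` above.
def shouldLoadPartition(oldPartExists: Boolean, ifPartitionNotExists: Boolean): Boolean =
  !(oldPartExists && ifPartitionNotExists)
```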
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala

@@ -166,72 +166,54 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     sql("DROP TABLE tmp_table")
   }
 
-  test("INSERT OVERWRITE - partition IF NOT EXISTS") {
-    withTempDir { tmpDir =>
-      val table = "table_with_partition"
-      withTable(table) {
-        val selQuery = s"select c1, p1, p2 from $table"
-        sql(
-          s"""
-             |CREATE TABLE $table(c1 string)
-             |PARTITIONED by (p1 string,p2 string)
-             |location '${tmpDir.toURI.toString}'
-           """.stripMargin)
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE $table
-             |partition (p1='a',p2='b')
-             |SELECT 'blarr'
-           """.stripMargin)
-        checkAnswer(
-          sql(selQuery),
-          Row("blarr", "a", "b"))
-
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE $table
-             |partition (p1='a',p2='b')
-             |SELECT 'blarr2'
-           """.stripMargin)
-        checkAnswer(
-          sql(selQuery),
-          Row("blarr2", "a", "b"))
+  testPartitionedTable("INSERT OVERWRITE - partition IF NOT EXISTS") { tableName =>
+    val selQuery = s"select a, b, c, d from $tableName"
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3)
+         |SELECT 1, 4
+       """.stripMargin)
+    checkAnswer(sql(selQuery), Row(1, 2, 3, 4))
 
-        var e = intercept[AnalysisException] {
-          sql(
-            s"""
-               |INSERT OVERWRITE TABLE $table
-               |partition (p1='a',p2) IF NOT EXISTS
-               |SELECT 'blarr3', 'newPartition'
-             """.stripMargin)
-        }
-        assert(e.getMessage.contains(
-          "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [p2]"))
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3)
+         |SELECT 5, 6
+       """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
 
+    val e = intercept[AnalysisException] {
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $tableName
+           |partition (b=2, c) IF NOT EXISTS
+           |SELECT 7, 8, 3
+         """.stripMargin)
+    }
+    assert(e.getMessage.contains(
+      "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [c]"))
 
-        e = intercept[AnalysisException] {
-          sql(
-            s"""
-               |INSERT OVERWRITE TABLE $table
-               |partition (p1='a',p2) IF NOT EXISTS
-               |SELECT 'blarr3', 'b'
-             """.stripMargin)
-        }
-        assert(e.getMessage.contains(
-          "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [p2]"))
+    // If the partition already exists, the insert will overwrite the data
+    // unless users specify IF NOT EXISTS
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3) IF NOT EXISTS
+         |SELECT 9, 10
+       """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
 
-        // If the partition already exists, the insert will overwrite the data
-        // unless users specify IF NOT EXISTS
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE $table
-             |partition (p1='a',p2='b') IF NOT EXISTS
-             |SELECT 'blarr3'
-           """.stripMargin)
-        checkAnswer(
-          sql(selQuery),
-          Row("blarr2", "a", "b"))
-      }
-    }
+    // ADD PARTITION has the same effect, even if no actual data is inserted.
+    sql(s"ALTER TABLE $tableName ADD PARTITION (b=21, c=31)")
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=21, c=31) IF NOT EXISTS
+         |SELECT 20, 24
+       """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
   }
 
   test("Insert ArrayType.containsNull == false") {
