[SPARK-42480][SQL] Improve the performance of drop partitions
### What changes were proposed in this pull request?
1. Fetch matching partition names rather than full partition objects when dropping partitions

### Why are the changes needed?
1. Partition names are sufficient to drop partitions
2. Fetching names instead of full partition objects reduces both time overhead and driver memory overhead.

### Does this PR introduce _any_ user-facing change?
Yes, we have added a new SQL conf to enable this feature: `spark.sql.hive.dropPartitionByName.enabled`
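
For example, the feature can be enabled per session before issuing the drop (a minimal usage sketch; the table name and partition spec are illustrative):

```scala
// Hypothetical session-level usage; "sales" and its partition spec are illustrative.
spark.conf.set("spark.sql.hive.dropPartitionByName.enabled", "true")
spark.sql("ALTER TABLE sales DROP PARTITION (region='US', dt='2023-02-18')")
```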

### How was this patch tested?
Add new tests.

Closes apache#40069 from wecharyu/SPARK-42480.

Authored-by: wecharyu <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
wecharyu authored and sunchao committed Mar 9, 2023
1 parent f95fc19 commit 153ace7
Showing 5 changed files with 81 additions and 6 deletions.
@@ -1227,6 +1227,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val HIVE_METASTORE_DROP_PARTITION_BY_NAME =
buildConf("spark.sql.hive.dropPartitionByName.enabled")
.doc("When true, Spark will get partition name rather than partition object " +
"to drop partition, which can improve the performance of drop partition.")
.version("3.4.0")
.booleanConf
.createWithDefault(false)

val HIVE_METASTORE_PARTITION_PRUNING =
buildConf("spark.sql.hive.metastorePartitionPruning")
.doc("When true, some predicates will be pushed down into the Hive metastore so that " +
@@ -4483,6 +4491,8 @@ class SQLConf extends Serializable with Logging {

def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)

def metastoreDropPartitionsByName: Boolean = getConf(HIVE_METASTORE_DROP_PARTITION_BY_NAME)

def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)

def metastorePartitionPruningInSetThreshold: Int =
@@ -284,4 +284,22 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
}
}
}

test("SPARK-42480: drop partition when dropPartitionByName enabled") {
withSQLConf(SQLConf.HIVE_METASTORE_DROP_PARTITION_BY_NAME.key -> "true") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t(name STRING, age INT) USING PARQUET PARTITIONED BY (region STRING)")
sql(s"ALTER TABLE $t ADD PARTITION (region='=reg1') LOCATION 'loc1'")
checkPartitions(t, Map("region" -> "=reg1"))
sql(s"ALTER TABLE $t PARTITION (region='=reg1') RENAME TO PARTITION (region='=%reg1')")
checkPartitions(t, Map("region" -> "=%reg1"))
sql(s"ALTER TABLE $t DROP PARTITION (region='=%reg1')")
checkPartitions(t)
sql(s"ALTER TABLE $t ADD PARTITION (region='reg?2') LOCATION 'loc2'")
checkPartitions(t, Map("region" -> "reg?2"))
sql(s"ALTER TABLE $t DROP PARTITION (region='reg?2')")
checkPartitions(t)
}
}
}
}
13 changes: 13 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -27,6 +27,8 @@ import scala.collection.mutable.HashMap
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.session.SessionState
@@ -49,6 +51,7 @@ import org.apache.spark.util.Utils


private[spark] object HiveUtils extends Logging {
private val PATTERN_FOR_KEY_EQ_VAL = "(.+)=(.+)".r

/** The version of hive used internally by Spark SQL. */
val builtinHiveVersion: String = HiveVersionInfo.getVersion
@@ -562,4 +565,14 @@
table.copy(schema = StructType((dataCols ++ partCols).toArray))
}
}

/**
* Extract the partition values from a partition name, e.g., if a partition name is
* "region=US/dt=2023-02-18", then we will return an array of values ("US", "2023-02-18").
*/
def partitionNameToValues(name: String): Array[String] = {
name.split(Path.SEPARATOR).map {
case PATTERN_FOR_KEY_EQ_VAL(_, v) => FileUtils.unescapePathName(v)
}
}
}
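
As a hedged illustration of the expected behavior (Hive escapes special characters such as `=` in partition names, and `FileUtils.unescapePathName` reverses that; the `%3D` example assumes Hive's standard percent-encoding):

```scala
// Illustrative calls; results assume Hive's standard path-name escaping.
HiveUtils.partitionNameToValues("region=US/dt=2023-02-18") // Array("US", "2023-02-18")
HiveUtils.partitionNameToValues("region=%3Dreg1")          // Array("=reg1"), since "%3D" unescapes to '='
```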
@@ -61,7 +61,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -679,19 +679,28 @@ private[hive] class HiveClientImpl(
purge: Boolean,
retainData: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
// do the check at first and collect all the matching partitions
val matchingParts =
  specs.flatMap { s =>
    assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
    // The provided spec here can be a partial spec, i.e. it will match all partitions
    // whose specs are supersets of this partial spec. E.g. If a table has partitions
    // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
    val dropPartitionByName = SQLConf.get.metastoreDropPartitionsByName
    if (dropPartitionByName) {
      val partitionNames = shim.getPartitionNames(client, db, table, s.asJava, -1)
      if (partitionNames.isEmpty && !ignoreIfNotExists) {
        throw new NoSuchPartitionsException(db, table, Seq(s))
      }
      partitionNames.map(HiveUtils.partitionNameToValues(_).toList.asJava)
    } else {
      val hiveTable = shim.getTable(client, db, table, true /* throw exception */)
      val parts = shim.getPartitions(client, hiveTable, s.asJava)
      if (parts.isEmpty && !ignoreIfNotExists) {
        throw new NoSuchPartitionsException(db, table, Seq(s))
      }
      parts.map(_.getValues)
    }
  }.distinct
val droppedParts = ArrayBuffer.empty[java.util.List[String]]
matchingParts.foreach { partition =>
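
To make the new path's data flow concrete, here is a self-contained sketch (plain Scala, no Hive dependencies; `DropByNameFlow`, `nameToValues`, and the sample names are invented for illustration, and path-name unescaping is elided). The `foreach` above, truncated by the diff view, is where each collected value list is ultimately dropped:

```scala
object DropByNameFlow {
  private val KeyEqVal = "(.+)=(.+)".r

  // Simplified stand-in for HiveUtils.partitionNameToValues (unescaping elided).
  def nameToValues(name: String): Seq[String] =
    name.split("/").toSeq.map { case KeyEqVal(_, v) => v }

  def main(args: Array[String]): Unit = {
    // Under the new flag, the metastore returns only lightweight name strings.
    val names = Seq("region=US/dt=2023-02-18", "region=EU/dt=2023-02-18")
    // Parse values out of each name and deduplicate, mirroring matchingParts/.distinct above.
    val matchingParts = names.map(nameToValues).distinct
    // Each value list is what the per-partition drop call ultimately receives.
    matchingParts.foreach(vs => println(s"would drop partition with values $vs"))
  }
}
```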
@@ -50,4 +50,29 @@ class AlterTableDropPartitionSuite
}
}
}

test("SPARK-42480: hive client calls when dropPartitionByName enabled") {
Seq(false, true).foreach { statsOn =>
withSQLConf(
SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> statsOn.toString,
SQLConf.HIVE_METASTORE_DROP_PARTITION_BY_NAME.key -> "true") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id int, part int) $defaultUsing PARTITIONED BY (part)")
sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
sql(s"INSERT INTO $t PARTITION (part=1) SELECT 1")
sql(s"ALTER TABLE $t ADD PARTITION (part=2)") // empty partition
checkHiveClientCalls(expected = if (statsOn) 25 else 17) {
sql(s"ALTER TABLE $t DROP PARTITION (part=2)")
}
checkHiveClientCalls(expected = if (statsOn) 30 else 17) {
sql(s"ALTER TABLE $t DROP PARTITION (part=0)")
}
sql(s"CACHE TABLE $t")
checkHiveClientCalls(expected = if (statsOn) 30 else 17) {
sql(s"ALTER TABLE $t DROP PARTITION (part=1)")
}
}
}
}
}
}
