[SPARK-33617][SQL][FOLLOWUP] refine the default parallelism SQL config
### What changes were proposed in this pull request?

This is a followup of apache#30559. The default parallelism config in Spark core is confusing, as it's unclear where it applies. To avoid inheriting this problem in Spark SQL, this PR refines the default parallelism SQL config to make it clear that it only applies to leaf nodes, renaming it from `spark.sql.default.parallelism` to `spark.sql.leafNodeDefaultParallelism`.
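For example, a minimal spark-shell sketch (the partition count and session setup are illustrative, not part of this patch):

```scala
// Cap the parallelism of leaf nodes (file scan, local table scan, range) for this session.
spark.conf.set("spark.sql.leafNodeDefaultParallelism", "4")

// A Range leaf node now plans 4 partitions instead of SparkContext#defaultParallelism.
spark.range(0, 1000).rdd.getNumPartitions  // expected: 4
```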

### Why are the changes needed?

Make the config clearer.

### Does this PR introduce _any_ user-facing change?

No. It only changes a config that has not been released yet, so there is no user-facing impact.

### How was this patch tested?

Existing tests.

Closes apache#30736 from cloud-fan/follow.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan committed Dec 15, 2020
1 parent 23083aa commit 40c37d6
Showing 9 changed files with 19 additions and 23 deletions.
@@ -374,12 +374,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
-  val DEFAULT_PARALLELISM = buildConf("spark.sql.default.parallelism")
-    .doc("The number of parallelism for Spark SQL, the default value is " +
-      "`spark.default.parallelism`.")
+  val LEAF_NODE_DEFAULT_PARALLELISM = buildConf("spark.sql.leafNodeDefaultParallelism")
+    .doc("The default parallelism of Spark SQL leaf nodes that produce data, such as the file " +
+      "scan node, the local data scan node, the range node, etc. The default value of this " +
+      "config is 'SparkContext#defaultParallelism'.")
     .version("3.2.0")
     .intConf
-    .checkValue(_ > 0, "The value of spark.sql.default.parallelism must be positive.")
+    .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.")
     .createOptional
 
   val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
@@ -3202,8 +3203,6 @@ class SQLConf extends Serializable with Logging {
 
   def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
 
-  def defaultParallelism: Option[Int] = getConf(DEFAULT_PARALLELISM)
-
   def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
 
   def numShufflePartitions: Int = {
10 changes: 6 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -523,8 +523,7 @@ class SparkSession private(
    * @since 2.0.0
    */
  def range(start: Long, end: Long): Dataset[java.lang.Long] = {
-    range(start, end, step = 1,
-      numPartitions = sqlContext.conf.defaultParallelism.getOrElse(sparkContext.defaultParallelism))
+    range(start, end, step = 1, numPartitions = leafNodeDefaultParallelism)
  }
 
  /**
@@ -534,8 +533,7 @@ class SparkSession private(
    * @since 2.0.0
    */
  def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = {
-    range(start, end, step,
-      numPartitions = sqlContext.conf.defaultParallelism.getOrElse(sparkContext.defaultParallelism))
+    range(start, end, step, numPartitions = leafNodeDefaultParallelism)
  }
 
  /**
@@ -775,6 +773,10 @@ class SparkSession private(
       SparkSession.setActiveSession(old)
     }
   }
+
+  private[sql] def leafNodeDefaultParallelism: Int = {
+    conf.get(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM).getOrElse(sparkContext.defaultParallelism)
+  }
 }
 
 
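The new helper encodes a single fallback: the SQL config when set, otherwise `SparkContext#defaultParallelism`. A stand-alone sketch of that resolution (hypothetical function name and illustrative values, not part of the patch):

```scala
// Hypothetical re-statement of the fallback chain in leafNodeDefaultParallelism.
def resolveLeafParallelism(configured: Option[Int], sparkContextDefault: Int): Int =
  configured.getOrElse(sparkContextDefault)

resolveLeafParallelism(Some(4), 8)  // 4: spark.sql.leafNodeDefaultParallelism is set
resolveLeafParallelism(None, 8)     // 8: fall back to SparkContext#defaultParallelism
```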
@@ -50,8 +50,7 @@ case class LocalTableScanExec(
       sqlContext.sparkContext.emptyRDD
     } else {
       val numSlices = math.min(
-        unsafeRows.length,
-        conf.defaultParallelism.getOrElse(sqlContext.sparkContext.defaultParallelism))
+        unsafeRows.length, sqlContext.sparkSession.leafNodeDefaultParallelism)
       sqlContext.sparkContext.parallelize(unsafeRows, numSlices)
     }
   }
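A quick worked instance of the new `numSlices` expression (illustrative numbers, not from the patch): the slice count is capped by the number of buffered rows, so small local relations never get empty partitions.

```scala
// Assume spark.sql.leafNodeDefaultParallelism resolves to 4.
val leafNodeDefaultParallelism = 4
math.min(8, leafNodeDefaultParallelism)  // 8 local rows -> 4 slices
math.min(2, leafNodeDefaultParallelism)  // 2 local rows -> 2 slices, no empty partitions
```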
@@ -67,7 +67,7 @@ case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffl
       // We fall back to Spark default parallelism if the minimum number of coalesced partitions
       // is not set, so to avoid perf regressions compared to no coalescing.
       val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
-        .orElse(conf.defaultParallelism).getOrElse(session.sparkContext.defaultParallelism)
+        .getOrElse(session.sparkContext.defaultParallelism)
       val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
         validMetrics.toArray,
         advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
@@ -382,8 +382,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   val start: Long = range.start
   val end: Long = range.end
   val step: Long = range.step
-  val numSlices: Int = range.numSlices.orElse(sqlContext.conf.defaultParallelism)
-    .getOrElse(sparkContext.defaultParallelism)
+  val numSlices: Int = range.numSlices.getOrElse(sqlContext.sparkSession.leafNodeDefaultParallelism)
   val numElements: BigInt = range.numElements
   val isEmptyRange: Boolean = start == end || (start < end ^ 0 < step)
 
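Since `numSlices` consults `range.numSlices` first, an explicit partition count still overrides the session default. A hedged spark-shell sketch with assumed values:

```scala
spark.conf.set("spark.sql.leafNodeDefaultParallelism", "4")

spark.range(0, 100, 1, numPartitions = 16).rdd.getNumPartitions  // 16: explicit value wins
spark.range(0, 100).rdd.getNumPartitions                         // 4: falls back to the config
```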
@@ -738,8 +738,7 @@ case class AlterTableRecoverPartitionsCommand(
     // Set the number of parallelism to prevent following file listing from generating many tasks
     // in case of large #defaultParallelism.
     val numParallelism = Math.min(serializedPaths.length,
-      Math.min(spark.sessionState.conf.defaultParallelism
-        .getOrElse(spark.sparkContext.defaultParallelism), 10000))
+      Math.min(spark.sparkContext.defaultParallelism, 10000))
     // gather the fast stats for all the partitions otherwise Hive metastore will list all the
     // files for all the new partitions in sequential way, which is super slow.
     logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.")
@@ -89,8 +89,7 @@ object FilePartition extends Logging {
     val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
     val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
     val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
-      .orElse(sparkSession.sessionState.conf.defaultParallelism)
-      .getOrElse(sparkSession.sparkContext.defaultParallelism)
+      .getOrElse(sparkSession.leafNodeDefaultParallelism)
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / minPartitionNum
 
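To see how this fallback feeds the split-size math, a back-of-the-envelope example with assumed file sizes (not taken from the patch):

```scala
// Assume spark.sql.files.minPartitionNum is unset and the leaf-node parallelism resolves to 8.
val minPartitionNum = 8
val openCostInBytes = 4L * 1024 * 1024                        // 4 MB default open cost
val fileLengths = Seq(512L, 256L, 256L).map(_ * 1024 * 1024)  // three files, 1 GiB in total
val totalBytes = fileLengths.map(_ + openCostInBytes).sum
val bytesPerCore = totalBytes / minPartitionNum               // ~130 MB per core
```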
@@ -57,8 +57,7 @@ object SchemaMergeUtils extends Logging {
     // Set the number of partitions to prevent following schema reads from generating many tasks
     // in case of a small number of orc files.
     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
-      sparkSession.sessionState.conf.defaultParallelism
-        .getOrElse(sparkSession.sparkContext.defaultParallelism))
+      sparkSession.sparkContext.defaultParallelism)
 
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
 
@@ -89,9 +89,9 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
     assert(LocalTableScanExec(Nil, Nil).execute().getNumPartitions == 0)
   }
 
-  test("SPARK-33617: spark.sql.default.parallelism effective for LocalTableScan") {
+  test("SPARK-33617: change default parallelism of LocalTableScan") {
     Seq(1, 4).foreach { minPartitionNum =>
-      withSQLConf(SQLConf.DEFAULT_PARALLELISM.key -> minPartitionNum.toString) {
+      withSQLConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM.key -> minPartitionNum.toString) {
         val df = spark.sql("SELECT * FROM VALUES (1), (2), (3), (4), (5), (6), (7), (8)")
         assert(df.rdd.partitions.length === minPartitionNum)
       }
