Revert "[SPARK-38354][SQL] Add hash probes metric for shuffled hash j…
Browse files Browse the repository at this point in the history
…oin"

This reverts commit 1584366, as the original PR caused a performance regression reported in apache#35686 (comment).

Closes apache#36338 from c21/revert-metrics.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
c21 authored and dongjoon-hyun committed Apr 26, 2022
1 parent 97449d2 · commit 6b5a1f9
Showing 7 changed files with 16 additions and 59 deletions.
File: BytesToBytesMap.java
@@ -941,7 +941,7 @@ public long getPeakMemoryUsedBytes() {
/**
* Returns the average number of probes per key lookup.
*/
- public double getAvgHashProbesPerKey() {
+ public double getAvgHashProbeBucketListIterations() {
return (1.0 * numProbes) / numKeyLookups;
}

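For context, the ratio above is simply probes divided by key lookups; a minimal, self-contained sketch with hypothetical counter values (not taken from any real run):

// Hypothetical illustration of the metric above: every key lookup costs at least one
// probe, and hash collisions add extra probes, so 1.0 is the best possible value.
val numProbes = 1500L       // assumed counter value after some workload
val numKeyLookups = 1000L   // assumed counter value
val avgProbes = (1.0 * numProbes) / numKeyLookups
assert(avgProbes == 1.5)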
File: UnsafeFixedWidthAggregationMap.java
@@ -221,10 +221,10 @@ public void free() {
}

/**
- * Gets the average number of hash probes per key lookup in the underlying `BytesToBytesMap`.
+ * Gets the average bucket list iterations per lookup in the underlying `BytesToBytesMap`.
*/
- public double getAvgHashProbesPerKey() {
-   return map.getAvgHashProbesPerKey();
+ public double getAvgHashProbeBucketListIterations() {
+   return map.getAvgHashProbeBucketListIterations();
}

/**
File: HashAggregateExec.scala
@@ -68,7 +68,7 @@ case class HashAggregateExec(
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
"aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"),
"avgHashProbe" ->
- SQLMetrics.createAverageMetric(sparkContext, "avg hash probes per key"),
+ SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"),
"numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of sort fallback tasks"))

// This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
@@ -207,7 +207,7 @@ case class HashAggregateExec(
metrics.incPeakExecutionMemory(maxMemory)

// Update average hashmap probe
- avgHashProbe.set(hashMap.getAvgHashProbesPerKey)
+ avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)

if (sorter == null) {
// not spilled
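Condensed, the pattern these two hunks restore looks like the sketch below. It is not standalone code: it assumes the enclosing operator's sparkContext and its UnsafeFixedWidthAggregationMap (hashMap) are in scope, exactly as in the surrounding class.

// Sketch only: declare the average metric once in the operator's metrics map,
// then set it from the hash map's counter once aggregation has finished.
val avgHashProbe = SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters")
// ... hash aggregation runs, then, once the map is finalized:
avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)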
File: TungstenAggregationIterator.scala
@@ -389,7 +389,7 @@ class TungstenAggregationIterator(
metrics.incPeakExecutionMemory(maxMemory)

// Updating average hashmap probe
- avgHashProbe.set(hashMap.getAvgHashProbesPerKey)
+ avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
})

///////////////////////////////////////////////////////////////////////////
File: HashedRelation.scala
@@ -110,11 +110,6 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation {
*/
def keys(): Iterator[InternalRow]

- /**
-  * Returns the average number of hash probes per key lookup.
-  */
- def getAvgHashProbesPerKey(): Double
-
/**
* Returns a read-only copy of this, to be safely used in current thread.
*/
@@ -226,8 +221,6 @@ private[joins] class UnsafeHashedRelation(

override def estimatedSize: Long = binaryMap.getTotalMemoryConsumption

- override def getAvgHashProbesPerKey(): Double = binaryMap.getAvgHashProbesPerKey
-
// re-used in get()/getValue()/getWithKeyIndex()/getValueWithKeyIndex()/valuesWithKeyIndex()
var resultRow = new UnsafeRow(numFields)

@@ -575,12 +568,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
// The number of unique keys.
private var numKeys = 0L

- // The number of hash probes for keys.
- private var numProbes = 0L
-
- // The number of keys lookups.
- private var numKeyLookups = 0L
-
// needed by serializer
def this() = {
this(
@@ -629,11 +616,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
*/
def getTotalMemoryConsumption: Long = array.length * 8L + page.length * 8L

- /**
-  * Returns the average number of hash probes per key lookup.
-  */
- def getAvgHashProbesPerKey: Double = (1.0 * numProbes) / numKeyLookups
-
/**
* Returns the first slot of array that store the keys (sparse mode).
*/
@@ -668,24 +650,20 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
* Returns the single UnsafeRow for given key, or null if not found.
*/
def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
- numKeyLookups += 1
if (isDense) {
- numProbes += 1
if (key >= minKey && key <= maxKey) {
val value = array((key - minKey).toInt)
if (value > 0) {
return getRow(value, resultRow)
}
}
} else {
- numProbes += 1
var pos = firstSlot(key)
while (array(pos + 1) != 0) {
if (array(pos) == key) {
return getRow(array(pos + 1), resultRow)
}
pos = nextSlot(pos)
- numProbes += 1
}
}
null
@@ -712,24 +690,20 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
* Returns an iterator for all the values for the given key, or null if no value found.
*/
def get(key: Long, resultRow: UnsafeRow): Iterator[UnsafeRow] = {
- numKeyLookups += 1
if (isDense) {
- numProbes += 1
if (key >= minKey && key <= maxKey) {
val value = array((key - minKey).toInt)
if (value > 0) {
return valueIter(value, resultRow)
}
}
} else {
- numProbes += 1
var pos = firstSlot(key)
while (array(pos + 1) != 0) {
if (array(pos) == key) {
return valueIter(array(pos + 1), resultRow)
}
pos = nextSlot(pos)
- numProbes += 1
}
}
null
@@ -808,13 +782,10 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
* Update the address in array for given key.
*/
private def updateIndex(key: Long, address: Long): Unit = {
- numKeyLookups += 1
- numProbes += 1
var pos = firstSlot(key)
assert(numKeys < array.length / 2)
while (array(pos) != key && array(pos + 1) != 0) {
pos = nextSlot(pos)
- numProbes += 1
}
if (array(pos + 1) == 0) {
// this is the first value for this key, put the address in array.
@@ -1017,8 +988,6 @@ class LongHashedRelation(

override def estimatedSize: Long = map.getTotalMemoryConsumption

- override def getAvgHashProbesPerKey(): Double = map.getAvgHashProbesPerKey
-
override def get(key: InternalRow): Iterator[InternalRow] = {
if (key.isNullAt(0)) {
null
@@ -1136,8 +1105,6 @@ case object EmptyHashedRelation extends HashedRelation {
override def close(): Unit = {}

override def estimatedSize: Long = 0
-
- override def getAvgHashProbesPerKey(): Double = 0
}

/**
@@ -1164,8 +1131,6 @@ case object HashedRelationWithAllNullKeys extends HashedRelation {
override def close(): Unit = {}

override def estimatedSize: Long = 0
-
- override def getAvgHashProbesPerKey(): Double = 0
}

/** The HashedRelationBroadcastMode requires that rows are broadcasted as a HashedRelation. */
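The counters removed above were maintained inside the map's probing loops. A simplified, self-contained sketch of such an open-addressing lookup (illustrative only; not Spark's actual hashing, slot layout, or growth logic), counting one probe per slot visited the way the reverted change did:

// Simplified open-addressing lookup over a flat Array[Long] of (key, address) pairs.
// `capacity` is assumed to be a power of two so masking can replace modulo.
def lookup(array: Array[Long], capacity: Int, key: Long): (Long, Int) = {
  var probes = 1                                   // the reverted change counted probes like this
  var pos = (java.lang.Long.hashCode(key) & (capacity - 1)) * 2
  while (array(pos + 1) != 0L) {                   // an address of 0 marks an empty bucket
    if (array(pos) == key) return (array(pos + 1), probes)
    pos = (pos + 2) & (capacity * 2 - 1)           // advance to the next slot, wrapping around
    probes += 1
  }
  (0L, probes)                                     // address 0 means "not found"
}

In the dense branch shown earlier, the key indexes the array directly, so a lookup was counted as a single probe.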
File: ShuffledHashJoinExec.scala
@@ -49,8 +49,7 @@ case class ShuffledHashJoinExec(
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
"buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"),
"avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probes per key"))
"buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))

override def output: Seq[Attribute] = super[ShuffledJoin].output

@@ -78,7 +77,6 @@ case class ShuffledHashJoinExec(
def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
val buildDataSize = longMetric("buildDataSize")
val buildTime = longMetric("buildTime")
- val avgHashProbe = longMetric("avgHashProbe")
val start = System.nanoTime()
val context = TaskContext.get()
val relation = HashedRelation(
@@ -91,11 +89,7 @@
buildTime += NANOSECONDS.toMillis(System.nanoTime() - start)
buildDataSize += relation.estimatedSize
// This relation is usually used until the end of task.
- context.addTaskCompletionListener[Unit](_ => {
-   // Update average hashmap probe
-   avgHashProbe.set(relation.getAvgHashProbesPerKey())
-   relation.close()
- })
+ context.addTaskCompletionListener[Unit](_ => relation.close())
relation
}

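After the revert, the task-completion listener only closes the relation. A hedged sketch of that cleanup pattern as a hypothetical helper (not part of this commit; TaskContext.get() returns null outside a running task):

import org.apache.spark.TaskContext

// Hypothetical helper: release a resource when the current Spark task finishes.
def closeOnTaskCompletion(resource: AutoCloseable): Unit = {
  val ctx = TaskContext.get()
  if (ctx != null) {
    ctx.addTaskCompletionListener[Unit](_ => resource.close())
  } else {
    resource.close() // not inside a task; close immediately
  }
}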
File: SQLMetricsSuite.scala
@@ -109,11 +109,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
val df = testData2.groupBy().count() // 2 partitions
val expected1 = Seq(
Map("number of output rows" -> 2L,
"avg hash probes per key" ->
"avg hash probe bucket list iters" ->
aggregateMetricsPattern,
"number of sort fallback tasks" -> 0L),
Map("number of output rows" -> 1L,
"avg hash probes per key" ->
"avg hash probe bucket list iters" ->
aggregateMetricsPattern,
"number of sort fallback tasks" -> 0L))
val shuffleExpected1 = Map(
@@ -131,11 +131,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
val df2 = testData2.groupBy($"a").count()
val expected2 = Seq(
Map("number of output rows" -> 4L,
"avg hash probes per key" ->
"avg hash probe bucket list iters" ->
aggregateMetricsPattern,
"number of sort fallback tasks" -> 0L),
Map("number of output rows" -> 3L,
"avg hash probes per key" ->
"avg hash probe bucket list iters" ->
aggregateMetricsPattern,
"number of sort fallback tasks" -> 0L))

@@ -184,7 +184,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}
val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
nodeIds.foreach { nodeId =>
- val probes = metrics(nodeId)._2("avg hash probes per key").toString
+ val probes = metrics(nodeId)._2("avg hash probe bucket list iters").toString
if (!probes.contains("\n")) {
// It's a single metrics value
assert(probes.toDouble > 1.0)
@@ -372,8 +372,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
val df = df1.join(df2, "key")
testSparkPlanMetrics(df, 1, Map(
nodeId1 -> (("ShuffledHashJoin", Map(
"number of output rows" -> 2L,
"avg hash probes per key" -> aggregateMetricsPattern))),
"number of output rows" -> 2L))),
nodeId2 -> (("Exchange", Map(
"shuffle records written" -> 2L,
"records read" -> 2L))),
@@ -402,8 +401,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType)
testSparkPlanMetrics(df, 1, Map(
nodeId -> (("ShuffledHashJoin", Map(
"number of output rows" -> rows,
"avg hash probes per key" -> aggregateMetricsPattern)))),
"number of output rows" -> rows)))),
enableWholeStage
)
}
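The aggregateMetricsPattern assertions above accept either a single value or a multi-value string, because the metric is recorded once per task and then summarized across tasks. A self-contained sketch of that kind of summary with hypothetical per-task values (the exact display format is Spark's and is not reproduced here):

// Hypothetical per-task values of the "avg hash probe bucket list iters" metric.
val perTaskAverages = Seq(1.0, 1.1, 1.2, 3.4)
val sorted = perTaskAverages.sorted
val (min, med, max) = (sorted.head, sorted(sorted.size / 2), sorted.last)
println(s"avg hash probe bucket list iters: min=$min, med=$med, max=$max")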
