Skip to content

Commit

Permalink
Revert "[SPARK-23195][SQL] Keep the Hint of Cached Data"
Browse files: browse the repository at this point in the history
This reverts commit 44cc4da.
  • Loading branch information
gatorsmile committed Jan 24, 2018
1 parent f54b65c commit 4e7b490
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 18 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -63,7 +63,7 @@ case class InMemoryRelation(
tableName: Option[String])(
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
statsOfPlanToCache: Statistics)
statsOfPlanToCache: Statistics = null)
extends logical.LeafNode with MultiInstanceRelation {

override protected def innerChildren: Seq[SparkPlan] = Seq(child)
Expand All @@ -77,7 +77,7 @@ case class InMemoryRelation(
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache
statsOfPlanToCache
} else {
Statistics(sizeInBytes = batchStats.value.longValue, hints = statsOfPlanToCache.hints)
Statistics(sizeInBytes = batchStats.value.longValue)
}
}

Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -139,22 +139,6 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
}
}

test("broadcast hint is retained in a cached plan") {
Seq(true, false).foreach { materialized =>
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
broadcast(df2).cache()
if (materialized) df2.collect()
val df3 = df1.join(df2, Seq("key"), "inner")
val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
case b: BroadcastHashJoinExec => b
}.size
assert(numBroadCastHashJoin === 1)
}
}
}

private def assertBroadcastJoin(df : Dataset[Row]) : Unit = {
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
val joined = df1.join(df, Seq("key"), "inner")
Expand Down

0 comments on commit 4e7b490

Please sign in to comment.