[SPARK-7158] [SQL] Fix bug of cached data cannot be used in collect() after cache()

When the df.cache() method is called, the `withCachedData` of its `QueryExecution` has already been created, which means it will not look up the cached tables when an action method is called afterward.
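
The failure mode can be illustrated with a minimal, self-contained sketch. The Plan and QueryExecution stand-ins below are simplified, hypothetical versions of the real Catalyst classes, not Spark's implementation:

// Minimal sketch of the bug (hypothetical stand-ins, not Spark's classes):
// a lazily resolved plan that is forced before the cache entry exists will
// never pick up the cached relation afterward.
object CacheBugSketch {
  case class Plan(name: String)

  class QueryExecution(logical: Plan, lookupCache: () => Set[Plan]) {
    // Resolved exactly once, on first access. If cache() registers the plan
    // *after* this lazy val has been forced, the substitution never happens.
    lazy val withCachedData: Plan =
      if (lookupCache().contains(logical)) Plan(s"InMemoryScan(${logical.name})")
      else logical
  }

  def main(args: Array[String]): Unit = {
    var cachedPlans = Set.empty[Plan]
    val logical = Plan("Project")
    val qe = new QueryExecution(logical, () => cachedPlans)

    qe.withCachedData          // forced early, e.g. while building the cache entry
    cachedPlans += logical     // df.cache() registers the plan afterward
    println(qe.withCachedData) // still Plan(Project) -- the cache is ignored
  }
}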

Author: Cheng Hao <[email protected]>

Closes apache#5714 from chenghao-intel/SPARK-7158 and squashes the following commits:

58ea8aa [Cheng Hao] style issue
2bf740f [Cheng Hao] create new QueryExecution instance for CacheManager
a5647d9 [Cheng Hao] hide the queryExecution of DataFrame
fbfd3c5 [Cheng Hao] make the DataFrame.queryExecution mutable for cache/persist/unpersist
chenghao-intel authored and marmbrus committed Jun 12, 2015
1 parent 337c16d commit 767cc94
Showing 2 changed files with 27 additions and 1 deletion.
@@ -103,7 +103,7 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
           sqlContext.conf.useCompression,
           sqlContext.conf.columnBatchSize,
           storageLevel,
-          query.queryExecution.executedPlan,
+          sqlContext.executePlan(query.logicalPlan).executedPlan,
           tableName))
     }
   }
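
From the user's side, the behavior this one-line change restores can be sketched as follows (a hedged example against the 1.4-era DataFrame API, assuming a SQLContext named sqlContext is in scope):

import sqlContext.implicits._ // assumes sqlContext: SQLContext is in scope

val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index")
df.cache()   // registers df's *logical* plan with the CacheManager
df.collect() // with the fix, this lookup now finds the in-memory relation

By constructing a fresh QueryExecution inside the CacheManager rather than reusing query.queryExecution, the DataFrame's own lazily resolved plan is never forced during cache(), so the first action afterward still performs the cached-table lookup.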
26 changes: 26 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -123,6 +123,32 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
     )
   }
 
+  test("SPARK-7158 collect and take return different results") {
+    import java.util.UUID
+    import org.apache.spark.sql.types._
+
+    val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index")
+    // we expect the id to be materialized once
+    def id: () => String = () => { UUID.randomUUID().toString() }
+
+    val dfWithId = df.withColumn("id", callUDF(id, StringType))
+    // Make a new DataFrame (actually the same reference to the old one)
+    val cached = dfWithId.cache()
+    // Trigger the cache
+    val d0 = dfWithId.collect()
+    val d1 = cached.collect()
+    val d2 = cached.collect()
+
+    // Since the ID is materialized only once, all of the records should
+    // come from the cache rather than being recomputed. Otherwise, the
+    // IDs would differ.
+    assert(d0.map(_(0)) === d2.map(_(0)))
+    assert(d0.map(_(1)) === d2.map(_(1)))
+
+    assert(d1.map(_(0)) === d2.map(_(0)))
+    assert(d1.map(_(1)) === d2.map(_(1)))
+  }
+
   test("grouping on nested fields") {
     sqlContext.read.json(sqlContext.sparkContext.parallelize(
       """{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
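
As an aside, the test's detection trick, using a non-deterministic value as a recomputation probe, can be demonstrated outside Spark. A minimal plain-Scala sketch with a hypothetical object name:

import java.util.UUID

object RecomputationDetector extends App {
  // A lazy view re-runs its mapping function on every traversal, so the
  // UUID column changes between passes -- the same signal the test uses to
  // tell cached results apart from recomputed ones.
  val tagged = (1 to 3).view.map(i => (i, UUID.randomUUID().toString))
  val pass1 = tagged.toList
  val pass2 = tagged.toList

  assert(pass1.map(_._1) == pass2.map(_._1)) // the data column is stable
  assert(pass1.map(_._2) != pass2.map(_._2)) // the ids are not: recomputed
}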
