Skip to content

Commit

Permalink
[SPARK-27961][SQL] DataSourceV2Relation should not have refresh method
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

The newly added `Refresh` method in PR apache#24401 prevented the work of moving DataSourceV2Relation into catalyst. It calls `case table: FileTable => table.fileIndex.refresh()` while `FileTable` belongs to sql/core.

More importantly, Ryan Blue pointed out that DataSourceV2Relation is immutable by design; it should not have a refresh method.

## How was this patch tested?

Unit test

Closes apache#24815 from gengliangwang/removeRefreshTable.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
gengliangwang authored and dongjoon-hyun committed Jun 8, 2019
1 parent 354ec25 commit db0f6b4
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ case class DataSourceV2Relation(
override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}

override def refresh(): Unit = table match {
case table: FileTable => table.fileIndex.refresh()
case _ => // Do nothing.
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ class FilePartitionReader[T](readers: Iterator[PartitionedFileReader[T]])
e.getMessage + "\n" +
"It is possible the underlying files have been updated. " +
"You can explicitly invalidate the cache in Spark by " +
"running 'REFRESH TABLE tableName' command in SQL or " +
"by recreating the Dataset/DataFrame involved.")
"recreating the Dataset/DataFrame involved.")
case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest of the content in the corrupted file: $currentReader", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.test.SharedSQLContext
abstract class MetadataCacheSuite extends QueryTest with SharedSQLContext {

/** Removes one data file in the given directory. */
private def deleteOneFileInDirectory(dir: File): Unit = {
protected def deleteOneFileInDirectory(dir: File): Unit = {
assert(dir.isDirectory)
val oneFile = dir.listFiles().find { file =>
!file.getName.startsWith("_") && !file.getName.startsWith(".")
Expand All @@ -38,10 +38,9 @@ abstract class MetadataCacheSuite extends QueryTest with SharedSQLContext {
oneFile.foreach(_.delete())
}

test("SPARK-16336,SPARK-27504 Suggest doing table refresh " +
"when encountering FileNotFoundException") {
test("SPARK-16336,SPARK-27961 Suggest fixing FileNotFoundException") {
withTempPath { (location: File) =>
// Create a Parquet directory
// Create an ORC directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
.write.orc(location.getAbsolutePath)

Expand All @@ -57,13 +56,20 @@ abstract class MetadataCacheSuite extends QueryTest with SharedSQLContext {
df.count()
}
assert(e.getMessage.contains("FileNotFoundException"))
assert(e.getMessage.contains("REFRESH"))
assert(e.getMessage.contains("recreating the Dataset/DataFrame involved"))
}
}
}

class MetadataCacheV1Suite extends MetadataCacheSuite {
override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")

test("SPARK-16337,SPARK-27504 temporary view refresh") {
test("SPARK-16337 temporary view refresh") {
withTempView("view_refresh") { withTempPath { (location: File) =>
// Create a Parquet directory
// Create an ORC directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
.write.orc(location.getAbsolutePath)

Expand Down Expand Up @@ -113,13 +119,6 @@ abstract class MetadataCacheSuite extends QueryTest with SharedSQLContext {
}
}

class MetadataCacheV1Suite extends MetadataCacheSuite {
override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
}

class MetadataCacheV2Suite extends MetadataCacheSuite {
override protected def sparkConf: SparkConf =
super
Expand Down

0 comments on commit db0f6b4

Please sign in to comment.