[SPARK-27504][SQL] File source V2: support refreshing metadata cache
## What changes were proposed in this pull request?

In file source V1, if a file is deleted manually, reading the DataFrame/Table throws an exception with the suggestion message:
```
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
```
After refreshing the table/DataFrame, the reads should return correct results.

File source V2 should follow the same behavior.
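
As a concrete illustration of the intended behavior, here is a minimal sketch using standard Spark APIs; the path and view name are made up for the example:
```scala
// Hypothetical repro of the scenario described above.
val dir = "/tmp/refresh_demo"                      // made-up path
spark.range(0, 100).write.orc(dir)
spark.read.orc(dir).createOrReplaceTempView("t")   // the file listing is cached with the plan

// ... one data file under `dir` is deleted out of band ...

// The next scan can fail with the FileNotFoundException quoted above. After:
spark.sql("REFRESH TABLE t")                    // or spark.catalog.refreshTable("t"), or recreate the DataFrame
spark.sql("SELECT count(*) FROM t").collect()   // returns results for the files that still exist
```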
## How was this patch tested?
Unit test

Closes apache#24401 from gengliangwang/refreshFileTable.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
gengliangwang authored and cloud-fan committed Apr 19, 2019
1 parent 163a6e2 commit 31488e1
Showing 3 changed files with 50 additions and 28 deletions.
DataSourceV2Relation.scala
@@ -66,6 +66,11 @@ case class DataSourceV2Relation(
   override def newInstance(): DataSourceV2Relation = {
     copy(output = output.map(_.newInstance()))
   }
+
+  override def refresh(): Unit = table match {
+    case table: FileTable => table.fileIndex.refresh()
+    case _ => // Do nothing.
+  }
 }
 
 /**
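
To make the intent of `table.fileIndex.refresh()` concrete, here is a toy sketch of a refreshable file-listing cache. These are illustrative stand-ins, not Spark's actual `FileTable`/`FileIndex` interfaces:
```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

// Toy stand-ins for illustration only -- NOT Spark's FileTable/FileIndex API.
trait ToyFileIndex {
  def listFiles(): Seq[String]
  def refresh(): Unit
}

class ToyInMemoryFileIndex(root: Path) extends ToyFileIndex {
  private var cached: Option[Seq[String]] = None      // the "metadata cache"

  override def listFiles(): Seq[String] = {
    if (cached.isEmpty) {
      val stream = Files.list(root)                   // list the directory once and cache the result
      try cached = Some(stream.iterator().asScala.map(_.toString).toVector)
      finally stream.close()
    }
    cached.get
  }

  // Dropping the cached listing is all "refresh" needs to do; the next scan re-lists the directory.
  override def refresh(): Unit = cached = None
}
```
With the override above in place, refreshing a V2 file-source relation clears the stale listing instead of leaving deleted files in the plan.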
FilePartitionReader.scala
@@ -34,25 +34,27 @@ class FilePartitionReader[T](readers: Iterator[PartitionedFileReader[T]])
   override def next(): Boolean = {
     if (currentReader == null) {
       if (readers.hasNext) {
-        if (ignoreMissingFiles || ignoreCorruptFiles) {
-          try {
-            currentReader = getNextReader()
-          } catch {
-            case e: FileNotFoundException if ignoreMissingFiles =>
-              logWarning(s"Skipped missing file: $currentReader", e)
-              currentReader = null
-              return false
-            // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
-            case e: FileNotFoundException if !ignoreMissingFiles => throw e
-            case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
-              logWarning(
-                s"Skipped the rest of the content in the corrupted file: $currentReader", e)
-              currentReader = null
-              InputFileBlockHolder.unset()
-              return false
-          }
-        } else {
-          currentReader = getNextReader()
-        }
+        try {
+          currentReader = getNextReader()
+        } catch {
+          case e: FileNotFoundException if ignoreMissingFiles =>
+            logWarning(s"Skipped missing file: $currentReader", e)
+            currentReader = null
+            return false
+          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+          case e: FileNotFoundException if !ignoreMissingFiles =>
+            throw new FileNotFoundException(
+              e.getMessage + "\n" +
+                "It is possible the underlying files have been updated. " +
+                "You can explicitly invalidate the cache in Spark by " +
+                "running 'REFRESH TABLE tableName' command in SQL or " +
+                "by recreating the Dataset/DataFrame involved.")
+          case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+            logWarning(
+              s"Skipped the rest of the content in the corrupted file: $currentReader", e)
+            currentReader = null
+            InputFileBlockHolder.unset()
+            return false
+        }
       } else {
         return false
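
For reference, the two flags consulted by the catch branches above are ordinary Spark SQL session configs; a short sketch of toggling them:
```scala
// Standard Spark SQL config keys; both default to false.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")    // skip files deleted after query planning
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")   // still fail on unreadable/corrupt files

// With both left at false, a deleted file now surfaces as a FileNotFoundException
// carrying the 'REFRESH TABLE' suggestion added in this hunk.
```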
MetadataCacheSuite.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql
 
 import java.io.File
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
 /**
  * Test suite to handle metadata cache related.
  */
-class MetadataCacheSuite extends QueryTest with SharedSQLContext {
+abstract class MetadataCacheSuite extends QueryTest with SharedSQLContext {
 
   /** Removes one data file in the given directory. */
   private def deleteOneFileInDirectory(dir: File): Unit = {
@@ -38,14 +38,15 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
     oneFile.foreach(_.delete())
   }
 
-  test("SPARK-16336 Suggest doing table refresh when encountering FileNotFoundException") {
+  test("SPARK-16336,SPARK-27504 Suggest doing table refresh " +
+    "when encountering FileNotFoundException") {
     withTempPath { (location: File) =>
       // Create a Parquet directory
       spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
-        .write.parquet(location.getAbsolutePath)
+        .write.orc(location.getAbsolutePath)
 
       // Read the directory in
-      val df = spark.read.parquet(location.getAbsolutePath)
+      val df = spark.read.orc(location.getAbsolutePath)
       assert(df.count() == 100)
 
       // Delete a file
@@ -60,14 +61,14 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-16337 temporary view refresh") {
+  test("SPARK-16337,SPARK-27504 temporary view refresh") {
     withTempView("view_refresh") { withTempPath { (location: File) =>
       // Create a Parquet directory
       spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
-        .write.parquet(location.getAbsolutePath)
+        .write.orc(location.getAbsolutePath)
 
       // Read the directory in
-      spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
+      spark.read.orc(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
       assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)
 
       // Delete a file
@@ -93,10 +94,10 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
     withTempPath { (location: File) =>
       // Create a Parquet directory
       spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
-        .write.parquet(location.getAbsolutePath)
+        .write.orc(location.getAbsolutePath)
 
       // Read the directory in
-      spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
+      spark.read.orc(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
 
       // Delete a file
       deleteOneFileInDirectory(location)
@@ -111,3 +112,17 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+class MetadataCacheV1Suite extends MetadataCacheSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
+}
+
+class MetadataCacheV2Suite extends MetadataCacheSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+}
