Skip to content

Commit

Permalink
[SPARK-31047][SQL] Improve file listing for ViewFileSystem
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Use `listLocatedStatus` when `InMemoryFileIndex` is listing files from a `ViewFileSystem`; the call should then be delegated to the `listLocatedStatus` of the underlying `DistributedFileSystem`.

### Why are the changes needed?
When a `ViewFileSystem` is used to manage several `DistributedFileSystem` instances, this change improves file-listing performance, especially when there are many files.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes apache#27801 from manuzhang/spark-31047.

Authored-by: manuzhang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
manuzhang authored and dongjoon-hyun committed Mar 17, 2020
1 parent 57d27e9 commit 4e4e08f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.viewfs.ViewFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

Expand Down Expand Up @@ -313,7 +314,7 @@ object InMemoryFileIndex extends Logging {
// to retrieve the file status with the file block location. The reason to still fallback
// to listStatus is because the default implementation would potentially throw a
// FileNotFoundException which is better handled by doing the lookups manually below.
case _: DistributedFileSystem if !ignoreLocality =>
case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality =>
val remoteIter = fs.listLocatedStatus(path)
new Iterator[LocatedFileStatus]() {
def next(): LocatedFileStatus = remoteIter.next
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import java.net.URI

import scala.collection.mutable

import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem, RemoteIterator}
import org.apache.hadoop.fs.viewfs.ViewFileSystem
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, when}

import org.apache.spark.SparkException
import org.apache.spark.metrics.source.HiveCatalogMetrics
Expand Down Expand Up @@ -465,6 +469,25 @@ class FileIndexSuite extends SharedSparkSession {
}
}
}

test("SPARK-31047 - Improve file listing for ViewFileSystem") {
  // Mock a Path whose getFileSystem(...) resolves to a mocked ViewFileSystem.
  val path = mock(classOf[Path])
  val viewFs = mock(classOf[ViewFileSystem])
  when(path.getFileSystem(any[Configuration])).thenReturn(viewFs)

  // The single located status the mocked file system is expected to report.
  val fileStatus = new FileStatus(0, false, 0, 100, 0, new Path("file"))
  val statuses = Seq(new LocatedFileStatus(fileStatus, Array(new BlockLocation())))

  // Expose the expected statuses through Hadoop's RemoteIterator interface and
  // stub listLocatedStatus(path) to hand that iterator back.
  val locatedIter = new RemoteIterator[LocatedFileStatus] {
    private val underlying = statuses.iterator
    override def hasNext: Boolean = underlying.hasNext
    override def next(): LocatedFileStatus = underlying.next()
  }
  when(viewFs.listLocatedStatus(path)).thenReturn(locatedIter)

  // The index built over the mocked path must report exactly the stubbed statuses.
  val fileIndex = new TestInMemoryFileIndex(spark, path)
  assert(fileIndex.leafFileStatuses.toSeq == statuses)
}
}

object DeletionRaceFileSystem {
Expand Down

0 comments on commit 4e4e08f

Please sign in to comment.