ARROW-9854: [R] Support reading/writing data to/from S3
- [x] read_parquet/feather/etc. from S3 (use FileSystem->OpenInputFile(path)); a usage sketch follows the lists below
- [x] write_$FORMAT via FileSystem->OpenOutputStream(path)
- [x] write_dataset (done? at least via URI)
- [x] ~~for linux, an argument to install_arrow to help, assuming you've installed aws-sdk-cpp already (turn on ARROW_S3, AWSSDK_SOURCE=SYSTEM)~~ Turns out there are no official deb/rpm packages for aws-sdk-cpp, so there's no value in making this part easier; it would actually be more confusing than helpful
- [x] set up a real test bucket and user for e2e testing (credentials available on request)
- [x] add a few tests that use s3, if credentials are set (which I'll set locally)
- [x] add vignette showing how to use s3 (via URI)
- [x] update docs, news

Out of the current scope:

- [ ] testing with minio on CI
- [ ] download dataset, i.e. copy files/directory recursively (needs ARROW-9867, ARROW-9868)
- [ ] friendlier methods for interacting with/viewing a filesystem (ls, mkdir, etc.) (ARROW-9870)
- [ ] direct construction of S3FileSystem object with S3Options (i.e. not only URI) (ARROW-9869)
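
A minimal usage sketch of the in-scope items (the bucket and region below are hypothetical, not the real test bucket):

```r
library(arrow)

# Readers accept s3:// URIs; the URI is resolved to an S3FileSystem
# and the file is opened via that filesystem's OpenInputFile()
df <- read_parquet("s3://my-bucket/data/file.parquet?region=us-west-2")

# Writers go through OpenOutputStream() on the same filesystem
write_feather(df, "s3://my-bucket/data/file.feather?region=us-west-2")
```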

Closes apache#8058 from nealrichardson/r-s3

Authored-by: Neal Richardson <[email protected]>
Signed-off-by: Neal Richardson <[email protected]>
nealrichardson committed Sep 10, 2020
1 parent 986eab4 commit b77e8ae
Showing 20 changed files with 191 additions and 52 deletions.
9 changes: 5 additions & 4 deletions r/NEWS.md
@@ -25,17 +25,18 @@
* Datasets now have `head()`, `tail()`, and take (`[`) methods. `head()` is optimized but the others may not be performant.
* `collect()` gains an `as_data_frame` argument, default `TRUE`; when `FALSE`, it evaluates the accumulated `select` and `filter` query but keeps the result in Arrow rather than an R `data.frame`

## AWS S3 support

* S3 support is now enabled in binary macOS and Windows (Rtools40 only, i.e. R >= 4.0) packages. To enable it on Linux, build and install `aws-sdk-cpp` from source, then set the environment variable `EXTRA_CMAKE_FLAGS="-DARROW_S3=ON -DAWSSDK_SOURCE=SYSTEM"` before building the R package from source (with the bundled C++ build, not with system Arrow libraries); see the setup sketch below.
* File readers and writers (`read_parquet()`, `write_feather()`, et al.) now accept an `s3://` URI as the source or destination file, as do `open_dataset()` and `write_dataset()`. See `vignette("fs", package = "arrow")` for details.
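
A sketch of the Linux setup described above, assuming `aws-sdk-cpp` is already installed to a system location:

```r
# Set before installing the R package from source so the bundled
# C++ build turns on S3 support and uses the system AWS SDK
Sys.setenv(EXTRA_CMAKE_FLAGS = "-DARROW_S3=ON -DAWSSDK_SOURCE=SYSTEM")
install.packages("arrow")
```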

## Computation

* Comparison (`==`, `>`, etc.) and boolean (`&`, `|`, `!`) operations, along with `is.na`, `%in%` and `match` (called `match_arrow()`), on Arrow Arrays and ChunkedArrays are now implemented in the C++ library.
* Aggregation methods `min()`, `max()`, and `unique()` are implemented for Arrays and ChunkedArrays.
* `dplyr` filter expressions on Arrow Tables and RecordBatches are now evaluated in the C++ library, rather than by pulling data into R and evaluating. This yields significant performance improvements.
* `dim()` (`nrow`) for dplyr queries on Table/RecordBatch is now supported

## Packaging

* S3 support is now enabled in binary macOS and Windows (Rtools40 only, i.e. R >= 4.0) packages

## Other improvements

* `arrow` now depends on [`cpp11`](https://cpp11.r-lib.org/), which brings more robust UTF-8 handling and faster compilation
2 changes: 1 addition & 1 deletion r/R/csv.R
@@ -32,7 +32,7 @@
#' `parse_options`, `convert_options`, or `read_options` arguments, or you can
#' use [CsvTableReader] directly for lower-level access.
#'
#' @param file A character file name, `raw` vector, or an Arrow input stream.
#' @param file A character file name or URI, `raw` vector, or an Arrow input stream.
#' If a file name, a memory-mapped Arrow [InputStream] will be opened and
#' closed when finished; compression will be detected from the file extension
#' and handled automatically. If an input stream is provided, it will be left
15 changes: 3 additions & 12 deletions r/R/dataset-factory.R
@@ -48,17 +48,8 @@ DatasetFactory$create <- function(x,
stop("'x' must be a string or a list of DatasetFactory", call. = FALSE)
}

if (!inherits(filesystem, "FileSystem")) {
if (grepl("://", x)) {
fs_from_uri <- FileSystem$from_uri(x)
filesystem <- fs_from_uri$fs
x <- fs_from_uri$path
} else {
filesystem <- LocalFileSystem$create()
x <- clean_path_abs(x)
}
}
selector <- FileSelector$create(x, allow_not_found = FALSE, recursive = TRUE)
path_and_fs <- get_path_and_filesystem(x, filesystem)
selector <- FileSelector$create(path_and_fs$path, allow_not_found = FALSE, recursive = TRUE)

if (is.character(format)) {
format <- FileFormat$create(match.arg(format), ...)
@@ -74,7 +65,7 @@
partitioning <- DirectoryPartitioningFactory$create(partitioning)
}
}
FileSystemDatasetFactory$create(filesystem, selector, format, partitioning)
FileSystemDatasetFactory$create(path_and_fs$fs, selector, format, partitioning)
}

#' Create a DatasetFactory
13 changes: 2 additions & 11 deletions r/R/dataset.R
@@ -164,17 +164,8 @@ Dataset <- R6Class("Dataset", inherit = ArrowObject,
NewScan = function() unique_ptr(ScannerBuilder, dataset___Dataset__NewScan(self)),
ToString = function() self$schema$ToString(),
write = function(path, filesystem = NULL, schema = self$schema, format, partitioning, ...) {
if (!inherits(filesystem, "FileSystem")) {
if (grepl("://", path)) {
fs_from_uri <- FileSystem$from_uri(path)
filesystem <- fs_from_uri$fs
path <- fs_from_uri$path
} else {
filesystem <- LocalFileSystem$create()
path <- clean_path_abs(path)
}
}
dataset___Dataset__Write(self, schema, format, filesystem, path, partitioning)
path_and_fs <- get_path_and_filesystem(path, filesystem)
dataset___Dataset__Write(self, schema, format, path_and_fs$fs, path_and_fs$path, partitioning)
invisible(self)
}
),
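
The rewritten `write` method accepts either an explicit `FileSystem` or a URI in `path`. A hedged sketch of the URI route through the public `write_dataset()` wrapper (bucket hypothetical, and assuming `write_dataset()`'s signature at this point in history):

```r
ds <- open_dataset("/local/data/parquet")
# The s3:// URI is split into an S3FileSystem plus a path within it
write_dataset(ds, "s3://my-bucket/mirror?region=us-west-2",
              format = "parquet", partitioning = "year")
```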
6 changes: 3 additions & 3 deletions r/R/feather.R
@@ -24,7 +24,7 @@
#' and the version 2 specification, which is the Apache Arrow IPC file format.
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
#' @param sink A string file path or [OutputStream]
#' @param sink A string file path, URI, or [OutputStream]
#' @param version integer Feather file version. Version 2 is the current.
#' Version 1 is the more limited legacy format.
#' @param chunk_size For V2 files, the number of rows that each chunk of data
@@ -106,7 +106,7 @@ write_feather <- function(x,
assert_is(x, "Table")

if (is.string(sink)) {
sink <- FileOutputStream$create(sink)
sink <- make_output_stream(sink)
on.exit(sink$close())
}
assert_is(sink, "OutputStream")
@@ -142,7 +142,7 @@
#' df <- read_feather(tf, col_select = starts_with("d"))
#' }
read_feather <- function(file, col_select = NULL, as_data_frame = TRUE, ...) {
if (!inherits(file, "InputStream")) {
if (!inherits(file, "RandomAccessFile")) {
file <- make_readable_file(file)
on.exit(file$close())
}
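
The `read_feather()` change above is more than cosmetic: the Feather V2 (Arrow IPC file) format reads a footer, so it needs a seekable `RandomAccessFile`, not just any `InputStream`. A sketch of passing a pre-opened handle (bucket hypothetical):

```r
fs_and_path <- FileSystem$from_uri("s3://my-bucket/test.feather?region=us-west-2")
f <- fs_and_path$fs$OpenInputFile(fs_and_path$path)  # a RandomAccessFile
df <- read_feather(f)  # a handle you provide is left open
f$close()
```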
22 changes: 21 additions & 1 deletion r/R/filesystem.R
@@ -228,7 +228,7 @@ FileSystem <- R6Class("FileSystem", inherit = ArrowObject,
shared_ptr(InputStream, fs___FileSystem__OpenInputStream(self, clean_path_rel(path)))
},
OpenInputFile = function(path) {
shared_ptr(InputStream, fs___FileSystem__OpenInputFile(self, clean_path_rel(path)))
shared_ptr(RandomAccessFile, fs___FileSystem__OpenInputFile(self, clean_path_rel(path)))
},
OpenOutputStream = function(path) {
shared_ptr(OutputStream, fs___FileSystem__OpenOutputStream(self, clean_path_rel(path)))
@@ -242,11 +242,31 @@
)
)
FileSystem$from_uri <- function(uri) {
assert_that(is.string(uri))
out <- fs___FileSystemFromUri(uri)
out$fs <- shared_ptr(FileSystem, out$fs)$..dispatch()
out
}

get_path_and_filesystem <- function(x, filesystem = NULL) {
# Wrapper around FileSystem$from_uri that handles local paths
# and an optional explicit filesystem
assert_that(is.string(x))
if (is_url(x)) {
if (!is.null(filesystem)) {
# Stop? Can't have URL (which yields a fs) and another fs
}
FileSystem$from_uri(x)
} else {
list(
fs = filesystem %||% LocalFileSystem$create(),
path = clean_path_abs(x)
)
}
}

is_url <- function(x) grepl("://", x)

#' @usage NULL
#' @format NULL
#' @rdname FileSystem
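
Sketch of how the new helper resolves its two cases (the URI is illustrative; the helper itself is internal, not exported):

```r
# Case 1: a URI parses into a filesystem plus a path within it
uri <- FileSystem$from_uri("s3://my-bucket/data/file.parquet?region=us-west-2")
uri$fs    # an S3FileSystem
uri$path  # "my-bucket/data/file.parquet"

# Case 2: a plain path uses the supplied filesystem, or the local one
local <- get_path_and_filesystem("data/file.parquet")
local$fs    # a LocalFileSystem
local$path  # made absolute by clean_path_abs()
```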
23 changes: 21 additions & 2 deletions r/R/io.R
@@ -224,15 +224,25 @@ mmap_open <- function(path, mode = c("read", "write", "readwrite")) {
#' with this compression codec, either a [Codec] or the string name of one.
#' If `NULL` (default) and `file` is a string file name, the function will try
#' to infer compression from the file extension.
#' @param filesystem If not `NULL`, `file` will be opened via the
#' `filesystem$OpenInputFile()` filesystem method, rather than the `io` module's
#' `MemoryMappedFile` or `ReadableFile` constructors.
#' @return An `InputStream` or a subclass of one.
#' @keywords internal
make_readable_file <- function(file, mmap = TRUE, compression = NULL) {
make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem = NULL) {
if (is.string(file)) {
if (is_url(file)) {
fs_and_path <- FileSystem$from_uri(file)
filesystem <- fs_and_path$fs
file <- fs_and_path$path
}
if (is.null(compression)) {
# Infer compression from the file path
compression <- detect_compression(file)
}
if (isTRUE(mmap)) {
if (!is.null(filesystem)) {
file <- filesystem$OpenInputFile(file)
} else if (isTRUE(mmap)) {
file <- mmap_open(file)
} else {
file <- ReadableFile$create(file)
@@ -247,6 +257,15 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem = NULL) {
file
}

make_output_stream <- function(x) {
if (is_url(x)) {
fs_and_path <- FileSystem$from_uri(x)
fs_and_path$fs$OpenOutputStream(fs_and_path$path)
} else {
FileOutputStream$create(x)
}
}

detect_compression <- function(path) {
assert_that(is.string(path))
switch(tools::file_ext(path),
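
Together these two internal helpers give every reader and writer URI support in one place. Sketched behavior:

```r
# URIs resolve to a filesystem-backed stream; plain paths stay local
make_output_stream("s3://my-bucket/out.feather?region=us-west-2")  # stream on S3
make_output_stream("out.feather")                                  # FileOutputStream

# make_readable_file() mirrors this on the read side, preferring
# filesystem$OpenInputFile() over mmap when a filesystem is in play
```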
8 changes: 4 additions & 4 deletions r/R/ipc_stream.R
@@ -41,7 +41,7 @@ write_ipc_stream <- function(x, sink, ...) {
x <- Table$create(x)
}
if (is.string(sink)) {
sink <- FileOutputStream$create(sink)
sink <- make_output_stream(sink)
on.exit(sink$close())
}
assert_is(sink, "OutputStream")
@@ -82,10 +82,10 @@ write_to_raw <- function(x, format = c("stream", "file")) {
#' `read_arrow()`, a wrapper around `read_ipc_stream()` and `read_feather()`,
#' is deprecated. You should explicitly choose
#' the function that will read the desired IPC format (stream or file) since
#' a file or `InputStream` may contain either.
#' a file or `InputStream` may contain either.
#'
#' @param file A character file name, `raw` vector, or an Arrow input stream.
#' If a file name, a memory-mapped Arrow [InputStream] will be opened and
#' @param file A character file name or URI, `raw` vector, or an Arrow input stream.
#' If a file name or URI, an Arrow [InputStream] will be opened and
#' closed when finished. If an input stream is provided, it will be left
#' open.
#' @param as_data_frame Should the function return a `data.frame` (default) or
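
Because the IPC stream format needs only sequential input, an in-memory round trip through a `raw` vector works; a small sketch:

```r
buf <- write_to_raw(data.frame(x = 1:3))  # serialize to the IPC stream format
df <- read_ipc_stream(buf)                # and read it back as a data.frame
identical(df$x, 1:3)                      # TRUE
```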
5 changes: 3 additions & 2 deletions r/R/parquet.R
@@ -59,7 +59,8 @@ read_parquet <- function(file,
#' This function enables you to write Parquet files from R.
#'
#' @param x An [arrow::Table][Table], or an object convertible to it.
#' @param sink an [arrow::io::OutputStream][OutputStream] or a string which is interpreted as a file path
#' @param sink an [arrow::io::OutputStream][OutputStream] or a string
#' interpreted as a file path or URI
#' @param chunk_size chunk size in number of rows. If NULL, the total number of rows is used.
#' @param version parquet version, "1.0" or "2.0". Default "1.0". Numeric values
#' are coerced to character.
@@ -129,7 +130,7 @@ write_parquet <- function(x,
}

if (is.string(sink)) {
sink <- FileOutputStream$create(sink)
sink <- make_output_stream(sink)
on.exit(sink$close())
} else if (!inherits(sink, "OutputStream")) {
abort("sink must be a file path or an OutputStream")
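
With `make_output_stream()` in place, `write_parquet()` gets the same URI handling as the other writers; a sketch (bucket hypothetical):

```r
write_parquet(mtcars, "s3://my-bucket/mtcars.parquet?region=us-west-2")

# An explicit OutputStream still works, but you close it yourself
sink <- FileOutputStream$create("local.parquet")
write_parquet(mtcars, sink)
sink$close()
```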
6 changes: 5 additions & 1 deletion r/man/make_readable_file.Rd
2 changes: 1 addition & 1 deletion r/man/read_delim_arrow.Rd
4 changes: 2 additions & 2 deletions r/man/read_feather.Rd
4 changes: 2 additions & 2 deletions r/man/read_ipc_stream.Rd
2 changes: 1 addition & 1 deletion r/man/read_json_arrow.Rd
4 changes: 2 additions & 2 deletions r/man/read_parquet.Rd
2 changes: 1 addition & 1 deletion r/man/write_feather.Rd
2 changes: 1 addition & 1 deletion r/man/write_ipc_stream.Rd
3 changes: 2 additions & 1 deletion r/man/write_parquet.Rd

(Generated .Rd documentation files; diffs not rendered.)
52 changes: 52 additions & 0 deletions r/tests/testthat/test-s3.R
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

context("S3 integration tests")

run_these <- tryCatch({
if (arrow_with_s3() &&
identical(tolower(Sys.getenv("ARROW_R_DEV")), "true") &&
!identical(Sys.getenv("AWS_ACCESS_KEY_ID"), "") &&
!identical(Sys.getenv("AWS_SECRET_ACCESS_KEY"), "")) {
# See if we have access to the test bucket
bucket <- FileSystem$from_uri("s3://ursa-labs-r-test?region=us-west-2")
bucket$fs$GetFileInfo(bucket$path)
TRUE
} else {
FALSE
}
}, error = function(e) FALSE)

bucket_uri <- function(..., bucket = "s3://ursa-labs-r-test/%s?region=us-west-2") {
segments <- paste(..., sep = "/")
sprintf(bucket, segments)
}

if (run_these) {
now <- as.numeric(Sys.time())
on.exit(bucket$fs$DeleteDir(paste0("ursa-labs-r-test/", now)))

test_that("read/write Feather on S3", {
write_feather(example_data, bucket_uri(now, "test.feather"))
expect_identical(read_feather(bucket_uri(now, "test.feather")), example_data)
})

test_that("read/write Parquet on S3", {
write_parquet(example_data, bucket_uri(now, "test.parquet"))
expect_identical(read_parquet(bucket_uri(now, "test.parquet")), example_data)
})
}
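
To opt in to these tests locally, the gates checked in `run_these` above have to pass; a sketch of the required environment (credentials for the test bucket available on request, per the commit message):

```r
Sys.setenv(
  ARROW_R_DEV = "true",            # opt-in flag these tests check
  AWS_ACCESS_KEY_ID = "<key id>",  # credentials for the test bucket
  AWS_SECRET_ACCESS_KEY = "<secret>"
)
# then run the test suite, e.g. devtools::test()
```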