Skip to content

Commit

Permalink
ARROW-9235: [R] Support for connection class when reading and writi…
Browse files Browse the repository at this point in the history
…ng files

This is a PR to support arbitrary R "connection" objects as Input and Output streams. In particular, this adds support for sockets (ARROW-4512), URLs, and some other IO operations that are implemented as R connections (e.g., in the [archive](https://github.com/r-lib/archive#archive) package). The gist of it is that you should be able to do this:

``` r
# remotes::install_github("paleolimbot/arrow/r@r-connections")
library(arrow, warn.conflicts = FALSE)

addr <- "https://github.com/apache/arrow/raw/master/r/inst/v0.7.1.parquet"

stream <- arrow:::make_readable_file(addr)
rawToChar(as.raw(stream$Read(4)))
#> [1] "PAR1"
stream$close()

stream <- arrow:::make_readable_file(url(addr, open = "rb"))
rawToChar(as.raw(stream$Read(4)))
#> [1] "PAR1"
stream$close()
```

There are two serious issues that prevent this PR from being useful yet. First, it uses functions that R considers "non-API" functions from the C API.

    > checking compiled code ... NOTE
      File ‘arrow/libs/arrow.so’:
        Found non-API calls to R: ‘R_GetConnection’, ‘R_ReadConnection’,
          ‘R_WriteConnection’

      Compiled code should not call non-API entry points in R.

We can get around this by calling back into R (in the same way this PR implements `Tell()` and `Close()`). We could also go all out and implement the other half (exposing `InputStream`/`OutputStream`s as R connections) and ask for an exemption (at least one R package, curl, does this). The archive package seems to expose connections without a NOTE on the CRAN check page, so perhaps there is also a workaround.

Second, we get a crash when passing the input stream to most functions. I think this is because the `Read()` method is getting called from another thread but it also could be an error in my implementation. If the issue is threading, we would have to arrange a way to queue jobs for the R main thread (e.g., how the [later](https://github.com/r-lib/later#background-tasks) package does it) and a way to ping it occasionally to fetch the results. This is complicated but might be useful for other reasons (supporting evaluation of R functions in more places). It also might be more work than it's worth.

``` r
# remotes::install_github("paleolimbot/arrow/r@r-connections")
library(arrow, warn.conflicts = FALSE)

addr <- "https://github.com/apache/arrow/raw/master/r/inst/v0.7.1.parquet"
read_parquet(addr)
```

```
*** caught segfault ***
address 0x28, cause 'invalid permissions'

Traceback:
 1: parquet___arrow___FileReader__OpenFile(file, props)
```

Closes apache#12323 from paleolimbot/r-connections

Lead-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Signed-off-by: Neal Richardson <[email protected]>
  • Loading branch information
2 people authored and nealrichardson committed Apr 22, 2022
1 parent c16bbe1 commit 6cf344b
Show file tree
Hide file tree
Showing 15 changed files with 574 additions and 45 deletions.
20 changes: 16 additions & 4 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions r/R/feather.R
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ FeatherReader <- R6Class("FeatherReader",
inherit = ArrowObject,
public = list(
Read = function(columns) {
ipc___feather___Reader__Read(self, columns)
ipc___feather___Reader__Read(self, columns, on_old_windows())
},
print = function(...) {
cat("FeatherReader:\n")
Expand All @@ -215,5 +215,5 @@ names.FeatherReader <- function(x) x$column_names

FeatherReader$create <- function(file) {
assert_is(file, "RandomAccessFile")
ipc___feather___Reader__Open(file)
ipc___feather___Reader__Open(file, on_old_windows())
}
43 changes: 37 additions & 6 deletions r/R/io.R
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ BufferReader$create <- function(x) {
io___BufferReader__initialize(x)
}


#' Create a new read/write memory mapped file of a given size
#'
#' @param path file path
Expand Down Expand Up @@ -244,32 +245,59 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem
}
if (is.string(file)) {
if (is_url(file)) {
fs_and_path <- FileSystem$from_uri(file)
filesystem <- fs_and_path$fs
file <- fs_and_path$path
file <- tryCatch({
fs_and_path <- FileSystem$from_uri(file)
filesystem <- fs_and_path$fs
fs_and_path$path
}, error = function(e) {
MakeRConnectionInputStream(url(file, open = "rb"))
})
}

if (is.null(compression)) {
# Infer compression from the file path
compression <- detect_compression(file)
}

if (!is.null(filesystem)) {
file <- filesystem$OpenInputFile(file)
} else if (isTRUE(mmap)) {
} else if (is.string(file) && isTRUE(mmap)) {
file <- mmap_open(file)
} else {
} else if (is.string(file)) {
file <- ReadableFile$create(file)
}

if (!identical(compression, "uncompressed")) {
file <- CompressedInputStream$create(file, compression)
}
} else if (inherits(file, c("raw", "Buffer"))) {
file <- BufferReader$create(file)
} else if (inherits(file, "connection")) {
if (!isOpen(file)) {
open(file, "rb")
}

# Try to create a RandomAccessFile first because some readers need this
# (e.g., feather, parquet) but fall back on an InputStream for the readers
# that don't (e.g., IPC, CSV)
file <- tryCatch(
MakeRConnectionRandomAccessFile(file),
error = function(e) MakeRConnectionInputStream(file)
)
}
assert_is(file, "InputStream")
file
}

make_output_stream <- function(x, filesystem = NULL) {
if (inherits(x, "connection")) {
if (!isOpen(x)) {
open(x, "wb")
}

return(MakeRConnectionOutputStream(x))
}

if (inherits(x, "SubTreeFileSystem")) {
filesystem <- x$base_fs
x <- x$base_path
Expand All @@ -287,7 +315,10 @@ make_output_stream <- function(x, filesystem = NULL) {
}

detect_compression <- function(path) {
assert_that(is.string(path))
if (!is.string(path)) {
return("uncompressed")
}

switch(tools::file_ext(path),
bz2 = "bz2",
gz = "gzip",
Expand Down
3 changes: 2 additions & 1 deletion r/R/parquet.R
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ read_parquet <- function(file,
as_data_frame = TRUE,
props = ParquetArrowReaderProperties$create(),
...) {
if (is.string(file)) {
if (!inherits(file, "RandomAccessFile")) {
file <- make_readable_file(file)
on.exit(file$close())
}
Expand Down Expand Up @@ -541,6 +541,7 @@ ParquetFileReader$create <- function(file,
...) {
file <- make_readable_file(file, mmap)
assert_is(props, "ParquetArrowReaderProperties")
assert_is(file, "RandomAccessFile")

parquet___arrow___FileReader__OpenFile(file, props)
}
Expand Down
47 changes: 38 additions & 9 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions r/src/csv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#if defined(ARROW_R_WITH_ARROW)

#include "./safe-call-into-r.h"

#include <arrow/csv/reader.h>
#include <arrow/csv/writer.h>
#include <arrow/memory_pool.h>
Expand Down Expand Up @@ -162,7 +164,16 @@ std::shared_ptr<arrow::csv::TableReader> csv___TableReader__Make(
// [[arrow::export]]
std::shared_ptr<arrow::Table> csv___TableReader__Read(
const std::shared_ptr<arrow::csv::TableReader>& table_reader) {
#if !defined(HAS_SAFE_CALL_INTO_R)
return ValueOrStop(table_reader->Read());
#else
const auto& io_context = arrow::io::default_io_context();
auto result = RunWithCapturedR<std::shared_ptr<arrow::Table>>([&]() {
return DeferNotOk(
io_context.executor()->Submit([&]() { return table_reader->Read(); }));
});
return ValueOrStop(result);
#endif
}

// [[arrow::export]]
Expand Down
74 changes: 54 additions & 20 deletions r/src/feather.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "./arrow_types.h"

#if defined(ARROW_R_WITH_ARROW)

#include "./safe-call-into-r.h"

#include <arrow/ipc/feather.h>
#include <arrow/type.h>

Expand Down Expand Up @@ -48,34 +51,65 @@ int ipc___feather___Reader__version(

// [[arrow::export]]
std::shared_ptr<arrow::Table> ipc___feather___Reader__Read(
const std::shared_ptr<arrow::ipc::feather::Reader>& reader, SEXP columns) {
std::shared_ptr<arrow::Table> table;

switch (TYPEOF(columns)) {
case STRSXP: {
R_xlen_t n = XLENGTH(columns);
std::vector<std::string> names(n);
for (R_xlen_t i = 0; i < n; i++) {
names[i] = CHAR(STRING_ELT(columns, i));
}
StopIfNotOk(reader->Read(names, &table));
break;
const std::shared_ptr<arrow::ipc::feather::Reader>& reader, cpp11::sexp columns,
bool on_old_windows) {
bool use_names = columns != R_NilValue;
std::vector<std::string> names;
if (use_names) {
cpp11::strings columns_chr(columns);
names.reserve(columns_chr.size());
for (const auto& name : columns_chr) {
names.push_back(name);
}
case NILSXP:
StopIfNotOk(reader->Read(&table));
break;
default:
cpp11::stop("incompatible column specification");
break;
}

return table;
auto read_table = [&]() {
std::shared_ptr<arrow::Table> table;
arrow::Status read_result;
if (use_names) {
read_result = reader->Read(names, &table);
} else {
read_result = reader->Read(&table);
}

if (read_result.ok()) {
return arrow::Result<std::shared_ptr<arrow::Table>>(table);
} else {
return arrow::Result<std::shared_ptr<arrow::Table>>(read_result);
}
};

#if !defined(HAS_SAFE_CALL_INTO_R)
return ValueOrStop(read_table());
#else
if (!on_old_windows) {
const auto& io_context = arrow::io::default_io_context();
auto result = RunWithCapturedR<std::shared_ptr<arrow::Table>>(
[&]() { return DeferNotOk(io_context.executor()->Submit(read_table)); });
return ValueOrStop(result);
} else {
return ValueOrStop(read_table());
}
#endif
}

// [[arrow::export]]
std::shared_ptr<arrow::ipc::feather::Reader> ipc___feather___Reader__Open(
const std::shared_ptr<arrow::io::RandomAccessFile>& stream) {
const std::shared_ptr<arrow::io::RandomAccessFile>& stream, bool on_old_windows) {
#if !defined(HAS_SAFE_CALL_INTO_R)
return ValueOrStop(arrow::ipc::feather::Reader::Open(stream));
#else
if (!on_old_windows) {
const auto& io_context = arrow::io::default_io_context();
auto result = RunWithCapturedR<std::shared_ptr<arrow::ipc::feather::Reader>>([&]() {
return DeferNotOk(io_context.executor()->Submit(
[&]() { return arrow::ipc::feather::Reader::Open(stream); }));
});
return ValueOrStop(result);
} else {
return ValueOrStop(arrow::ipc::feather::Reader::Open(stream));
}
#endif
}

// [[arrow::export]]
Expand Down
Loading

0 comments on commit 6cf344b

Please sign in to comment.