ARROW-14008: [R][Compute] Running an ExecPlan should yield Reader instead of Table

This only modifies the C++ binding so that it can return a RecordBatchReader; the R API continues to collect the stream of batches into a Table.

Closes apache#11213 from bkietz/14008-ExecPlan-run-should-retur

Authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
bkietz committed Sep 23, 2021
1 parent dd6c325 commit eec7b55
Showing 3 changed files with 18 additions and 8 deletions.
2 changes: 1 addition & 1 deletion r/R/query-engine.R
@@ -18,7 +18,7 @@
 do_exec_plan <- function(.data) {
   plan <- ExecPlan$create()
   final_node <- plan$Build(.data)
-  tab <- plan$Run(final_node)
+  tab <- plan$Run(final_node)$read_table()

   # If arrange() created $temp_columns, make sure to omit them from the result
   # We can't currently handle this in the ExecPlan itself because sorting
2 changes: 1 addition & 1 deletion r/src/arrowExports.cpp

Generated file; diff not rendered by default.

22 changes: 16 additions & 6 deletions r/src/compute-exec.cpp
@@ -55,7 +55,7 @@ std::shared_ptr<compute::ExecNode> MakeExecNodeOrStop(
 }

 // [[arrow::export]]
-std::shared_ptr<arrow::Table> ExecPlan_run(
+std::shared_ptr<arrow::RecordBatchReader> ExecPlan_run(
     const std::shared_ptr<compute::ExecPlan>& plan,
     const std::shared_ptr<compute::ExecNode>& final_node, cpp11::list sort_options) {
   // For now, don't require R to construct SinkNodes.
@@ -77,11 +77,21 @@ std::shared_ptr<arrow::Table> ExecPlan_run(
   StopIfNotOk(plan->Validate());
   StopIfNotOk(plan->StartProducing());

-  std::shared_ptr<arrow::RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
-      final_node->output_schema(), std::move(sink_gen), gc_memory_pool());
-
-  plan->finished().Wait();
-  return ValueOrStop(arrow::Table::FromRecordBatchReader(sink_reader.get()));
+  // If the generator is destroyed before being completely drained, inform plan
+  std::shared_ptr<void> stop_producing{nullptr, [plan](...) {
+    bool not_finished_yet =
+        plan->finished().TryAddCallback([&plan] {
+          return [plan](const arrow::Status&) {};
+        });
+
+    if (not_finished_yet) {
+      plan->StopProducing();
+    }
+  }};
+
+  return compute::MakeGeneratorReader(
+      final_node->output_schema(),
+      [stop_producing, plan, sink_gen] { return sink_gen(); }, gc_memory_pool());
 }

 #if defined(ARROW_R_WITH_DATASET)
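
The `stop_producing` guard in the hunk above relies on a general C++ idiom: a `std::shared_ptr<void>` that owns `nullptr` still runs its custom deleter when the last copy is destroyed, so capturing it in a generator lambda ties cleanup to the generator's lifetime. The following is a minimal standalone sketch of that idiom only. It does not use Arrow; `FakePlan`, `Generator`, `MakeGuardedGenerator`, and `StopRequestedAfter` are hypothetical names introduced for illustration, and a plain `finished` flag stands in for the `plan->finished().TryAddCallback(...)` check used in the commit.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a running plan; NOT an Arrow type.
struct FakePlan {
  bool finished = false;        // set once the source has been fully drained
  bool stop_requested = false;  // set if the consumer went away early
  void StopProducing() { stop_requested = true; }
};

// Hypothetical pull-based generator: an empty optional marks end of stream.
using Generator = std::function<std::optional<int>()>;

// Wrap `source` so that destroying the last copy of the returned generator
// calls StopProducing() on a plan that has not finished yet.
Generator MakeGuardedGenerator(std::shared_ptr<FakePlan> plan, Generator source) {
  // A shared_ptr<void> owning nullptr still invokes its deleter when the last
  // owner is destroyed, which makes it usable as a copyable scope guard.
  std::shared_ptr<void> guard{nullptr, [plan](void*) {
                                if (!plan->finished) plan->StopProducing();
                              }};
  return [guard, plan, source]() -> std::optional<int> {
    std::optional<int> next = source();
    if (!next) plan->finished = true;  // source exhausted: nothing left to stop
    return next;
  };
}

// Pull at most `max_pulls` items, drop the generator, and report whether the
// plan was asked to stop producing.
bool StopRequestedAfter(int max_pulls) {
  auto plan = std::make_shared<FakePlan>();
  auto data = std::make_shared<std::vector<int>>(std::vector<int>{1, 2, 3});
  std::size_t pos = 0;
  Generator source = [data, pos]() mutable -> std::optional<int> {
    if (pos == data->size()) return std::nullopt;
    return (*data)[pos++];
  };
  {
    Generator gen = MakeGuardedGenerator(plan, std::move(source));
    for (int i = 0; i < max_pulls; ++i) {
      if (!gen()) break;  // stop at end of stream
    }
  }  // the last copy of `gen` is destroyed here, so the guard's deleter runs
  return plan->stop_requested;
}
```

In this sketch, dropping the generator after a single pull leaves the plan unfinished, so the deleter requests a stop; pulling until the end-of-stream marker sets `finished` first, and the deleter then does nothing. The commit applies the same shape to `sink_gen`, with the plan's own completion future deciding whether `StopProducing()` is needed.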