Skip to content

Commit

Permalink
ARROW-13313: [C++][Compute] Add scalar aggregate node
Browse files Browse the repository at this point in the history
This is a pretty trivial node, but it's needed for completeness and will give bindings a pipeline breaker to experiment with until apache#10660 merges.

Closes apache#10705 from bkietz/13313-Add-ScalarAggregateNode

Authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
  • Loading branch information
bkietz committed Jul 13, 2021
1 parent cba7f48 commit 7114c4b
Show file tree
Hide file tree
Showing 16 changed files with 514 additions and 45 deletions.
16 changes: 16 additions & 0 deletions cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,22 @@ Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values) {
return ExecBatch(std::move(values), length);
}

/// Convert this batch into a RecordBatch laid out according to `schema`.
///
/// The i-th column of the result is built from `values[i]`: array values are
/// used directly, scalar values are expanded into an array of `length` elements
/// allocated from `pool`.
///
/// \param schema the schema of the resulting RecordBatch; one column is produced
///        for each of its fields
/// \param pool the memory pool used when a scalar must be expanded to an array
/// \return the RecordBatch, Status::Invalid if `schema` has more fields than this
///         batch has values, Status::TypeError if a value is neither an array nor
///         a scalar, or any error raised while expanding a scalar
Result<std::shared_ptr<RecordBatch>> ExecBatch::ToRecordBatch(
    std::shared_ptr<Schema> schema, MemoryPool* pool) const {
  // Each schema field reads values[i] below, so a schema wider than this batch
  // would index past the end of `values`.
  if (static_cast<size_t>(schema->num_fields()) > values.size()) {
    return Status::Invalid("ExecBatch::ToRecordBatch: schema has ", schema->num_fields(),
                           " fields but the batch only has ", values.size(), " values");
  }

  ArrayVector columns(schema->num_fields());

  for (size_t i = 0; i < columns.size(); ++i) {
    const Datum& value = values[i];
    if (value.is_array()) {
      columns[i] = value.make_array();
    } else if (value.is_scalar()) {
      // Broadcast the scalar so that every column has `length` rows.
      ARROW_ASSIGN_OR_RAISE(columns[i],
                            MakeArrayFromScalar(*value.scalar(), length, pool));
    } else {
      // Only arrays and scalars can be turned into a column; anything else would
      // previously have been dereferenced as a scalar.
      return Status::TypeError("ExecBatch::ToRecordBatch: value ", i,
                               " is neither an array nor a scalar");
    }
  }

  return RecordBatch::Make(std::move(schema), length, std::move(columns));
}

namespace {

Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t length,
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ struct ARROW_EXPORT ExecBatch {

static Result<ExecBatch> Make(std::vector<Datum> values);

/// \brief Convert this batch to a RecordBatch with the provided schema.
///
/// One column is produced per field of `schema`, taken positionally from
/// `values`. Array values are used as they are; scalar values are expanded
/// into arrays of `length` elements allocated from `pool`.
Result<std::shared_ptr<RecordBatch>> ToRecordBatch(
std::shared_ptr<Schema> schema, MemoryPool* pool = default_memory_pool()) const;

/// The values representing positional arguments to be passed to a kernel's
/// exec function for processing.
std::vector<Datum> values;
Expand Down
229 changes: 223 additions & 6 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
#include "arrow/compute/exec/exec_plan.h"

#include <mutex>
#include <thread>
#include <unordered_map>
#include <unordered_set>

#include "arrow/array/util.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/compute/registry.h"
#include "arrow/datum.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/checked_cast.h"
Expand All @@ -33,6 +38,7 @@
namespace arrow {

using internal::checked_cast;
using internal::checked_pointer_cast;

namespace compute {

Expand Down Expand Up @@ -489,15 +495,23 @@ struct ProjectNode : ExecNode {
};

Result<ExecNode*> MakeProjectNode(ExecNode* input, std::string label,
std::vector<Expression> exprs) {
std::vector<Expression> exprs,
std::vector<std::string> names) {
FieldVector fields(exprs.size());

if (names.size() == 0) {
names.resize(exprs.size());
for (size_t i = 0; i < exprs.size(); ++i) {
names[i] = exprs[i].ToString();
}
}

int i = 0;
for (auto& expr : exprs) {
if (!expr.IsBound()) {
ARROW_ASSIGN_OR_RAISE(expr, expr.Bind(*input->output_schema()));
}
fields[i] = field(expr.ToString(), expr.type());
fields[i] = field(std::move(names[i]), expr.type());
++i;
}

Expand Down Expand Up @@ -552,15 +566,16 @@ struct SinkNode : ExecNode {
++num_received_;
if (num_received_ == emit_stop_) {
lock.unlock();
producer_.Push(std::move(batch));
Finish();
lock.lock();
return;
}

if (emit_stop_ != -1) {
DCHECK_LE(seq_num, emit_stop_);
}
lock.unlock();

lock.unlock();
producer_.Push(std::move(batch));
}

Expand All @@ -574,8 +589,10 @@ struct SinkNode : ExecNode {
void InputFinished(ExecNode* input, int seq_stop) override {
std::unique_lock<std::mutex> lock(mutex_);
emit_stop_ = seq_stop;
lock.unlock();
Finish();
if (num_received_ == emit_stop_) {
lock.unlock();
Finish();
}
}

private:
Expand All @@ -601,5 +618,205 @@ AsyncGenerator<util::optional<ExecBatch>> MakeSinkNode(ExecNode* input,
return out;
}

std::shared_ptr<RecordBatchReader> MakeGeneratorReader(
std::shared_ptr<Schema> schema,
std::function<Future<util::optional<ExecBatch>>()> gen, MemoryPool* pool) {
struct Impl : RecordBatchReader {
std::shared_ptr<Schema> schema() const override { return schema_; }

Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
ARROW_ASSIGN_OR_RAISE(auto batch, iterator_.Next());
if (batch) {
ARROW_ASSIGN_OR_RAISE(*record_batch, batch->ToRecordBatch(schema_, pool_));
} else {
*record_batch = IterationEnd<std::shared_ptr<RecordBatch>>();
}
return Status::OK();
}

MemoryPool* pool_;
std::shared_ptr<Schema> schema_;
Iterator<util::optional<ExecBatch>> iterator_;
};

auto out = std::make_shared<Impl>();
out->pool_ = pool;
out->schema_ = std::move(schema);
out->iterator_ = MakeGeneratorIterator(std::move(gen));
return out;
}

/// An ExecNode which applies one scalar aggregate kernel to each input column
/// and emits a single one-row batch holding the finalized results.
///
/// `states_[i]` holds the kernel states for column i, one per consuming thread.
/// Every thread which calls InputReceived is assigned a slot index on first use
/// and consumes into its own state; the per-thread states are merged and
/// finalized once all batches announced by InputFinished have been consumed.
struct ScalarAggregateNode : ExecNode {
  ScalarAggregateNode(ExecNode* input, std::string label,
                      std::shared_ptr<Schema> output_schema,
                      std::vector<const ScalarAggregateKernel*> kernels,
                      std::vector<std::vector<std::unique_ptr<KernelState>>> states)
      : ExecNode(input->plan(), std::move(label), {input}, {"target"},
                 /*output_schema=*/std::move(output_schema),
                 /*num_outputs=*/1),
        kernels_(std::move(kernels)),
        states_(std::move(states)) {}

  const char* kind_name() override { return "ScalarAggregateNode"; }

  /// Feed column i of `batch` to kernel i, accumulating into the state slot
  /// owned by `thread_index`.
  Status DoConsume(const ExecBatch& batch, size_t thread_index) {
    for (size_t i = 0; i < kernels_.size(); ++i) {
      // Slots are handed out per distinct calling thread; fail cleanly instead
      // of indexing past the states which were allocated for this kernel.
      if (thread_index >= states_[i].size()) {
        return Status::Invalid("ScalarAggregateNode has no kernel state for thread index ",
                               thread_index, " (", states_[i].size(),
                               " states available)");
      }
      KernelContext batch_ctx{plan()->exec_context()};
      batch_ctx.SetState(states_[i][thread_index].get());
      ExecBatch single_column_batch{{batch.values[i]}, batch.length};
      RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, single_column_batch));
    }
    return Status::OK();
  }

  void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {
    DCHECK_EQ(input, inputs_[0]);

    std::unique_lock<std::mutex> lock(mutex_);
    auto it =
        thread_indices_.emplace(std::this_thread::get_id(), thread_indices_.size()).first;
    auto thread_index = it->second;

    // Consume without holding the mutex so that threads can aggregate in parallel.
    lock.unlock();

    Status st = DoConsume(batch, thread_index);
    if (!st.ok()) {
      outputs_[0]->ErrorReceived(this, std::move(st));
      return;
    }

    lock.lock();
    // Count the batch only once it has been fully consumed: otherwise a
    // concurrent InputFinished could observe num_received_ == num_total_ and
    // merge the states while this thread is still writing into its own.
    ++num_received_;
    st = MaybeFinish(&lock);
    if (!st.ok()) {
      outputs_[0]->ErrorReceived(this, std::move(st));
    }
  }

  void ErrorReceived(ExecNode* input, Status error) override {
    DCHECK_EQ(input, inputs_[0]);
    outputs_[0]->ErrorReceived(this, std::move(error));
  }

  void InputFinished(ExecNode* input, int seq) override {
    DCHECK_EQ(input, inputs_[0]);
    std::unique_lock<std::mutex> lock(mutex_);
    num_total_ = seq;
    Status st = MaybeFinish(&lock);

    if (!st.ok()) {
      outputs_[0]->ErrorReceived(this, std::move(st));
    }
  }

  Status StartProducing() override {
    finished_ = Future<>::Make();
    // Scalar aggregates will only output a single batch
    outputs_[0]->InputFinished(this, 1);
    return Status::OK();
  }

  void PauseProducing(ExecNode* output) override {}

  void ResumeProducing(ExecNode* output) override {}

  void StopProducing(ExecNode* output) override {
    DCHECK_EQ(output, outputs_[0]);
    StopProducing();
  }

  void StopProducing() override {
    inputs_[0]->StopProducing(this);
    finished_.MarkFinished();
  }

  Future<> finished() override { return finished_; }

 private:
  /// Emit the aggregated batch if every announced input batch has been consumed.
  ///
  /// Must be called with `*lock` held. The lock is released before the result
  /// is pushed downstream; it is still held when returning early or with an
  /// error.
  Status MaybeFinish(std::unique_lock<std::mutex>* lock) {
    if (num_received_ != num_total_) return Status::OK();

    if (finished_.is_finished()) return Status::OK();

    // finished_ is only marked after the lock has been released, so it cannot
    // be used to stop a second thread from finalizing. Claim the finalization
    // while the lock is still held; the states are moved out below and must
    // not be merged twice.
    if (finalized_) return Status::OK();
    finalized_ = true;

    ExecBatch batch{{}, 1};
    batch.values.resize(kernels_.size());

    for (size_t i = 0; i < kernels_.size(); ++i) {
      KernelContext ctx{plan()->exec_context()};
      // `merged` owns the combined state and must outlive finalize().
      // NOTE(review): finalize() presumably reads the state which MergeAll
      // installs on `ctx` -- confirm against ScalarAggregateKernel::MergeAll.
      ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
                                             kernels_[i], &ctx, std::move(states_[i])));
      RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
    }
    lock->unlock();

    outputs_[0]->InputReceived(this, 0, std::move(batch));

    finished_.MarkFinished();
    return Status::OK();
  }

  // Starts out finished so that MaybeFinish emits nothing until StartProducing
  // has replaced it with an unfinished future.
  Future<> finished_ = Future<>::MakeFinished();
  std::vector<const ScalarAggregateKernel*> kernels_;
  std::vector<std::vector<std::unique_ptr<KernelState>>> states_;
  // Maps each thread which has delivered a batch to its slot in states_[i].
  std::unordered_map<std::thread::id, size_t> thread_indices_;
  std::mutex mutex_;
  // num_total_ is -1 until InputFinished reports the number of batches, so it
  // can never compare equal to num_received_ before then.
  int num_received_ = 0, num_total_ = -1;
  // Set under mutex_ by the call which takes responsibility for emitting output.
  bool finalized_ = false;
};

/// Build a ScalarAggregateNode consuming `input`.
///
/// Exactly one aggregate must be given per field of the input schema; aggregate
/// i is applied to input column i. Each aggregate names a function looked up in
/// the plan's function registry, which must be a scalar aggregate function.
/// Aggregates without options use the function's default options. Output fields
/// are named after the aggregate functions and typed by resolving each kernel's
/// output type.
///
/// \return the node added to the input's plan, or an error if the number of
///         aggregates does not match, a function is missing or of the wrong
///         kind, or kernel dispatch/initialization fails
Result<ExecNode*> MakeScalarAggregateNode(ExecNode* input, std::string label,
                                          std::vector<internal::Aggregate> aggregates) {
  const auto& input_schema = input->output_schema();
  const size_t num_aggregates = aggregates.size();

  if (input_schema->num_fields() != static_cast<int>(num_aggregates)) {
    return Status::Invalid("Provided ", num_aggregates,
                           " aggregates, expected one for each field of ",
                           input_schema->ToString());
  }

  auto exec_ctx = input->plan()->exec_context();

  std::vector<const ScalarAggregateKernel*> kernels(num_aggregates);
  std::vector<std::vector<std::unique_ptr<KernelState>>> states(num_aggregates);
  FieldVector fields(num_aggregates);

  for (size_t i = 0; i < num_aggregates; ++i) {
    auto& aggregate = aggregates[i];

    ARROW_ASSIGN_OR_RAISE(auto function,
                          exec_ctx->func_registry()->GetFunction(aggregate.function));
    if (function->kind() != Function::SCALAR_AGGREGATE) {
      return Status::Invalid("Provided non ScalarAggregateFunction ",
                             aggregate.function);
    }

    // Kernels are dispatched on the type of the matching input column.
    auto in_type = ValueDescr::Array(input_schema->fields()[i]->type());
    ARROW_ASSIGN_OR_RAISE(const Kernel* kernel, function->DispatchExact({in_type}));
    kernels[i] = static_cast<const ScalarAggregateKernel*>(kernel);

    if (aggregate.options == nullptr) {
      aggregate.options = function->default_options();
    }

    // One state per executor slot, or a single state when there is no executor.
    const auto num_states =
        exec_ctx->executor() ? exec_ctx->executor()->GetCapacity() : 1;
    states[i].resize(num_states);

    KernelContext kernel_ctx{exec_ctx};
    RETURN_NOT_OK(Kernel::InitAll(
        &kernel_ctx, KernelInitArgs{kernels[i], {in_type}, aggregate.options},
        &states[i]));

    // pick one to resolve the kernel signature
    kernel_ctx.SetState(states[i][0].get());
    ARROW_ASSIGN_OR_RAISE(
        auto descr, kernels[i]->signature->out_type().Resolve(&kernel_ctx, {in_type}));

    fields[i] = field(aggregate.function, std::move(descr.type));
  }

  return input->plan()->EmplaceNode<ScalarAggregateNode>(
      input, std::move(label), schema(std::move(fields)), std::move(kernels),
      std::move(states));
}

} // namespace compute
} // namespace arrow
20 changes: 17 additions & 3 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>
#include <vector>

#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/type_fwd.h"
#include "arrow/type_fwd.h"
Expand Down Expand Up @@ -243,12 +244,19 @@ ExecNode* MakeSourceNode(ExecPlan* plan, std::string label,

/// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch>
///
/// Emitted batches will not be ordered; instead they will be tagged with the `seq` at
/// which they were received.
/// Emitted batches will not be ordered.
ARROW_EXPORT
std::function<Future<util::optional<ExecBatch>>()> MakeSinkNode(ExecNode* input,
std::string label);

/// \brief Wrap an ExecBatch generator in a RecordBatchReader.
///
/// The RecordBatchReader does not impose any ordering on emitted batches.
ARROW_EXPORT
std::shared_ptr<RecordBatchReader> MakeGeneratorReader(
std::shared_ptr<Schema>, std::function<Future<util::optional<ExecBatch>>()>,
MemoryPool*);

/// \brief Make a node which excludes some rows from batches passed through it
///
/// The filter Expression will be evaluated against each batch which is pushed to
Expand All @@ -265,9 +273,15 @@ Result<ExecNode*> MakeFilterNode(ExecNode* input, std::string label, Expression
/// this node to produce a corresponding output column.
///
/// If exprs are not already bound, they will be bound against the input's schema.
/// If names are not provided, the string representations of exprs will be used.
ARROW_EXPORT
Result<ExecNode*> MakeProjectNode(ExecNode* input, std::string label,
std::vector<Expression> exprs);
std::vector<Expression> exprs,
std::vector<std::string> names = {});

/// \brief Make a node which aggregates all of its input into a single batch
///
/// Exactly one aggregate must be provided for each field of the input's schema;
/// the i-th aggregate is applied to the i-th input column. Each aggregate must
/// name a scalar aggregate function in the plan's function registry. If an
/// aggregate has no options, the function's default options will be used.
///
/// The output contains one field per aggregate, named after the aggregate's
/// function.
ARROW_EXPORT
Result<ExecNode*> MakeScalarAggregateNode(ExecNode* input, std::string label,
std::vector<internal::Aggregate> aggregates);

} // namespace compute
} // namespace arrow
Loading

0 comments on commit 7114c4b

Please sign in to comment.