Skip to content

Commit

Permalink
ARROW-11591: [C++][Compute] Grouped aggregation
Browse files Browse the repository at this point in the history
This patch adds basic building blocks for grouped aggregation:

- `Grouper` for producing integer arrays encoding group id from batches of keys
- `HashAggregateKernel` for consuming batches of arguments and group ids, updating internal sums/counts/...

For testing purposes, a one-shot grouped aggregation function is provided:
```c++
std::shared_ptr<arrow::Array> needs_sum = ...;
std::shared_ptr<arrow::Array> needs_min_max = ...;
std::shared_ptr<arrow::Array> key_0 = ...;
std::shared_ptr<arrow::Array> key_1 = ...;

ARROW_ASSIGN_OR_RAISE(arrow::Datum out,
  arrow::compute::internal::GroupBy({
    needs_sum,
    needs_min_max,
  }, {
    key_0,
    key_1,
  }, {
    {"sum", nullptr},  // first argument will be summed
    {"min_max", &min_max_options},  // second argument's extrema will be found
}));

// Unpack struct array result (a four-field array)
auto out_array = out.array_as<StructArray>();
std::shared_ptr<arrow::Array> sums = out_array->field(0);
std::shared_ptr<arrow::Array> mins_and_maxes = out_array->field(1);
std::shared_ptr<arrow::Array> group_key_0 = out_array->field(2);
std::shared_ptr<arrow::Array> group_key_1 = out_array->field(3);
```

Closes apache#9621 from bkietz/groupby1

Lead-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: michalursa <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
  • Loading branch information
bkietz and michalursa committed Mar 23, 2021
1 parent 7d233cb commit e2440a3
Show file tree
Hide file tree
Showing 40 changed files with 2,608 additions and 544 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ if(ARROW_COMPUTE)
compute/kernels/aggregate_tdigest.cc
compute/kernels/aggregate_var_std.cc
compute/kernels/codegen_internal.cc
compute/kernels/hash_aggregate.cc
compute/kernels/scalar_arithmetic.cc
compute/kernels/scalar_boolean.cc
compute/kernels/scalar_cast_boolean.cc
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/arrow/array/array_binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,13 @@ class BaseBinaryArray : public FlatArray {
}
}

IteratorType begin() { return IteratorType(*this); }
IteratorType begin() const { return IteratorType(*this); }

IteratorType end() { return IteratorType(*this, length()); }
IteratorType end() const { return IteratorType(*this, length()); }

protected:
// For subclasses
BaseBinaryArray() : raw_value_offsets_(NULLPTR), raw_data_(NULLPTR) {}
BaseBinaryArray() = default;

// Protected method for constructors
void SetData(const std::shared_ptr<ArrayData>& data) {
Expand All @@ -132,8 +132,8 @@ class BaseBinaryArray : public FlatArray {
raw_data_ = data->GetValuesSafe<uint8_t>(2, /*offset=*/0);
}

const offset_type* raw_value_offsets_;
const uint8_t* raw_data_;
const offset_type* raw_value_offsets_ = NULLPTR;
const uint8_t* raw_data_ = NULLPTR;
};

/// Concrete Array class for variable-size binary data
Expand Down Expand Up @@ -231,9 +231,9 @@ class ARROW_EXPORT FixedSizeBinaryArray : public PrimitiveArray {

const uint8_t* raw_values() const { return raw_values_ + data_->offset * byte_width_; }

IteratorType begin() { return IteratorType(*this); }
IteratorType begin() const { return IteratorType(*this); }

IteratorType end() { return IteratorType(*this, length()); }
IteratorType end() const { return IteratorType(*this, length()); }

protected:
void SetData(const std::shared_ptr<ArrayData>& data) {
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/array/array_primitive.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ class NumericArray : public PrimitiveArray {
// For API compatibility with BinaryArray etc.
value_type GetView(int64_t i) const { return Value(i); }

IteratorType begin() { return IteratorType(*this); }
IteratorType begin() const { return IteratorType(*this); }

IteratorType end() { return IteratorType(*this, length()); }
IteratorType end() const { return IteratorType(*this, length()); }

protected:
using PrimitiveArray::PrimitiveArray;
Expand Down Expand Up @@ -99,9 +99,9 @@ class ARROW_EXPORT BooleanArray : public PrimitiveArray {
/// values. Result is not cached.
int64_t true_count() const;

IteratorType begin() { return IteratorType(*this); }
IteratorType begin() const { return IteratorType(*this); }

IteratorType end() { return IteratorType(*this, length()); }
IteratorType end() const { return IteratorType(*this, length()); }

protected:
using PrimitiveArray::PrimitiveArray;
Expand Down
28 changes: 28 additions & 0 deletions cpp/src/arrow/buffer_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ class ARROW_EXPORT BufferBuilder {
return Status::OK();
}

Result<std::shared_ptr<Buffer>> Finish(bool shrink_to_fit = true) {
std::shared_ptr<Buffer> out;
ARROW_RETURN_NOT_OK(Finish(&out, shrink_to_fit));
return out;
}

void Reset() {
buffer_ = NULLPTR;
capacity_ = size_ = 0;
Expand Down Expand Up @@ -202,6 +208,11 @@ class TypedBufferBuilder<
MemoryPool* pool = default_memory_pool())
: bytes_builder_(std::move(buffer), pool) {}

explicit TypedBufferBuilder(BufferBuilder builder)
: bytes_builder_(std::move(builder)) {}

BufferBuilder* bytes_builder() { return &bytes_builder_; }

Status Append(T value) {
return bytes_builder_.Append(reinterpret_cast<uint8_t*>(&value), sizeof(T));
}
Expand Down Expand Up @@ -256,6 +267,12 @@ class TypedBufferBuilder<
return bytes_builder_.Finish(out, shrink_to_fit);
}

Result<std::shared_ptr<Buffer>> Finish(bool shrink_to_fit = true) {
std::shared_ptr<Buffer> out;
ARROW_RETURN_NOT_OK(Finish(&out, shrink_to_fit));
return out;
}

void Reset() { bytes_builder_.Reset(); }

int64_t length() const { return bytes_builder_.length() / sizeof(T); }
Expand All @@ -274,6 +291,11 @@ class TypedBufferBuilder<bool> {
explicit TypedBufferBuilder(MemoryPool* pool = default_memory_pool())
: bytes_builder_(pool) {}

explicit TypedBufferBuilder(BufferBuilder builder)
: bytes_builder_(std::move(builder)) {}

BufferBuilder* bytes_builder() { return &bytes_builder_; }

Status Append(bool value) {
ARROW_RETURN_NOT_OK(Reserve(1));
UnsafeAppend(value);
Expand Down Expand Up @@ -371,6 +393,12 @@ class TypedBufferBuilder<bool> {
return bytes_builder_.Finish(out, shrink_to_fit);
}

Result<std::shared_ptr<Buffer>> Finish(bool shrink_to_fit = true) {
std::shared_ptr<Buffer> out;
ARROW_RETURN_NOT_OK(Finish(&out, shrink_to_fit));
return out;
}

void Reset() {
bytes_builder_.Reset();
bit_length_ = false_count_ = 0;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compare.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class EqualOptions {
return res;
}

static EqualOptions Defaults() { return EqualOptions(); }
static EqualOptions Defaults() { return {}; }

protected:
double atol_ = kDefaultAbsoluteTolerance;
Expand Down
97 changes: 97 additions & 0 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,5 +306,102 @@ Result<Datum> TDigest(const Datum& value,
const TDigestOptions& options = TDigestOptions::Defaults(),
ExecContext* ctx = NULLPTR);

namespace internal {

/// Internal use only: streaming group identifier.
/// Consumes batches of keys and yields batches of the group ids.
class ARROW_EXPORT Grouper {
public:
virtual ~Grouper() = default;

/// Construct a Grouper which receives the specified key types
static Result<std::unique_ptr<Grouper>> Make(const std::vector<ValueDescr>& descrs,
ExecContext* ctx = default_exec_context());

/// Consume a batch of keys, producing the corresponding group ids as an integer array.
/// Currently only uint32 indices will be produced, eventually the bit width will only
/// be as wide as necessary.
virtual Result<Datum> Consume(const ExecBatch& batch) = 0;

/// Get current unique keys. May be called multiple times.
virtual Result<ExecBatch> GetUniques() = 0;

/// Get the current number of groups.
virtual uint32_t num_groups() const = 0;

/// \brief Assemble lists of indices of identical elements.
///
/// \param[in] ids An unsigned, all-valid integral array which will be
/// used as grouping criteria.
/// \param[in] num_groups An upper bound for the elements of ids
/// \return A num_groups-long ListArray where the slot at i contains a
/// list of indices where i appears in ids.
///
/// MakeGroupings([
/// 2,
/// 2,
/// 5,
/// 5,
/// 2,
/// 3
/// ], 8) == [
/// [],
/// [],
/// [0, 1, 4],
/// [5],
/// [],
/// [2, 3],
/// [],
/// []
/// ]
static Result<std::shared_ptr<ListArray>> MakeGroupings(
const UInt32Array& ids, uint32_t num_groups,
ExecContext* ctx = default_exec_context());

/// \brief Produce a ListArray whose slots are selections of `array` which correspond to
/// the provided groupings.
///
/// For example,
/// ApplyGroupings([
/// [],
/// [],
/// [0, 1, 4],
/// [5],
/// [],
/// [2, 3],
/// [],
/// []
/// ], [2, 2, 5, 5, 2, 3]) == [
/// [],
/// [],
/// [2, 2, 2],
/// [3],
/// [],
/// [5, 5],
/// [],
/// []
/// ]
static Result<std::shared_ptr<ListArray>> ApplyGroupings(
const ListArray& groupings, const Array& array,
ExecContext* ctx = default_exec_context());
};

/// \brief Configure a grouped aggregation
struct ARROW_EXPORT Aggregate {
/// the name of the aggregation function
std::string function;

/// options for the aggregation function
const FunctionOptions* options;
};

/// Internal use only: helper function for testing HashAggregateKernels.
/// This will be replaced by streaming execution operators.
ARROW_EXPORT
Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
const std::vector<Aggregate>& aggregates,
ExecContext* ctx = default_exec_context());

} // namespace internal
} // namespace compute
} // namespace arrow
41 changes: 41 additions & 0 deletions cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "arrow/compute/registry.h"
#include "arrow/compute/util_internal.h"
#include "arrow/datum.h"
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/type.h"
Expand All @@ -57,6 +58,44 @@ using internal::CpuInfo;

namespace compute {

ExecContext* default_exec_context() {
static ExecContext default_ctx;
return &default_ctx;
}

ExecBatch::ExecBatch(const RecordBatch& batch)
: values(batch.num_columns()), length(batch.num_rows()) {
auto columns = batch.column_data();
std::move(columns.begin(), columns.end(), values.begin());
}

Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values) {
if (values.empty()) {
return Status::Invalid("Cannot infer ExecBatch length without at least one value");
}

int64_t length = -1;
for (const auto& value : values) {
if (value.is_scalar()) {
if (length == -1) {
length = 1;
}
continue;
}

if (length == -1) {
length = value.length();
continue;
}

if (length != value.length()) {
return Status::Invalid(
"Arrays used to construct an ExecBatch must have equal length");
}
}

return ExecBatch(std::move(values), length);
}
namespace {

Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t length,
Expand Down Expand Up @@ -838,6 +877,7 @@ class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {

private:
Status Consume(const ExecBatch& batch) {
// FIXME(ARROW-11840) don't merge *any* aggegates for every batch
auto batch_state = kernel_->init(kernel_ctx_, {kernel_, *input_descrs_, options_});
ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);

Expand All @@ -855,6 +895,7 @@ class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {

kernel_->merge(kernel_ctx_, std::move(*batch_state), state());
ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);

return Status::OK();
}

Expand Down
8 changes: 7 additions & 1 deletion cpp/src/arrow/compute/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class ARROW_EXPORT ExecContext {
bool use_threads_ = true;
};

ARROW_EXPORT ExecContext* default_exec_context();

// TODO: Consider standardizing on uint16 selection vectors and only use them
// when we can ensure that each value is 64K length or smaller

Expand Down Expand Up @@ -164,11 +166,15 @@ class ARROW_EXPORT SelectionVector {
/// TODO: Datum uses arrow/util/variant.h which may be a bit heavier-weight
/// than is desirable for this class. Microbenchmarks would help determine for
/// sure. See ARROW-8928.
struct ExecBatch {
struct ARROW_EXPORT ExecBatch {
ExecBatch() = default;
ExecBatch(std::vector<Datum> values, int64_t length)
: values(std::move(values)), length(length) {}

explicit ExecBatch(const RecordBatch& batch);

static Result<ExecBatch> Make(std::vector<Datum> values);

/// The values representing positional arguments to be passed to a kernel's
/// exec function for processing.
std::vector<Datum> values;
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/compute/exec_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ class ARROW_EXPORT KernelExecutor {
public:
virtual ~KernelExecutor() = default;

/// The Kernel's `init` method must be called and any KernelState set in the
/// KernelContext *before* KernelExecutor::Init is called. This is to facilitate
/// the case where init may be expensive and does not need to be called again for
/// each execution of the kernel, for example the same lookup table can be re-used
/// for all scanned batches in a dataset filter.
virtual Status Init(KernelContext*, KernelInitArgs) = 0;

/// XXX: Better configurability for listener
Expand Down
18 changes: 17 additions & 1 deletion cpp/src/arrow/compute/function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ const Kernel* DispatchExactImpl(const Function* func,
checked_cast<const ScalarAggregateFunction*>(func)->kernels(), values);
}

if (func->kind() == Function::HASH_AGGREGATE) {
return DispatchExactImpl(checked_cast<const HashAggregateFunction*>(func)->kernels(),
values);
}

return nullptr;
}

Expand Down Expand Up @@ -184,8 +189,10 @@ Result<Datum> Function::Execute(const std::vector<Datum>& args,
executor = detail::KernelExecutor::MakeScalar();
} else if (kind() == Function::VECTOR) {
executor = detail::KernelExecutor::MakeVector();
} else {
} else if (kind() == Function::SCALAR_AGGREGATE) {
executor = detail::KernelExecutor::MakeScalarAggregate();
} else {
return Status::NotImplemented("Direct execution of HASH_AGGREGATE functions");
}
RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, inputs, options}));

Expand Down Expand Up @@ -263,6 +270,15 @@ Status ScalarAggregateFunction::AddKernel(ScalarAggregateKernel kernel) {
return Status::OK();
}

Status HashAggregateFunction::AddKernel(HashAggregateKernel kernel) {
RETURN_NOT_OK(CheckArity(kernel.signature->in_types()));
if (arity_.is_varargs && !kernel.signature->is_varargs()) {
return Status::Invalid("Function accepts varargs but kernel signature does not");
}
kernels_.emplace_back(std::move(kernel));
return Status::OK();
}

Result<Datum> MetaFunction::Execute(const std::vector<Datum>& args,
const FunctionOptions* options,
ExecContext* ctx) const {
Expand Down
Loading

0 comments on commit e2440a3

Please sign in to comment.