diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 04756aaf8e97b..df72dcc5b6b9d 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -373,6 +373,7 @@ if(ARROW_COMPUTE) compute/kernels/aggregate_tdigest.cc compute/kernels/aggregate_var_std.cc compute/kernels/codegen_internal.cc + compute/kernels/hash_aggregate.cc compute/kernels/scalar_arithmetic.cc compute/kernels/scalar_boolean.cc compute/kernels/scalar_cast_boolean.cc diff --git a/cpp/src/arrow/array/array_binary.h b/cpp/src/arrow/array/array_binary.h index d3ae93318ba63..db3c640b9a4e1 100644 --- a/cpp/src/arrow/array/array_binary.h +++ b/cpp/src/arrow/array/array_binary.h @@ -117,13 +117,13 @@ class BaseBinaryArray : public FlatArray { } } - IteratorType begin() { return IteratorType(*this); } + IteratorType begin() const { return IteratorType(*this); } - IteratorType end() { return IteratorType(*this, length()); } + IteratorType end() const { return IteratorType(*this, length()); } protected: // For subclasses - BaseBinaryArray() : raw_value_offsets_(NULLPTR), raw_data_(NULLPTR) {} + BaseBinaryArray() = default; // Protected method for constructors void SetData(const std::shared_ptr& data) { @@ -132,8 +132,8 @@ class BaseBinaryArray : public FlatArray { raw_data_ = data->GetValuesSafe(2, /*offset=*/0); } - const offset_type* raw_value_offsets_; - const uint8_t* raw_data_; + const offset_type* raw_value_offsets_ = NULLPTR; + const uint8_t* raw_data_ = NULLPTR; }; /// Concrete Array class for variable-size binary data @@ -231,9 +231,9 @@ class ARROW_EXPORT FixedSizeBinaryArray : public PrimitiveArray { const uint8_t* raw_values() const { return raw_values_ + data_->offset * byte_width_; } - IteratorType begin() { return IteratorType(*this); } + IteratorType begin() const { return IteratorType(*this); } - IteratorType end() { return IteratorType(*this, length()); } + IteratorType end() const { return IteratorType(*this, length()); } protected: void SetData(const std::shared_ptr& data) { diff --git a/cpp/src/arrow/array/array_primitive.h b/cpp/src/arrow/array/array_primitive.h index f9ac60f6cb98a..b601eb770c32f 100644 --- a/cpp/src/arrow/array/array_primitive.h +++ b/cpp/src/arrow/array/array_primitive.h @@ -64,9 +64,9 @@ class NumericArray : public PrimitiveArray { // For API compatibility with BinaryArray etc. value_type GetView(int64_t i) const { return Value(i); } - IteratorType begin() { return IteratorType(*this); } + IteratorType begin() const { return IteratorType(*this); } - IteratorType end() { return IteratorType(*this, length()); } + IteratorType end() const { return IteratorType(*this, length()); } protected: using PrimitiveArray::PrimitiveArray; @@ -99,9 +99,9 @@ class ARROW_EXPORT BooleanArray : public PrimitiveArray { /// values. Result is not cached. 
int64_t true_count() const; - IteratorType begin() { return IteratorType(*this); } + IteratorType begin() const { return IteratorType(*this); } - IteratorType end() { return IteratorType(*this, length()); } + IteratorType end() const { return IteratorType(*this, length()); } protected: using PrimitiveArray::PrimitiveArray; diff --git a/cpp/src/arrow/buffer_builder.h b/cpp/src/arrow/buffer_builder.h index 41a47c9172940..f525ec23c58af 100644 --- a/cpp/src/arrow/buffer_builder.h +++ b/cpp/src/arrow/buffer_builder.h @@ -162,6 +162,12 @@ class ARROW_EXPORT BufferBuilder { return Status::OK(); } + Result> Finish(bool shrink_to_fit = true) { + std::shared_ptr out; + ARROW_RETURN_NOT_OK(Finish(&out, shrink_to_fit)); + return out; + } + void Reset() { buffer_ = NULLPTR; capacity_ = size_ = 0; @@ -202,6 +208,11 @@ class TypedBufferBuilder< MemoryPool* pool = default_memory_pool()) : bytes_builder_(std::move(buffer), pool) {} + explicit TypedBufferBuilder(BufferBuilder builder) + : bytes_builder_(std::move(builder)) {} + + BufferBuilder* bytes_builder() { return &bytes_builder_; } + Status Append(T value) { return bytes_builder_.Append(reinterpret_cast(&value), sizeof(T)); } @@ -256,6 +267,12 @@ class TypedBufferBuilder< return bytes_builder_.Finish(out, shrink_to_fit); } + Result> Finish(bool shrink_to_fit = true) { + std::shared_ptr out; + ARROW_RETURN_NOT_OK(Finish(&out, shrink_to_fit)); + return out; + } + void Reset() { bytes_builder_.Reset(); } int64_t length() const { return bytes_builder_.length() / sizeof(T); } @@ -274,6 +291,11 @@ class TypedBufferBuilder { explicit TypedBufferBuilder(MemoryPool* pool = default_memory_pool()) : bytes_builder_(pool) {} + explicit TypedBufferBuilder(BufferBuilder builder) + : bytes_builder_(std::move(builder)) {} + + BufferBuilder* bytes_builder() { return &bytes_builder_; } + Status Append(bool value) { ARROW_RETURN_NOT_OK(Reserve(1)); UnsafeAppend(value); @@ -371,6 +393,12 @@ class TypedBufferBuilder { return bytes_builder_.Finish(out, shrink_to_fit); } + Result> Finish(bool shrink_to_fit = true) { + std::shared_ptr out; + ARROW_RETURN_NOT_OK(Finish(&out, shrink_to_fit)); + return out; + } + void Reset() { bytes_builder_.Reset(); bit_length_ = false_count_ = 0; diff --git a/cpp/src/arrow/compare.h b/cpp/src/arrow/compare.h index 387105de9e706..6769b23867b65 100644 --- a/cpp/src/arrow/compare.h +++ b/cpp/src/arrow/compare.h @@ -71,7 +71,7 @@ class EqualOptions { return res; } - static EqualOptions Defaults() { return EqualOptions(); } + static EqualOptions Defaults() { return {}; } protected: double atol_ = kDefaultAbsoluteTolerance; diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index eef1587bb732b..ca118ec56780a 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -306,5 +306,102 @@ Result TDigest(const Datum& value, const TDigestOptions& options = TDigestOptions::Defaults(), ExecContext* ctx = NULLPTR); +namespace internal { + +/// Internal use only: streaming group identifier. +/// Consumes batches of keys and yields batches of the group ids. +class ARROW_EXPORT Grouper { + public: + virtual ~Grouper() = default; + + /// Construct a Grouper which receives the specified key types + static Result> Make(const std::vector& descrs, + ExecContext* ctx = default_exec_context()); + + /// Consume a batch of keys, producing the corresponding group ids as an integer array. 
+ /// Currently only uint32 indices will be produced, eventually the bit width will only + /// be as wide as necessary. + virtual Result Consume(const ExecBatch& batch) = 0; + + /// Get current unique keys. May be called multiple times. + virtual Result GetUniques() = 0; + + /// Get the current number of groups. + virtual uint32_t num_groups() const = 0; + + /// \brief Assemble lists of indices of identical elements. + /// + /// \param[in] ids An unsigned, all-valid integral array which will be + /// used as grouping criteria. + /// \param[in] num_groups An upper bound for the elements of ids + /// \return A num_groups-long ListArray where the slot at i contains a + /// list of indices where i appears in ids. + /// + /// MakeGroupings([ + /// 2, + /// 2, + /// 5, + /// 5, + /// 2, + /// 3 + /// ], 8) == [ + /// [], + /// [], + /// [0, 1, 4], + /// [5], + /// [], + /// [2, 3], + /// [], + /// [] + /// ] + static Result> MakeGroupings( + const UInt32Array& ids, uint32_t num_groups, + ExecContext* ctx = default_exec_context()); + + /// \brief Produce a ListArray whose slots are selections of `array` which correspond to + /// the provided groupings. + /// + /// For example, + /// ApplyGroupings([ + /// [], + /// [], + /// [0, 1, 4], + /// [5], + /// [], + /// [2, 3], + /// [], + /// [] + /// ], [2, 2, 5, 5, 2, 3]) == [ + /// [], + /// [], + /// [2, 2, 2], + /// [3], + /// [], + /// [5, 5], + /// [], + /// [] + /// ] + static Result> ApplyGroupings( + const ListArray& groupings, const Array& array, + ExecContext* ctx = default_exec_context()); +}; + +/// \brief Configure a grouped aggregation +struct ARROW_EXPORT Aggregate { + /// the name of the aggregation function + std::string function; + + /// options for the aggregation function + const FunctionOptions* options; +}; + +/// Internal use only: helper function for testing HashAggregateKernels. +/// This will be replaced by streaming execution operators. 
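+///
+/// A minimal usage sketch (hypothetical Datum values, mirroring the call sites in
+/// the benchmarks and tests added by this change):
+///
+///   // sum `summand` within each distinct value of `key`
+///   ARROW_ASSIGN_OR_RAISE(Datum grouped,
+///                         GroupBy({summand}, {key}, {{"hash_sum", NULLPTR}}));
+///   // `grouped` is a struct array with one field per aggregate ("hash_sum")
+///   // followed by one field per key ("key_0")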
+ARROW_EXPORT +Result GroupBy(const std::vector& arguments, const std::vector& keys, + const std::vector& aggregates, + ExecContext* ctx = default_exec_context()); + +} // namespace internal } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 6443c96e91868..c3187a3995a3c 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -36,6 +36,7 @@ #include "arrow/compute/registry.h" #include "arrow/compute/util_internal.h" #include "arrow/datum.h" +#include "arrow/record_batch.h" #include "arrow/scalar.h" #include "arrow/status.h" #include "arrow/type.h" @@ -57,6 +58,44 @@ using internal::CpuInfo; namespace compute { +ExecContext* default_exec_context() { + static ExecContext default_ctx; + return &default_ctx; +} + +ExecBatch::ExecBatch(const RecordBatch& batch) + : values(batch.num_columns()), length(batch.num_rows()) { + auto columns = batch.column_data(); + std::move(columns.begin(), columns.end(), values.begin()); +} + +Result ExecBatch::Make(std::vector values) { + if (values.empty()) { + return Status::Invalid("Cannot infer ExecBatch length without at least one value"); + } + + int64_t length = -1; + for (const auto& value : values) { + if (value.is_scalar()) { + if (length == -1) { + length = 1; + } + continue; + } + + if (length == -1) { + length = value.length(); + continue; + } + + if (length != value.length()) { + return Status::Invalid( + "Arrays used to construct an ExecBatch must have equal length"); + } + } + + return ExecBatch(std::move(values), length); +} namespace { Result> AllocateDataBuffer(KernelContext* ctx, int64_t length, @@ -838,6 +877,7 @@ class ScalarAggExecutor : public KernelExecutorImpl { private: Status Consume(const ExecBatch& batch) { + // FIXME(ARROW-11840) don't merge *any* aggegates for every batch auto batch_state = kernel_->init(kernel_ctx_, {kernel_, *input_descrs_, options_}); ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_); @@ -855,6 +895,7 @@ class ScalarAggExecutor : public KernelExecutorImpl { kernel_->merge(kernel_ctx_, std::move(*batch_state), state()); ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_); + return Status::OK(); } diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index f491489ed8a27..7659442d8bf45 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -119,6 +119,8 @@ class ARROW_EXPORT ExecContext { bool use_threads_ = true; }; +ARROW_EXPORT ExecContext* default_exec_context(); + // TODO: Consider standardizing on uint16 selection vectors and only use them // when we can ensure that each value is 64K length or smaller @@ -164,11 +166,15 @@ class ARROW_EXPORT SelectionVector { /// TODO: Datum uses arrow/util/variant.h which may be a bit heavier-weight /// than is desirable for this class. Microbenchmarks would help determine for /// sure. See ARROW-8928. -struct ExecBatch { +struct ARROW_EXPORT ExecBatch { ExecBatch() = default; ExecBatch(std::vector values, int64_t length) : values(std::move(values)), length(length) {} + explicit ExecBatch(const RecordBatch& batch); + + static Result Make(std::vector values); + /// The values representing positional arguments to be passed to a kernel's /// exec function for processing. 
std::vector values; diff --git a/cpp/src/arrow/compute/exec_internal.h b/cpp/src/arrow/compute/exec_internal.h index a74e5c8d8fa6c..55daa243cd351 100644 --- a/cpp/src/arrow/compute/exec_internal.h +++ b/cpp/src/arrow/compute/exec_internal.h @@ -106,6 +106,11 @@ class ARROW_EXPORT KernelExecutor { public: virtual ~KernelExecutor() = default; + /// The Kernel's `init` method must be called and any KernelState set in the + /// KernelContext *before* KernelExecutor::Init is called. This is to facilitate + /// the case where init may be expensive and does not need to be called again for + /// each execution of the kernel, for example the same lookup table can be re-used + /// for all scanned batches in a dataset filter. virtual Status Init(KernelContext*, KernelInitArgs) = 0; /// XXX: Better configurability for listener diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 70d7d998e9cc5..c8fc8b8dec07f 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -126,6 +126,11 @@ const Kernel* DispatchExactImpl(const Function* func, checked_cast(func)->kernels(), values); } + if (func->kind() == Function::HASH_AGGREGATE) { + return DispatchExactImpl(checked_cast(func)->kernels(), + values); + } + return nullptr; } @@ -184,8 +189,10 @@ Result Function::Execute(const std::vector& args, executor = detail::KernelExecutor::MakeScalar(); } else if (kind() == Function::VECTOR) { executor = detail::KernelExecutor::MakeVector(); - } else { + } else if (kind() == Function::SCALAR_AGGREGATE) { executor = detail::KernelExecutor::MakeScalarAggregate(); + } else { + return Status::NotImplemented("Direct execution of HASH_AGGREGATE functions"); } RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, inputs, options})); @@ -263,6 +270,15 @@ Status ScalarAggregateFunction::AddKernel(ScalarAggregateKernel kernel) { return Status::OK(); } +Status HashAggregateFunction::AddKernel(HashAggregateKernel kernel) { + RETURN_NOT_OK(CheckArity(kernel.signature->in_types())); + if (arity_.is_varargs && !kernel.signature->is_varargs()) { + return Status::Invalid("Function accepts varargs but kernel signature does not"); + } + kernels_.emplace_back(std::move(kernel)); + return Status::OK(); +} + Result MetaFunction::Execute(const std::vector& args, const FunctionOptions* options, ExecContext* ctx) const { diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index af5d81a30ecf0..9a3e1c1852f46 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -133,6 +133,10 @@ class ARROW_EXPORT Function { /// A function that computes scalar summary statistics from array input. SCALAR_AGGREGATE, + /// A function that computes grouped summary statistics from array input + /// and an array of group identifiers. + HASH_AGGREGATE, + /// A function that dispatches to other functions and does not contain its /// own kernels. META @@ -307,6 +311,21 @@ class ARROW_EXPORT ScalarAggregateFunction Status AddKernel(ScalarAggregateKernel kernel); }; +class ARROW_EXPORT HashAggregateFunction + : public detail::FunctionImpl { + public: + using KernelType = HashAggregateKernel; + + HashAggregateFunction(std::string name, const Arity& arity, const FunctionDoc* doc, + const FunctionOptions* default_options = NULLPTR) + : detail::FunctionImpl( + std::move(name), Function::HASH_AGGREGATE, arity, doc, default_options) {} + + /// \brief Add a kernel (function implementation). 
Returns error if the + /// kernel's signature does not match the function's arity. + Status AddKernel(HashAggregateKernel kernel); +}; + /// \brief A function that dispatches to other functions. Must implement /// MetaFunction::ExecuteImpl. /// diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index c8f9cacfb3412..b99b41170d2c5 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -537,7 +537,8 @@ struct Kernel { : signature(std::move(sig)), init(std::move(init)) {} Kernel(std::vector in_types, OutputType out_type, KernelInit init) - : Kernel(KernelSignature::Make(std::move(in_types), out_type), std::move(init)) {} + : Kernel(KernelSignature::Make(std::move(in_types), std::move(out_type)), + std::move(init)) {} /// \brief The "signature" of the kernel containing the InputType input /// argument validators and OutputType output type and shape resolver. @@ -574,7 +575,8 @@ struct ArrayKernel : public Kernel { ArrayKernel(std::vector in_types, OutputType out_type, ArrayKernelExec exec, KernelInit init = NULLPTR) - : Kernel(std::move(in_types), std::move(out_type), init), exec(std::move(exec)) {} + : Kernel(std::move(in_types), std::move(out_type), std::move(init)), + exec(std::move(exec)) {} /// \brief Perform a single invocation of this kernel. Depending on the /// implementation, it may only write into preallocated memory, while in some @@ -617,7 +619,7 @@ struct VectorKernel : public ArrayKernel { VectorKernel() = default; VectorKernel(std::shared_ptr sig, ArrayKernelExec exec) - : ArrayKernel(std::move(sig), exec) {} + : ArrayKernel(std::move(sig), std::move(exec)) {} VectorKernel(std::vector in_types, OutputType out_type, ArrayKernelExec exec, KernelInit init = NULLPTR, VectorFinalize finalize = NULLPTR) @@ -680,12 +682,12 @@ using ScalarAggregateFinalize = std::function; /// * finalize: produces the end result of the aggregation using the /// KernelState in the KernelContext. struct ScalarAggregateKernel : public Kernel { - ScalarAggregateKernel() {} + ScalarAggregateKernel() = default; ScalarAggregateKernel(std::shared_ptr sig, KernelInit init, ScalarAggregateConsume consume, ScalarAggregateMerge merge, ScalarAggregateFinalize finalize) - : Kernel(std::move(sig), init), + : Kernel(std::move(sig), std::move(init)), consume(std::move(consume)), merge(std::move(merge)), finalize(std::move(finalize)) {} @@ -693,13 +695,59 @@ struct ScalarAggregateKernel : public Kernel { ScalarAggregateKernel(std::vector in_types, OutputType out_type, KernelInit init, ScalarAggregateConsume consume, ScalarAggregateMerge merge, ScalarAggregateFinalize finalize) - : ScalarAggregateKernel(KernelSignature::Make(std::move(in_types), out_type), init, - consume, merge, finalize) {} + : ScalarAggregateKernel( + KernelSignature::Make(std::move(in_types), std::move(out_type)), + std::move(init), std::move(consume), std::move(merge), std::move(finalize)) {} ScalarAggregateConsume consume; ScalarAggregateMerge merge; ScalarAggregateFinalize finalize; }; +// ---------------------------------------------------------------------- +// HashAggregateKernel (for HashAggregateFunction) + +using HashAggregateConsume = std::function; + +using HashAggregateMerge = + std::function; + +// Finalize returns Datum to permit multiple return values +using HashAggregateFinalize = std::function; + +/// \brief Kernel data structure for implementations of +/// HashAggregateFunction. 
The four necessary components of an aggregation +/// kernel are the init, consume, merge, and finalize functions. +/// +/// * init: creates a new KernelState for a kernel. +/// * consume: processes an ExecBatch (which includes the argument as well +/// as an array of group identifiers) and updates the KernelState found in the +/// KernelContext. +/// * merge: combines one KernelState with another. +/// * finalize: produces the end result of the aggregation using the +/// KernelState in the KernelContext. +struct HashAggregateKernel : public Kernel { + HashAggregateKernel() = default; + + HashAggregateKernel(std::shared_ptr sig, KernelInit init, + HashAggregateConsume consume, HashAggregateMerge merge, + HashAggregateFinalize finalize) + : Kernel(std::move(sig), std::move(init)), + consume(std::move(consume)), + merge(std::move(merge)), + finalize(std::move(finalize)) {} + + HashAggregateKernel(std::vector in_types, OutputType out_type, + KernelInit init, HashAggregateMerge merge, + HashAggregateConsume consume, HashAggregateFinalize finalize) + : HashAggregateKernel( + KernelSignature::Make(std::move(in_types), std::move(out_type)), + std::move(init), std::move(consume), std::move(merge), std::move(finalize)) {} + + HashAggregateConsume consume; + HashAggregateMerge merge; + HashAggregateFinalize finalize; +}; + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt index 577b250da87ed..5e223a1f906f0 100644 --- a/cpp/src/arrow/compute/kernels/CMakeLists.txt +++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt @@ -59,5 +59,9 @@ add_arrow_benchmark(vector_selection_benchmark PREFIX "arrow-compute") # Aggregates -add_arrow_compute_test(aggregate_test SOURCES aggregate_test.cc test_util.cc) +add_arrow_compute_test(aggregate_test + SOURCES + aggregate_test.cc + hash_aggregate_test.cc + test_util.cc) add_arrow_benchmark(aggregate_benchmark PREFIX "arrow-compute") diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index 5cdd3bd1dd1b5..61dc8cb403c4b 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -250,15 +250,13 @@ const FunctionDoc min_max_doc{"Compute the minimum and maximum values of a numer {"array"}, "MinMaxOptions"}; -const FunctionDoc any_doc{ - "Test whether any element in a boolean array evaluates to true.", - ("Null values are ignored."), - {"array"}}; - -const FunctionDoc all_doc{ - "Test whether all elements in a boolean array evaluate to true.", - ("Null values are ignored."), - {"array"}}; +const FunctionDoc any_doc{"Test whether any element in a boolean array evaluates to true", + ("Null values are ignored."), + {"array"}}; + +const FunctionDoc all_doc{"Test whether all elements in a boolean array evaluate to true", + ("Null values are ignored."), + {"array"}}; } // namespace diff --git a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc index c90dd03c06ed7..42be0c36544c9 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc @@ -300,6 +300,169 @@ BENCHMARK_TEMPLATE(ReferenceSum, SumBitmapVectorizeUnroll) ->Apply(BenchmarkSetArgs); #endif // ARROW_WITH_BENCHMARKS_REFERENCE +// +// GroupBy +// + +static void BenchmarkGroupBy(benchmark::State& state, + std::vector aggregates, + std::vector arguments, std::vector keys) { + for (auto _ : 
state) { + ABORT_NOT_OK(GroupBy(arguments, keys, aggregates).status()); + } +} + +#define GROUP_BY_BENCHMARK(Name, Impl) \ + static void Name(benchmark::State& state) { \ + RegressionArgs args(state, false); \ + auto rng = random::RandomArrayGenerator(1923); \ + (Impl)(); \ + } \ + BENCHMARK(Name)->Apply([](benchmark::internal::Benchmark* bench) { \ + BenchmarkSetArgsWithSizes(bench, {1 * 1024 * 1024}); \ + }) + +GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyStringSet, [&] { + auto summand = rng.Float64(args.size, + /*min=*/0.0, + /*max=*/1.0e14, + /*null_probability=*/args.null_proportion, + /*nan_probability=*/args.null_proportion / 10); + + auto key = rng.StringWithRepeats(args.size, + /*unique=*/16, + /*min_length=*/3, + /*max_length=*/32); + + BenchmarkGroupBy(state, {{"hash_sum", NULLPTR}}, {summand}, {key}); +}); + +GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallStringSet, [&] { + auto summand = rng.Float64(args.size, + /*min=*/0.0, + /*max=*/1.0e14, + /*null_probability=*/args.null_proportion, + /*nan_probability=*/args.null_proportion / 10); + + auto key = rng.StringWithRepeats(args.size, + /*unique=*/256, + /*min_length=*/3, + /*max_length=*/32); + + BenchmarkGroupBy(state, {{"hash_sum", NULLPTR}}, {summand}, {key}); +}); + +GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumStringSet, [&] { + auto summand = rng.Float64(args.size, + /*min=*/0.0, + /*max=*/1.0e14, + /*null_probability=*/args.null_proportion, + /*nan_probability=*/args.null_proportion / 10); + + auto key = rng.StringWithRepeats(args.size, + /*unique=*/4096, + /*min_length=*/3, + /*max_length=*/32); + + BenchmarkGroupBy(state, {{"hash_sum", NULLPTR}}, {summand}, {key}); +}); + +GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyIntegerSet, [&] { + auto summand = rng.Float64(args.size, + /*min=*/0.0, + /*max=*/1.0e14, + /*null_probability=*/args.null_proportion, + /*nan_probability=*/args.null_proportion / 10); + + auto key = rng.Int64(args.size, + /*min=*/0, + /*max=*/15); + + BenchmarkGroupBy(state, {{"hash_sum", NULLPTR}}, {summand}, {key}); +}); + +GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallIntegerSet, [&] { + auto summand = rng.Float64(args.size, + /*min=*/0.0, + /*max=*/1.0e14, + /*null_probability=*/args.null_proportion, + /*nan_probability=*/args.null_proportion / 10); + + auto key = rng.Int64(args.size, + /*min=*/0, + /*max=*/255); + + BenchmarkGroupBy(state, {{"hash_sum", NULLPTR}}, {summand}, {key}); +}); + +GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumIntegerSet, [&] { + auto summand = rng.Float64(args.size, + /*min=*/0.0, + /*max=*/1.0e14, + /*null_probability=*/args.null_proportion, + /*nan_probability=*/args.null_proportion / 10); + + auto key = rng.Int64(args.size, + /*min=*/0, + /*max=*/4095); + + BenchmarkGroupBy(state, {{"hash_sum", NULLPTR}}, {summand}, {key}); +}); + +GROUP_BY_BENCHMARK(SumDoublesGroupedByTinyIntStringPairSet, [&] { + auto summand = rng.Float64(args.size, + /*min=*/0.0, + /*max=*/1.0e14, + /*null_probability=*/args.null_proportion, + /*nan_probability=*/args.null_proportion / 10); + + auto int_key = rng.Int64(args.size, + /*min=*/0, + /*max=*/4); + auto str_key = rng.StringWithRepeats(args.size, + /*unique=*/4, + /*min_length=*/3, + /*max_length=*/32); + + BenchmarkGroupBy(state, {{"hash_sum", NULLPTR}}, {summand}, {int_key, str_key}); +}); + +GROUP_BY_BENCHMARK(SumDoublesGroupedBySmallIntStringPairSet, [&] { + auto summand = rng.Float64(args.size, + /*min=*/0.0, + /*max=*/1.0e14, + /*null_probability=*/args.null_proportion, + /*nan_probability=*/args.null_proportion / 10); + + auto int_key = 
rng.Int64(args.size, + /*min=*/0, + /*max=*/15); + auto str_key = rng.StringWithRepeats(args.size, + /*unique=*/16, + /*min_length=*/3, + /*max_length=*/32); + + BenchmarkGroupBy(state, {{"hash_sum", NULLPTR}}, {summand}, {int_key, str_key}); +}); + +GROUP_BY_BENCHMARK(SumDoublesGroupedByMediumIntStringPairSet, [&] { + auto summand = rng.Float64(args.size, + /*min=*/0.0, + /*max=*/1.0e14, + /*null_probability=*/args.null_proportion, + /*nan_probability=*/args.null_proportion / 10); + + auto int_key = rng.Int64(args.size, + /*min=*/0, + /*max=*/63); + auto str_key = rng.StringWithRepeats(args.size, + /*unique=*/64, + /*min_length=*/3, + /*max_length=*/32); + + BenchmarkGroupBy(state, {{"hash_sum", NULLPTR}}, {summand}, {int_key, str_key}); +}); + // // Sum // diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index 569886a13519f..22e7f512e97f7 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -27,19 +27,26 @@ #include "arrow/array.h" #include "arrow/chunked_array.h" #include "arrow/compute/api_aggregate.h" +#include "arrow/compute/api_scalar.h" +#include "arrow/compute/api_vector.h" +#include "arrow/compute/cast.h" #include "arrow/compute/kernels/aggregate_internal.h" #include "arrow/compute/kernels/test_util.h" +#include "arrow/compute/registry.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bitmap_reader.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/int_util_internal.h" #include "arrow/testing/gtest_common.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" +#include "arrow/util/logging.h" namespace arrow { +using internal::BitmapReader; using internal::checked_cast; using internal::checked_pointer_cast; @@ -65,8 +72,7 @@ static SumResult NaiveSumPartial(const Array& array) { const auto values = array_numeric.raw_values(); if (array.null_count() != 0) { - internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), - array.length()); + BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length()); for (int64_t i = 0; i < array.length(); i++) { if (reader.IsSet()) { result.first += values[i]; @@ -488,9 +494,7 @@ class TestPrimitiveMinMaxKernel : public ::testing::Test { void AssertMinMaxIsNull(const Datum& array, const MinMaxOptions& options) { ASSERT_OK_AND_ASSIGN(Datum out, MinMax(array, options)); - - const StructScalar& value = out.scalar_as(); - for (const auto& val : value.value) { + for (const auto& val : out.scalar_as().value) { ASSERT_FALSE(val->is_valid); } } @@ -646,8 +650,7 @@ static enable_if_integer> NaiveMinMax( T min = std::numeric_limits::max(); T max = std::numeric_limits::min(); if (array.null_count() != 0) { // Some values are null - internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), - array.length()); + BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length()); for (int64_t i = 0; i < array.length(); i++) { if (reader.IsSet()) { min = std::min(min, values[i]); @@ -686,8 +689,7 @@ static enable_if_floating_point> NaiveMinMax( T min = std::numeric_limits::infinity(); T max = -std::numeric_limits::infinity(); if (array.null_count() != 0) { // Some values are null - internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), - array.length()); + BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length()); for (int64_t i = 0; i < array.length(); i++) { if (reader.IsSet()) { min = 
std::fmin(min, values[i]); @@ -1030,7 +1032,7 @@ ModeResult NaiveMode(const Array& array) { const auto& array_numeric = reinterpret_cast(array); const auto values = array_numeric.raw_values(); - internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length()); + BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length()); for (int64_t i = 0; i < array.length(); ++i) { if (reader.IsSet()) { ++value_counts[values[i]]; @@ -1281,7 +1283,7 @@ void KahanSum(double& sum, double& adjust, double addend) { template std::pair WelfordVar(const ArrayType& array) { const auto values = array.raw_values(); - internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length()); + BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length()); double count = 0, mean = 0, m2 = 0; double mean_adjust = 0, m2_adjust = 0; for (int64_t i = 0; i < array.length(); ++i) { diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc new file mode 100644 index 0000000000000..d9750cb4760d8 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -0,0 +1,1057 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
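+
+// Hash aggregation ("group by") kernels.
+//
+// A Grouper encodes each key tuple into a byte string and maps it to a dense
+// uint32 group id; grouped aggregators (count / sum / min_max) then consume
+// batches of (argument, group id array, group count scalar). GroupBy() at the
+// bottom of this file wires the two together for testing.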
+ +#include "arrow/compute/api_aggregate.h" + +#include +#include +#include +#include +#include + +#include "arrow/buffer_builder.h" +#include "arrow/compute/api_vector.h" +#include "arrow/compute/exec_internal.h" +#include "arrow/compute/kernel.h" +#include "arrow/compute/kernels/aggregate_internal.h" +#include "arrow/compute/kernels/common.h" +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/bitmap_ops.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/make_unique.h" +#include "arrow/visitor_inline.h" + +namespace arrow { + +using internal::checked_cast; + +namespace compute { +namespace internal { +namespace { + +struct KeyEncoder { + // the first byte of an encoded key is used to indicate nullity + static constexpr bool kExtraByteForNull = true; + + static constexpr uint8_t kNullByte = 1; + static constexpr uint8_t kValidByte = 0; + + virtual ~KeyEncoder() = default; + + virtual void AddLength(const ArrayData&, int32_t* lengths) = 0; + + virtual Status Encode(const ArrayData&, uint8_t** encoded_bytes) = 0; + + virtual Result> Decode(uint8_t** encoded_bytes, + int32_t length, MemoryPool*) = 0; + + // extract the null bitmap from the leading nullity bytes of encoded keys + static Status DecodeNulls(MemoryPool* pool, int32_t length, uint8_t** encoded_bytes, + std::shared_ptr* null_bitmap, int32_t* null_count) { + // first count nulls to determine if a null bitmap is necessary + *null_count = 0; + for (int32_t i = 0; i < length; ++i) { + *null_count += (encoded_bytes[i][0] == kNullByte); + } + + if (*null_count > 0) { + ARROW_ASSIGN_OR_RAISE(*null_bitmap, AllocateBitmap(length, pool)); + + uint8_t* validity = (*null_bitmap)->mutable_data(); + for (int32_t i = 0; i < length; ++i) { + BitUtil::SetBitTo(validity, i, encoded_bytes[i][0] == kValidByte); + encoded_bytes[i] += 1; + } + } else { + for (int32_t i = 0; i < length; ++i) { + encoded_bytes[i] += 1; + } + } + return Status ::OK(); + } +}; + +struct BooleanKeyEncoder : KeyEncoder { + static constexpr int kByteWidth = 1; + + void AddLength(const ArrayData& data, int32_t* lengths) override { + for (int64_t i = 0; i < data.length; ++i) { + lengths[i] += kByteWidth + kExtraByteForNull; + } + } + + Status Encode(const ArrayData& data, uint8_t** encoded_bytes) override { + VisitArrayDataInline( + data, + [&](bool value) { + auto& encoded_ptr = *encoded_bytes++; + *encoded_ptr++ = kValidByte; + *encoded_ptr++ = value; + }, + [&] { + auto& encoded_ptr = *encoded_bytes++; + *encoded_ptr++ = kNullByte; + *encoded_ptr++ = 0; + }); + return Status::OK(); + } + + Result> Decode(uint8_t** encoded_bytes, int32_t length, + MemoryPool* pool) override { + std::shared_ptr null_buf; + int32_t null_count; + RETURN_NOT_OK(DecodeNulls(pool, length, encoded_bytes, &null_buf, &null_count)); + + ARROW_ASSIGN_OR_RAISE(auto key_buf, AllocateBitmap(length, pool)); + + uint8_t* raw_output = key_buf->mutable_data(); + for (int32_t i = 0; i < length; ++i) { + auto& encoded_ptr = encoded_bytes[i]; + BitUtil::SetBitTo(raw_output, i, encoded_ptr[0] != 0); + encoded_ptr += 1; + } + + return ArrayData::Make(boolean(), length, {std::move(null_buf), std::move(key_buf)}, + null_count); + } +}; + +struct FixedWidthKeyEncoder : KeyEncoder { + explicit FixedWidthKeyEncoder(std::shared_ptr type) + : type_(std::move(type)), + byte_width_(checked_cast(*type_).bit_width() / 8) {} + + void AddLength(const ArrayData& data, int32_t* lengths) override { + for (int64_t i = 0; i < data.length; ++i) { + lengths[i] += byte_width_ + kExtraByteForNull; + } + } + + 
Status Encode(const ArrayData& data, uint8_t** encoded_bytes) override { + ArrayData viewed(fixed_size_binary(byte_width_), data.length, data.buffers, + data.null_count, data.offset); + + VisitArrayDataInline( + viewed, + [&](util::string_view bytes) { + auto& encoded_ptr = *encoded_bytes++; + *encoded_ptr++ = kValidByte; + memcpy(encoded_ptr, bytes.data(), byte_width_); + encoded_ptr += byte_width_; + }, + [&] { + auto& encoded_ptr = *encoded_bytes++; + *encoded_ptr++ = kNullByte; + memset(encoded_ptr, 0, byte_width_); + encoded_ptr += byte_width_; + }); + return Status::OK(); + } + + Result> Decode(uint8_t** encoded_bytes, int32_t length, + MemoryPool* pool) override { + std::shared_ptr null_buf; + int32_t null_count; + RETURN_NOT_OK(DecodeNulls(pool, length, encoded_bytes, &null_buf, &null_count)); + + ARROW_ASSIGN_OR_RAISE(auto key_buf, AllocateBuffer(length * byte_width_, pool)); + + uint8_t* raw_output = key_buf->mutable_data(); + for (int32_t i = 0; i < length; ++i) { + auto& encoded_ptr = encoded_bytes[i]; + std::memcpy(raw_output, encoded_ptr, byte_width_); + encoded_ptr += byte_width_; + raw_output += byte_width_; + } + + return ArrayData::Make(type_, length, {std::move(null_buf), std::move(key_buf)}, + null_count); + } + + std::shared_ptr type_; + int byte_width_; +}; + +struct DictionaryKeyEncoder : FixedWidthKeyEncoder { + DictionaryKeyEncoder(std::shared_ptr type, MemoryPool* pool) + : FixedWidthKeyEncoder(std::move(type)), pool_(pool) {} + + Status Encode(const ArrayData& data, uint8_t** encoded_bytes) override { + auto dict = MakeArray(data.dictionary); + if (dictionary_) { + if (!dictionary_->Equals(dict)) { + // TODO(bkietz) unify if necessary. For now, just error if any batch's dictionary + // differs from the first we saw for this key + return Status::NotImplemented("Unifying differing dictionaries"); + } + } else { + dictionary_ = std::move(dict); + } + return FixedWidthKeyEncoder::Encode(data, encoded_bytes); + } + + Result> Decode(uint8_t** encoded_bytes, int32_t length, + MemoryPool* pool) override { + ARROW_ASSIGN_OR_RAISE(auto data, + FixedWidthKeyEncoder::Decode(encoded_bytes, length, pool)); + + if (dictionary_) { + data->dictionary = dictionary_->data(); + } else { + ARROW_ASSIGN_OR_RAISE(auto dict, MakeArrayOfNull(type_, 0)); + data->dictionary = dict->data(); + } + + data->type = type_; + return data; + } + + MemoryPool* pool_; + std::shared_ptr dictionary_; +}; + +template +struct VarLengthKeyEncoder : KeyEncoder { + using Offset = typename T::offset_type; + + void AddLength(const ArrayData& data, int32_t* lengths) override { + int64_t i = 0; + VisitArrayDataInline( + data, + [&](util::string_view bytes) { + lengths[i++] += + kExtraByteForNull + sizeof(Offset) + static_cast(bytes.size()); + }, + [&] { lengths[i++] += kExtraByteForNull + sizeof(Offset); }); + } + + Status Encode(const ArrayData& data, uint8_t** encoded_bytes) override { + VisitArrayDataInline( + data, + [&](util::string_view bytes) { + auto& encoded_ptr = *encoded_bytes++; + *encoded_ptr++ = kValidByte; + util::SafeStore(encoded_ptr, static_cast(bytes.size())); + encoded_ptr += sizeof(Offset); + memcpy(encoded_ptr, bytes.data(), bytes.size()); + encoded_ptr += bytes.size(); + }, + [&] { + auto& encoded_ptr = *encoded_bytes++; + *encoded_ptr++ = kNullByte; + util::SafeStore(encoded_ptr, static_cast(0)); + encoded_ptr += sizeof(Offset); + }); + return Status::OK(); + } + + Result> Decode(uint8_t** encoded_bytes, int32_t length, + MemoryPool* pool) override { + std::shared_ptr null_buf; + 
int32_t null_count; + RETURN_NOT_OK(DecodeNulls(pool, length, encoded_bytes, &null_buf, &null_count)); + + Offset length_sum = 0; + for (int32_t i = 0; i < length; ++i) { + length_sum += util::SafeLoadAs(encoded_bytes[i]); + } + + ARROW_ASSIGN_OR_RAISE(auto offset_buf, + AllocateBuffer(sizeof(Offset) * (1 + length), pool)); + ARROW_ASSIGN_OR_RAISE(auto key_buf, AllocateBuffer(length_sum)); + + auto raw_offsets = reinterpret_cast(offset_buf->mutable_data()); + auto raw_keys = key_buf->mutable_data(); + + Offset current_offset = 0; + for (int32_t i = 0; i < length; ++i) { + raw_offsets[i] = current_offset; + + auto key_length = util::SafeLoadAs(encoded_bytes[i]); + encoded_bytes[i] += sizeof(Offset); + + memcpy(raw_keys + current_offset, encoded_bytes[i], key_length); + encoded_bytes[i] += key_length; + + current_offset += key_length; + } + raw_offsets[length] = current_offset; + + return ArrayData::Make( + type_, length, {std::move(null_buf), std::move(offset_buf), std::move(key_buf)}, + null_count); + } + + explicit VarLengthKeyEncoder(std::shared_ptr type) : type_(std::move(type)) {} + + std::shared_ptr type_; +}; + +struct GrouperImpl : Grouper { + static Result> Make(const std::vector& keys, + ExecContext* ctx) { + auto impl = ::arrow::internal::make_unique(); + + impl->encoders_.resize(keys.size()); + impl->ctx_ = ctx; + + for (size_t i = 0; i < keys.size(); ++i) { + const auto& key = keys[i].type; + + if (key->id() == Type::BOOL) { + impl->encoders_[i] = ::arrow::internal::make_unique(); + continue; + } + + if (key->id() == Type::DICTIONARY) { + impl->encoders_[i] = + ::arrow::internal::make_unique(key, ctx->memory_pool()); + continue; + } + + if (is_fixed_width(key->id())) { + impl->encoders_[i] = ::arrow::internal::make_unique(key); + continue; + } + + if (is_binary_like(key->id())) { + impl->encoders_[i] = + ::arrow::internal::make_unique>(key); + continue; + } + + if (is_large_binary_like(key->id())) { + impl->encoders_[i] = + ::arrow::internal::make_unique>(key); + continue; + } + + return Status::NotImplemented("Keys of type ", *key); + } + + return std::move(impl); + } + + Result Consume(const ExecBatch& batch) override { + std::vector offsets_batch(batch.length + 1); + for (int i = 0; i < batch.num_values(); ++i) { + encoders_[i]->AddLength(*batch[i].array(), offsets_batch.data()); + } + + int32_t total_length = 0; + for (int64_t i = 0; i < batch.length; ++i) { + auto total_length_before = total_length; + total_length += offsets_batch[i]; + offsets_batch[i] = total_length_before; + } + offsets_batch[batch.length] = total_length; + + std::vector key_bytes_batch(total_length); + std::vector key_buf_ptrs(batch.length); + for (int64_t i = 0; i < batch.length; ++i) { + key_buf_ptrs[i] = key_bytes_batch.data() + offsets_batch[i]; + } + + for (int i = 0; i < batch.num_values(); ++i) { + RETURN_NOT_OK(encoders_[i]->Encode(*batch[i].array(), key_buf_ptrs.data())); + } + + TypedBufferBuilder group_ids_batch(ctx_->memory_pool()); + RETURN_NOT_OK(group_ids_batch.Resize(batch.length)); + + for (int64_t i = 0; i < batch.length; ++i) { + int32_t key_length = offsets_batch[i + 1] - offsets_batch[i]; + std::string key( + reinterpret_cast(key_bytes_batch.data() + offsets_batch[i]), + key_length); + + auto it_success = map_.emplace(key, num_groups_); + auto group_id = it_success.first->second; + + if (it_success.second) { + // new key; update offsets and key_bytes + ++num_groups_; + auto next_key_offset = static_cast(key_bytes_.size()); + key_bytes_.resize(next_key_offset + key_length); + 
offsets_.push_back(next_key_offset + key_length); + memcpy(key_bytes_.data() + next_key_offset, key.c_str(), key_length); + } + + group_ids_batch.UnsafeAppend(group_id); + } + + ARROW_ASSIGN_OR_RAISE(auto group_ids, group_ids_batch.Finish()); + return Datum(UInt32Array(batch.length, std::move(group_ids))); + } + + uint32_t num_groups() const override { return num_groups_; } + + Result GetUniques() override { + ExecBatch out({}, num_groups_); + + std::vector key_buf_ptrs(num_groups_); + for (int64_t i = 0; i < num_groups_; ++i) { + key_buf_ptrs[i] = key_bytes_.data() + offsets_[i]; + } + + out.values.resize(encoders_.size()); + for (size_t i = 0; i < encoders_.size(); ++i) { + ARROW_ASSIGN_OR_RAISE( + out.values[i], + encoders_[i]->Decode(key_buf_ptrs.data(), static_cast(num_groups_), + ctx_->memory_pool())); + } + + return out; + } + + ExecContext* ctx_; + std::unordered_map map_; + std::vector offsets_ = {0}; + std::vector key_bytes_; + uint32_t num_groups_ = 0; + std::vector> encoders_; +}; + +/// C++ abstract base class for the HashAggregateKernel interface. +/// Implementations should be default constructible and perform initialization in +/// Init(). +struct GroupedAggregator : KernelState { + virtual Status Init(ExecContext*, const FunctionOptions*, + const std::shared_ptr&) = 0; + + virtual Status Consume(const ExecBatch& batch) = 0; + + virtual Result Finalize() = 0; + + template + Status MaybeReserve(int64_t old_num_groups, const ExecBatch& batch, + const Reserve& reserve) { + int64_t new_num_groups = batch[2].scalar_as().value; + if (new_num_groups <= old_num_groups) { + return Status::OK(); + } + return reserve(new_num_groups - old_num_groups); + } + + virtual std::shared_ptr out_type() const = 0; +}; + +// ---------------------------------------------------------------------- +// Count implementation + +struct GroupedCountImpl : public GroupedAggregator { + Status Init(ExecContext* ctx, const FunctionOptions* options, + const std::shared_ptr&) override { + options_ = checked_cast(*options); + counts_ = BufferBuilder(ctx->memory_pool()); + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + RETURN_NOT_OK(MaybeReserve(counts_.length(), batch, [&](int64_t added_groups) { + num_groups_ += added_groups; + return counts_.Append(added_groups * sizeof(int64_t), 0); + })); + + auto group_ids = batch[1].array()->GetValues(1); + auto raw_counts = reinterpret_cast(counts_.mutable_data()); + + const auto& input = batch[0].array(); + + if (options_.count_mode == CountOptions::COUNT_NULL) { + for (int64_t i = 0, input_i = input->offset; i < input->length; ++i, ++input_i) { + auto g = group_ids[i]; + raw_counts[g] += !BitUtil::GetBit(input->buffers[0]->data(), input_i); + } + return Status::OK(); + } + + arrow::internal::VisitSetBitRunsVoid( + input->buffers[0], input->offset, input->length, + [&](int64_t begin, int64_t length) { + for (int64_t input_i = begin, i = begin - input->offset; + input_i < begin + length; ++input_i, ++i) { + auto g = group_ids[i]; + raw_counts[g] += 1; + } + }); + return Status::OK(); + } + + Result Finalize() override { + ARROW_ASSIGN_OR_RAISE(auto counts, counts_.Finish()); + return std::make_shared(num_groups_, std::move(counts)); + } + + std::shared_ptr out_type() const override { return int64(); } + + int64_t num_groups_ = 0; + CountOptions options_; + BufferBuilder counts_; +}; + +// ---------------------------------------------------------------------- +// Sum implementation + +struct GroupedSumImpl : public GroupedAggregator { + 
// NB: whether we are accumulating into double, int64_t, or uint64_t + // we always have 64 bits per group in the sums buffer. + static constexpr size_t kSumSize = sizeof(int64_t); + + using ConsumeImpl = std::function&, + const uint32_t*, void*, int64_t*)>; + + struct GetConsumeImpl { + template ::Type> + Status Visit(const T&) { + consume_impl = [](const std::shared_ptr& input, const uint32_t* group, + void* boxed_sums, int64_t* counts) { + auto sums = reinterpret_cast::CType*>(boxed_sums); + + VisitArrayDataInline( + *input, + [&](typename TypeTraits::CType value) { + sums[*group] += value; + counts[*group] += 1; + ++group; + }, + [&] { ++group; }); + }; + out_type = TypeTraits::type_singleton(); + return Status::OK(); + } + + Status Visit(const HalfFloatType& type) { + return Status::NotImplemented("Summing data of type ", type); + } + + Status Visit(const DataType& type) { + return Status::NotImplemented("Summing data of type ", type); + } + + ConsumeImpl consume_impl; + std::shared_ptr out_type; + }; + + Status Init(ExecContext* ctx, const FunctionOptions*, + const std::shared_ptr& input_type) override { + pool_ = ctx->memory_pool(); + sums_ = BufferBuilder(pool_); + counts_ = BufferBuilder(pool_); + + GetConsumeImpl get_consume_impl; + RETURN_NOT_OK(VisitTypeInline(*input_type, &get_consume_impl)); + + consume_impl_ = std::move(get_consume_impl.consume_impl); + out_type_ = std::move(get_consume_impl.out_type); + + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + RETURN_NOT_OK(MaybeReserve(num_groups_, batch, [&](int64_t added_groups) { + num_groups_ += added_groups; + RETURN_NOT_OK(sums_.Append(added_groups * kSumSize, 0)); + RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0)); + return Status::OK(); + })); + + auto group_ids = batch[1].array()->GetValues(1); + consume_impl_(batch[0].array(), group_ids, sums_.mutable_data(), + reinterpret_cast(counts_.mutable_data())); + return Status::OK(); + } + + Result Finalize() override { + std::shared_ptr null_bitmap; + int64_t null_count = 0; + + for (int64_t i = 0; i < num_groups_; ++i) { + if (reinterpret_cast(counts_.data())[i] > 0) continue; + + if (null_bitmap == nullptr) { + ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_)); + BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true); + } + + null_count += 1; + BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false); + } + + ARROW_ASSIGN_OR_RAISE(auto sums, sums_.Finish()); + + return ArrayData::Make(std::move(out_type_), num_groups_, + {std::move(null_bitmap), std::move(sums)}, null_count); + } + + std::shared_ptr out_type() const override { return out_type_; } + + // NB: counts are used here instead of a simple "has_values_" bitmap since + // we expect to reuse this kernel to handle Mean + int64_t num_groups_ = 0; + BufferBuilder sums_, counts_; + std::shared_ptr out_type_; + ConsumeImpl consume_impl_; + MemoryPool* pool_; +}; + +// ---------------------------------------------------------------------- +// MinMax implementation + +template +struct Extrema : std::numeric_limits {}; + +template <> +struct Extrema { + static constexpr float min() { return -std::numeric_limits::infinity(); } + static constexpr float max() { return std::numeric_limits::infinity(); } +}; + +template <> +struct Extrema { + static constexpr double min() { return -std::numeric_limits::infinity(); } + static constexpr double max() { return std::numeric_limits::infinity(); } +}; + +struct GroupedMinMaxImpl : public GroupedAggregator 
{ + using ConsumeImpl = + std::function&, const uint32_t*, void*, void*, + uint8_t*, uint8_t*)>; + + using ResizeImpl = std::function; + + template + static ResizeImpl MakeResizeImpl(CType anti_extreme) { + // resize a min or max buffer, storing the correct anti extreme + return [anti_extreme](BufferBuilder* builder, int64_t added_groups) { + TypedBufferBuilder typed_builder(std::move(*builder)); + RETURN_NOT_OK(typed_builder.Append(added_groups, anti_extreme)); + *builder = std::move(*typed_builder.bytes_builder()); + return Status::OK(); + }; + } + + struct GetImpl { + template ::CType> + enable_if_number Visit(const T&) { + consume_impl = [](const std::shared_ptr& input, const uint32_t* group, + void* mins, void* maxes, uint8_t* has_values, + uint8_t* has_nulls) { + auto raw_mins = reinterpret_cast(mins); + auto raw_maxes = reinterpret_cast(maxes); + + VisitArrayDataInline( + *input, + [&](CType val) { + raw_maxes[*group] = std::max(raw_maxes[*group], val); + raw_mins[*group] = std::min(raw_mins[*group], val); + BitUtil::SetBit(has_values, *group++); + }, + [&] { BitUtil::SetBit(has_nulls, *group++); }); + }; + + resize_min_impl = MakeResizeImpl(Extrema::max()); + resize_max_impl = MakeResizeImpl(Extrema::min()); + return Status::OK(); + } + + Status Visit(const BooleanType& type) { + return Status::NotImplemented("Grouped MinMax data of type ", type); + } + + Status Visit(const HalfFloatType& type) { + return Status::NotImplemented("Grouped MinMax data of type ", type); + } + + Status Visit(const DataType& type) { + return Status::NotImplemented("Grouped MinMax data of type ", type); + } + + ConsumeImpl consume_impl; + ResizeImpl resize_min_impl, resize_max_impl; + }; + + Status Init(ExecContext* ctx, const FunctionOptions* options, + const std::shared_ptr& input_type) override { + options_ = *checked_cast(options); + type_ = input_type; + + mins_ = BufferBuilder(ctx->memory_pool()); + maxes_ = BufferBuilder(ctx->memory_pool()); + has_values_ = BufferBuilder(ctx->memory_pool()); + has_nulls_ = BufferBuilder(ctx->memory_pool()); + + GetImpl get_impl; + RETURN_NOT_OK(VisitTypeInline(*input_type, &get_impl)); + + consume_impl_ = std::move(get_impl.consume_impl); + resize_min_impl_ = std::move(get_impl.resize_min_impl); + resize_max_impl_ = std::move(get_impl.resize_max_impl); + resize_bitmap_impl_ = MakeResizeImpl(false); + + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + RETURN_NOT_OK(MaybeReserve(num_groups_, batch, [&](int64_t added_groups) { + num_groups_ += added_groups; + RETURN_NOT_OK(resize_min_impl_(&mins_, added_groups)); + RETURN_NOT_OK(resize_max_impl_(&maxes_, added_groups)); + RETURN_NOT_OK(resize_bitmap_impl_(&has_values_, added_groups)); + RETURN_NOT_OK(resize_bitmap_impl_(&has_nulls_, added_groups)); + return Status::OK(); + })); + + auto group_ids = batch[1].array()->GetValues(1); + consume_impl_(batch[0].array(), group_ids, mins_.mutable_data(), + maxes_.mutable_data(), has_values_.mutable_data(), + has_nulls_.mutable_data()); + return Status::OK(); + } + + Result Finalize() override { + // aggregation for group is valid if there was at least one value in that group + ARROW_ASSIGN_OR_RAISE(auto null_bitmap, has_values_.Finish()); + + if (options_.null_handling == MinMaxOptions::EMIT_NULL) { + // ... 
and there were no nulls in that group + ARROW_ASSIGN_OR_RAISE(auto has_nulls, has_nulls_.Finish()); + arrow::internal::BitmapAndNot(null_bitmap->data(), 0, has_nulls->data(), 0, + num_groups_, 0, null_bitmap->mutable_data()); + } + + auto mins = ArrayData::Make(type_, num_groups_, {null_bitmap, nullptr}); + auto maxes = ArrayData::Make(type_, num_groups_, {std::move(null_bitmap), nullptr}); + ARROW_ASSIGN_OR_RAISE(mins->buffers[1], mins_.Finish()); + ARROW_ASSIGN_OR_RAISE(maxes->buffers[1], maxes_.Finish()); + + return ArrayData::Make(out_type(), num_groups_, {nullptr}, + {std::move(mins), std::move(maxes)}); + } + + std::shared_ptr out_type() const override { + return struct_({field("min", type_), field("max", type_)}); + } + + int64_t num_groups_; + BufferBuilder mins_, maxes_, has_values_, has_nulls_; + std::shared_ptr type_; + ConsumeImpl consume_impl_; + ResizeImpl resize_min_impl_, resize_max_impl_, resize_bitmap_impl_; + MinMaxOptions options_; +}; + +template +HashAggregateKernel MakeKernel(InputType argument_type) { + HashAggregateKernel kernel; + + kernel.init = [](KernelContext* ctx, + const KernelInitArgs& args) -> std::unique_ptr { + auto impl = ::arrow::internal::make_unique(); + // FIXME(bkietz) Init should not take a type. That should be an unboxed template arg + // for the Impl. Otherwise we're not exposing dispatch as well as we should. + ctx->SetStatus(impl->Init(ctx->exec_context(), args.options, args.inputs[0].type)); + if (ctx->HasError()) return nullptr; + return std::move(impl); + }; + + kernel.signature = KernelSignature::Make( + {std::move(argument_type), InputType::Array(Type::UINT32), + InputType::Scalar(Type::UINT32)}, + OutputType( + [](KernelContext* ctx, const std::vector&) -> Result { + return checked_cast(ctx->state())->out_type(); + })); + + kernel.consume = [](KernelContext* ctx, const ExecBatch& batch) { + ctx->SetStatus(checked_cast(ctx->state())->Consume(batch)); + }; + + kernel.merge = [](KernelContext* ctx, KernelState&&, KernelState*) { + // TODO(ARROW-11840) merge two hash tables + ctx->SetStatus(Status::NotImplemented("Merge hashed aggregations")); + }; + + kernel.finalize = [](KernelContext* ctx, Datum* out) { + KERNEL_ASSIGN_OR_RAISE(*out, ctx, + checked_cast(ctx->state())->Finalize()); + }; + + return kernel; +} + +Result> GetKernels( + ExecContext* ctx, const std::vector& aggregates, + const std::vector& in_descrs) { + if (aggregates.size() != in_descrs.size()) { + return Status::Invalid(aggregates.size(), " aggregate functions were specified but ", + in_descrs.size(), " arguments were provided."); + } + + std::vector kernels(in_descrs.size()); + + for (size_t i = 0; i < aggregates.size(); ++i) { + ARROW_ASSIGN_OR_RAISE(auto function, + ctx->func_registry()->GetFunction(aggregates[i].function)); + ARROW_ASSIGN_OR_RAISE( + const Kernel* kernel, + function->DispatchExact( + {in_descrs[i], ValueDescr::Array(uint32()), ValueDescr::Scalar(uint32())})); + kernels[i] = static_cast(kernel); + } + return kernels; +} + +Result>> InitKernels( + const std::vector& kernels, ExecContext* ctx, + const std::vector& aggregates, const std::vector& in_descrs) { + std::vector> states(kernels.size()); + + for (size_t i = 0; i < aggregates.size(); ++i) { + auto options = aggregates[i].options; + + if (options == nullptr) { + // use known default options for the named function if possible + auto maybe_function = ctx->func_registry()->GetFunction(aggregates[i].function); + if (maybe_function.ok()) { + options = maybe_function.ValueOrDie()->default_options(); + } + } 
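+
+    // The kernel's init sees the same inputs its consume calls will use: the
+    // argument type plus the uint32 group id array and uint32 group count scalar.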
+ + KernelContext kernel_ctx{ctx}; + states[i] = kernels[i]->init(&kernel_ctx, KernelInitArgs{kernels[i], + { + in_descrs[i].type, + uint32(), + uint32(), + }, + options}); + if (kernel_ctx.HasError()) return kernel_ctx.status(); + } + + return std::move(states); +} + +Result ResolveKernels( + const std::vector& aggregates, + const std::vector& kernels, + const std::vector>& states, ExecContext* ctx, + const std::vector& descrs) { + FieldVector fields(descrs.size()); + + for (size_t i = 0; i < kernels.size(); ++i) { + KernelContext kernel_ctx{ctx}; + kernel_ctx.SetState(states[i].get()); + + ARROW_ASSIGN_OR_RAISE(auto descr, kernels[i]->signature->out_type().Resolve( + &kernel_ctx, { + descrs[i].type, + uint32(), + uint32(), + })); + fields[i] = field(aggregates[i].function, std::move(descr.type)); + } + return fields; +} + +} // namespace + +Result> Grouper::Make(const std::vector& descrs, + ExecContext* ctx) { + return GrouperImpl::Make(descrs, ctx); +} + +Result GroupBy(const std::vector& arguments, const std::vector& keys, + const std::vector& aggregates, ExecContext* ctx) { + // Construct and initialize HashAggregateKernels + ARROW_ASSIGN_OR_RAISE(auto argument_descrs, + ExecBatch::Make(arguments).Map( + [](ExecBatch batch) { return batch.GetDescriptors(); })); + + ARROW_ASSIGN_OR_RAISE(auto kernels, GetKernels(ctx, aggregates, argument_descrs)); + + ARROW_ASSIGN_OR_RAISE(auto states, + InitKernels(kernels, ctx, aggregates, argument_descrs)); + + ARROW_ASSIGN_OR_RAISE( + FieldVector out_fields, + ResolveKernels(aggregates, kernels, states, ctx, argument_descrs)); + + using arrow::compute::detail::ExecBatchIterator; + + ARROW_ASSIGN_OR_RAISE(auto argument_batch_iterator, + ExecBatchIterator::Make(arguments, ctx->exec_chunksize())); + + // Construct Grouper + ARROW_ASSIGN_OR_RAISE(auto key_descrs, ExecBatch::Make(keys).Map([](ExecBatch batch) { + return batch.GetDescriptors(); + })); + + ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_descrs, ctx)); + + int i = 0; + for (ValueDescr& key_descr : key_descrs) { + out_fields.push_back(field("key_" + std::to_string(i++), std::move(key_descr.type))); + } + + ARROW_ASSIGN_OR_RAISE(auto key_batch_iterator, + ExecBatchIterator::Make(keys, ctx->exec_chunksize())); + + // start "streaming" execution + ExecBatch key_batch, argument_batch; + while (argument_batch_iterator->Next(&argument_batch) && + key_batch_iterator->Next(&key_batch)) { + if (key_batch.length == 0) continue; + + // compute a batch of group ids + ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch)); + + // consume group ids with HashAggregateKernels + for (size_t i = 0; i < kernels.size(); ++i) { + KernelContext batch_ctx{ctx}; + batch_ctx.SetState(states[i].get()); + ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make({argument_batch[i], id_batch, + Datum(grouper->num_groups())})); + kernels[i]->consume(&batch_ctx, batch); + if (batch_ctx.HasError()) return batch_ctx.status(); + } + } + + // Finalize output + ArrayDataVector out_data(arguments.size() + keys.size()); + auto it = out_data.begin(); + + for (size_t i = 0; i < kernels.size(); ++i) { + KernelContext batch_ctx{ctx}; + batch_ctx.SetState(states[i].get()); + Datum out; + kernels[i]->finalize(&batch_ctx, &out); + if (batch_ctx.HasError()) return batch_ctx.status(); + *it++ = out.array(); + } + + ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, grouper->GetUniques()); + for (const auto& key : out_keys.values) { + *it++ = key.array(); + } + + int64_t length = out_data[0]->length; + return 
ArrayData::Make(struct_(std::move(out_fields)), length, + {/*null_bitmap=*/nullptr}, std::move(out_data), + /*null_count=*/0); +} + +Result> Grouper::ApplyGroupings(const ListArray& groupings, + const Array& array, + ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(Datum sorted, + compute::Take(array, groupings.data()->child_data[0], + TakeOptions::NoBoundsCheck(), ctx)); + + return std::make_shared(list(array.type()), groupings.length(), + groupings.value_offsets(), sorted.make_array()); +} + +Result> Grouper::MakeGroupings(const UInt32Array& ids, + uint32_t num_groups, + ExecContext* ctx) { + if (ids.null_count() != 0) { + return Status::Invalid("MakeGroupings with null ids"); + } + + ARROW_ASSIGN_OR_RAISE(auto offsets, AllocateBuffer(sizeof(int32_t) * (num_groups + 1), + ctx->memory_pool())); + auto raw_offsets = reinterpret_cast(offsets->mutable_data()); + + std::memset(raw_offsets, 0, offsets->size()); + for (int i = 0; i < ids.length(); ++i) { + DCHECK_LT(ids.Value(i), num_groups); + raw_offsets[ids.Value(i)] += 1; + } + int32_t length = 0; + for (uint32_t id = 0; id < num_groups; ++id) { + auto offset = raw_offsets[id]; + raw_offsets[id] = length; + length += offset; + } + raw_offsets[num_groups] = length; + DCHECK_EQ(ids.length(), length); + + ARROW_ASSIGN_OR_RAISE(auto offsets_copy, + offsets->CopySlice(0, offsets->size(), ctx->memory_pool())); + raw_offsets = reinterpret_cast(offsets_copy->mutable_data()); + + ARROW_ASSIGN_OR_RAISE(auto sort_indices, AllocateBuffer(sizeof(int32_t) * ids.length(), + ctx->memory_pool())); + auto raw_sort_indices = reinterpret_cast(sort_indices->mutable_data()); + for (int i = 0; i < ids.length(); ++i) { + raw_sort_indices[raw_offsets[ids.Value(i)]++] = i; + } + + return std::make_shared( + list(int32()), num_groups, std::move(offsets), + std::make_shared(ids.length(), std::move(sort_indices))); +} + +namespace { +const FunctionDoc hash_count_doc{"Count the number of null / non-null values", + ("By default, non-null values are counted.\n" + "This can be changed through CountOptions."), + {"array", "group_id_array", "group_count"}, + "CountOptions"}; + +const FunctionDoc hash_sum_doc{"Sum values of a numeric array", + ("Null values are ignored."), + {"array", "group_id_array", "group_count"}}; + +const FunctionDoc hash_min_max_doc{ + "Compute the minimum and maximum values of a numeric array", + ("Null values are ignored by default.\n" + "This can be changed through MinMaxOptions."), + {"array", "group_id_array", "group_count"}, + "MinMaxOptions"}; +} // namespace + +void RegisterHashAggregateBasic(FunctionRegistry* registry) { + { + static auto default_count_options = CountOptions::Defaults(); + auto func = std::make_shared( + "hash_count", Arity::Ternary(), &hash_count_doc, &default_count_options); + DCHECK_OK(func->AddKernel(MakeKernel(ValueDescr::ARRAY))); + DCHECK_OK(registry->AddFunction(std::move(func))); + } + + { + auto func = std::make_shared("hash_sum", Arity::Ternary(), + &hash_sum_doc); + DCHECK_OK(func->AddKernel(MakeKernel(ValueDescr::ARRAY))); + DCHECK_OK(registry->AddFunction(std::move(func))); + } + + { + static auto default_minmax_options = MinMaxOptions::Defaults(); + auto func = std::make_shared( + "hash_min_max", Arity::Ternary(), &hash_min_max_doc, &default_minmax_options); + DCHECK_OK(func->AddKernel(MakeKernel(ValueDescr::ARRAY))); + DCHECK_OK(registry->AddFunction(std::move(func))); + } +} + +} // namespace internal +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc 
b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc new file mode 100644 index 0000000000000..7858d8bb147d8 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -0,0 +1,703 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include + +#include + +#include "arrow/array.h" +#include "arrow/chunked_array.h" +#include "arrow/compute/api_aggregate.h" +#include "arrow/compute/api_scalar.h" +#include "arrow/compute/api_vector.h" +#include "arrow/compute/cast.h" +#include "arrow/compute/kernels/aggregate_internal.h" +#include "arrow/compute/kernels/codegen_internal.h" +#include "arrow/compute/kernels/test_util.h" +#include "arrow/compute/registry.h" +#include "arrow/testing/generator.h" +#include "arrow/testing/gtest_common.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" +#include "arrow/util/bitmap_reader.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/int_util_internal.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/logging.h" + +using testing::HasSubstr; + +namespace arrow { + +using internal::BitmapReader; +using internal::checked_cast; +using internal::checked_pointer_cast; + +namespace compute { +namespace { + +Result NaiveGroupBy(std::vector arguments, std::vector keys, + const std::vector& aggregates) { + ARROW_ASSIGN_OR_RAISE(auto key_batch, ExecBatch::Make(std::move(keys))); + + ARROW_ASSIGN_OR_RAISE(auto grouper, + internal::Grouper::Make(key_batch.GetDescriptors())); + + ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch)); + + ARROW_ASSIGN_OR_RAISE( + auto groupings, internal::Grouper::MakeGroupings(*id_batch.array_as(), + grouper->num_groups())); + + ArrayVector out_columns; + std::vector out_names; + + for (size_t i = 0; i < arguments.size(); ++i) { + out_names.push_back(aggregates[i].function); + + // trim "hash_" prefix + auto scalar_agg_function = aggregates[i].function.substr(5); + + ARROW_ASSIGN_OR_RAISE( + auto grouped_argument, + internal::Grouper::ApplyGroupings(*groupings, *arguments[i].make_array())); + + ScalarVector aggregated_scalars; + + for (int64_t i_group = 0; i_group < grouper->num_groups(); ++i_group) { + auto slice = grouped_argument->value_slice(i_group); + if (slice->length() == 0) continue; + ARROW_ASSIGN_OR_RAISE( + Datum d, CallFunction(scalar_agg_function, {slice}, aggregates[i].options)); + aggregated_scalars.push_back(d.scalar()); + } + + ARROW_ASSIGN_OR_RAISE(Datum aggregated_column, + ScalarVectorToArray(aggregated_scalars)); + out_columns.push_back(aggregated_column.make_array()); + } + + int i = 0; + ARROW_ASSIGN_OR_RAISE(auto uniques, grouper->GetUniques()); + for (const Datum& key : 
uniques.values) { + out_columns.push_back(key.make_array()); + out_names.push_back("key_" + std::to_string(i++)); + } + + return StructArray::Make(std::move(out_columns), std::move(out_names)); +} + +void ValidateGroupBy(const std::vector& aggregates, + std::vector arguments, std::vector keys) { + ASSERT_OK_AND_ASSIGN(Datum expected, NaiveGroupBy(arguments, keys, aggregates)); + + ASSERT_OK_AND_ASSIGN(Datum actual, GroupBy(arguments, keys, aggregates)); + + ASSERT_OK(expected.make_array()->ValidateFull()); + ASSERT_OK(actual.make_array()->ValidateFull()); + + AssertDatumsEqual(expected, actual, /*verbose=*/true); +} + +} // namespace + +TEST(Grouper, SupportedKeys) { + ASSERT_OK(internal::Grouper::Make({boolean()})); + + ASSERT_OK(internal::Grouper::Make({int8(), uint16(), int32(), uint64()})); + + ASSERT_OK(internal::Grouper::Make({dictionary(int64(), utf8())})); + + ASSERT_OK(internal::Grouper::Make({float16(), float32(), float64()})); + + ASSERT_OK(internal::Grouper::Make({utf8(), binary(), large_utf8(), large_binary()})); + + ASSERT_OK(internal::Grouper::Make({fixed_size_binary(16), fixed_size_binary(32)})); + + ASSERT_OK(internal::Grouper::Make({decimal128(32, 10), decimal256(76, 20)})); + + ASSERT_OK(internal::Grouper::Make({date32(), date64()})); + + for (auto unit : { + TimeUnit::SECOND, + TimeUnit::MILLI, + TimeUnit::MICRO, + TimeUnit::NANO, + }) { + ASSERT_OK(internal::Grouper::Make({timestamp(unit), duration(unit)})); + } + + ASSERT_OK(internal::Grouper::Make({day_time_interval(), month_interval()})); + + ASSERT_RAISES(NotImplemented, internal::Grouper::Make({struct_({field("", int64())})})); + + ASSERT_RAISES(NotImplemented, internal::Grouper::Make({struct_({})})); + + ASSERT_RAISES(NotImplemented, internal::Grouper::Make({list(int32())})); + + ASSERT_RAISES(NotImplemented, internal::Grouper::Make({fixed_size_list(int32(), 5)})); + + ASSERT_RAISES(NotImplemented, + internal::Grouper::Make({dense_union({field("", int32())})})); +} + +struct TestGrouper { + explicit TestGrouper(std::vector descrs) : descrs_(std::move(descrs)) { + grouper_ = internal::Grouper::Make(descrs_).ValueOrDie(); + + FieldVector fields; + for (const auto& descr : descrs_) { + fields.push_back(field("", descr.type)); + } + key_schema_ = schema(std::move(fields)); + } + + void ExpectConsume(const std::string& key_json, const std::string& expected) { + ExpectConsume(ExecBatch(*RecordBatchFromJSON(key_schema_, key_json)), + ArrayFromJSON(uint32(), expected)); + } + + void ExpectConsume(const std::vector& key_batch, Datum expected) { + ExpectConsume(*ExecBatch::Make(key_batch), expected); + } + + void ExpectConsume(const ExecBatch& key_batch, Datum expected) { + Datum ids; + ConsumeAndValidate(key_batch, &ids); + AssertDatumsEqual(expected, ids, /*verbose=*/true); + } + + void ConsumeAndValidate(const ExecBatch& key_batch, Datum* ids = nullptr) { + ASSERT_OK_AND_ASSIGN(Datum id_batch, grouper_->Consume(key_batch)); + + ValidateConsume(key_batch, id_batch); + + if (ids) { + *ids = std::move(id_batch); + } + } + + void ValidateConsume(const ExecBatch& key_batch, const Datum& id_batch) { + if (uniques_.length == -1) { + ASSERT_OK_AND_ASSIGN(uniques_, grouper_->GetUniques()); + } else if (static_cast(grouper_->num_groups()) > uniques_.length) { + ASSERT_OK_AND_ASSIGN(ExecBatch new_uniques, grouper_->GetUniques()); + + // check that uniques_ are prefixes of new_uniques + for (int i = 0; i < uniques_.num_values(); ++i) { + auto new_unique = new_uniques[i].make_array(); + ASSERT_OK(new_unique->ValidateFull()); + + 
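+        // Group ids are assigned in order of first occurrence, so uniques returned
+        // after an earlier Consume() remain a prefix of those returned later; new
+        // groups are only ever appended at the end.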
AssertDatumsEqual(uniques_[i], new_unique->Slice(0, uniques_.length), + /*verbose=*/true); + } + + uniques_ = std::move(new_uniques); + } + + // check that the ids encode an equivalent key sequence + auto ids = id_batch.make_array(); + ASSERT_OK(ids->ValidateFull()); + + for (int i = 0; i < key_batch.num_values(); ++i) { + SCOPED_TRACE(std::to_string(i) + "th key array"); + auto original = key_batch[i].make_array(); + ASSERT_OK_AND_ASSIGN(auto encoded, Take(*uniques_[i].make_array(), *ids)); + AssertArraysEqual(*original, *encoded, /*verbose=*/true, + EqualOptions().nans_equal(true)); + } + } + + std::vector descrs_; + std::shared_ptr key_schema_; + std::unique_ptr grouper_; + ExecBatch uniques_ = ExecBatch({}, -1); +}; + +TEST(Grouper, BooleanKey) { + TestGrouper g({boolean()}); + + g.ExpectConsume("[[true], [true]]", "[0, 0]"); + + g.ExpectConsume("[[true], [true]]", "[0, 0]"); + + g.ExpectConsume("[[false], [null]]", "[1, 2]"); + + g.ExpectConsume("[[true], [false], [true], [false], [null], [false], [null]]", + "[0, 1, 0, 1, 2, 1, 2]"); +} + +TEST(Grouper, NumericKey) { + for (auto ty : { + uint8(), + int8(), + uint16(), + int16(), + uint32(), + int32(), + uint64(), + int64(), + float16(), + float32(), + float64(), + }) { + SCOPED_TRACE("key type: " + ty->ToString()); + + TestGrouper g({ty}); + + g.ExpectConsume("[[3], [3]]", "[0, 0]"); + + g.ExpectConsume("[[3], [3]]", "[0, 0]"); + + g.ExpectConsume("[[27], [81]]", "[1, 2]"); + + g.ExpectConsume("[[3], [27], [3], [27], [null], [81], [27], [81]]", + "[0, 1, 0, 1, 3, 2, 1, 2]"); + } +} + +TEST(Grouper, FloatingPointKey) { + TestGrouper g({float32()}); + + // -0.0 hashes differently from 0.0 + g.ExpectConsume("[[0.0], [-0.0]]", "[0, 1]"); + + g.ExpectConsume("[[Inf], [-Inf]]", "[2, 3]"); + + // assert(!(NaN == NaN)) does not cause spurious new groups + g.ExpectConsume("[[NaN], [NaN]]", "[4, 4]"); + + // TODO(bkietz) test denormal numbers, more NaNs +} + +TEST(Grouper, StringKey) { + for (auto ty : {utf8(), large_utf8(), fixed_size_binary(2)}) { + SCOPED_TRACE("key type: " + ty->ToString()); + + TestGrouper g({ty}); + + g.ExpectConsume(R"([["eh"], ["eh"]])", "[0, 0]"); + + g.ExpectConsume(R"([["eh"], ["eh"]])", "[0, 0]"); + + g.ExpectConsume(R"([["be"], [null]])", "[1, 2]"); + } +} + +TEST(Grouper, DictKey) { + TestGrouper g({dictionary(int32(), utf8())}); + + // For dictionary keys, all batches must share a single dictionary. + // Eventually, differing dictionaries will be unified and indices transposed + // during encoding to relieve this restriction. 
+ const auto dict = ArrayFromJSON(utf8(), R"(["ex", "why", "zee", null])"); + + auto WithIndices = [&](const std::string& indices) { + return Datum(*DictionaryArray::FromArrays(ArrayFromJSON(int32(), indices), dict)); + }; + + // NB: null index is not considered equivalent to index=3 (which encodes null in dict) + g.ExpectConsume({WithIndices(" [3, 1, null, 0, 2]")}, + ArrayFromJSON(uint32(), "[0, 1, 2, 3, 4]")); + + g = TestGrouper({dictionary(int32(), utf8())}); + + g.ExpectConsume({WithIndices(" [0, 1, 2, 3, null]")}, + ArrayFromJSON(uint32(), "[0, 1, 2, 3, 4]")); + + g.ExpectConsume({WithIndices(" [3, 1, null, 0, 2]")}, + ArrayFromJSON(uint32(), "[3, 1, 4, 0, 2]")); + + EXPECT_RAISES_WITH_MESSAGE_THAT( + NotImplemented, HasSubstr("Unifying differing dictionaries"), + g.grouper_->Consume(*ExecBatch::Make({*DictionaryArray::FromArrays( + ArrayFromJSON(int32(), "[0, 1]"), + ArrayFromJSON(utf8(), R"(["different", "dictionary"])"))}))); +} + +TEST(Grouper, StringInt64Key) { + TestGrouper g({utf8(), int64()}); + + g.ExpectConsume(R"([["eh", 0], ["eh", 0]])", "[0, 0]"); + + g.ExpectConsume(R"([["eh", 0], ["eh", null]])", "[0, 1]"); + + g.ExpectConsume(R"([["eh", 1], ["bee", 1]])", "[2, 3]"); + + g.ExpectConsume(R"([["eh", null], ["bee", 1]])", "[1, 3]"); + + g = TestGrouper({utf8(), int64()}); + + g.ExpectConsume(R"([ + ["ex", 0], + ["ex", 0], + ["why", 0], + ["ex", 1], + ["why", 0], + ["ex", 1], + ["ex", 0], + ["why", 1] + ])", + "[0, 0, 1, 2, 1, 2, 0, 3]"); + + g.ExpectConsume(R"([ + ["ex", 0], + [null, 0], + [null, 0], + ["ex", 1], + [null, null], + ["ex", 1], + ["ex", 0], + ["why", null] + ])", + "[0, 4, 4, 2, 5, 2, 0, 6]"); +} + +TEST(Grouper, DoubleStringInt64Key) { + TestGrouper g({float64(), utf8(), int64()}); + + g.ExpectConsume(R"([[1.5, "eh", 0], [1.5, "eh", 0]])", "[0, 0]"); + + g.ExpectConsume(R"([[1.5, "eh", 0], [1.5, "eh", 0]])", "[0, 0]"); + + g.ExpectConsume(R"([[1.0, "eh", 0], [1.0, "be", null]])", "[1, 2]"); + + // note: -0 and +0 hash differently + g.ExpectConsume(R"([[-0.0, "be", 7], [0.0, "be", 7]])", "[3, 4]"); +} + +TEST(Grouper, RandomInt64Keys) { + TestGrouper g({int64()}); + for (int i = 0; i < 4; ++i) { + SCOPED_TRACE(std::to_string(i) + "th key batch"); + + ExecBatch key_batch{ + *random::GenerateBatch(g.key_schema_->fields(), 1 << 12, 0xDEADBEEF)}; + g.ConsumeAndValidate(key_batch); + } +} + +TEST(Grouper, RandomStringInt64Keys) { + TestGrouper g({utf8(), int64()}); + for (int i = 0; i < 4; ++i) { + SCOPED_TRACE(std::to_string(i) + "th key batch"); + + ExecBatch key_batch{ + *random::GenerateBatch(g.key_schema_->fields(), 1 << 12, 0xDEADBEEF)}; + g.ConsumeAndValidate(key_batch); + } +} + +TEST(Grouper, RandomStringInt64DoubleInt32Keys) { + TestGrouper g({utf8(), int64(), float64(), int32()}); + for (int i = 0; i < 4; ++i) { + SCOPED_TRACE(std::to_string(i) + "th key batch"); + + ExecBatch key_batch{ + *random::GenerateBatch(g.key_schema_->fields(), 1 << 12, 0xDEADBEEF)}; + g.ConsumeAndValidate(key_batch); + } +} + +TEST(Grouper, MakeGroupings) { + auto ExpectGroupings = [](std::string ids_json, std::string expected_json) { + auto ids = checked_pointer_cast(ArrayFromJSON(uint32(), ids_json)); + auto expected = ArrayFromJSON(list(int32()), expected_json); + + auto num_groups = static_cast(expected->length()); + ASSERT_OK_AND_ASSIGN(auto actual, internal::Grouper::MakeGroupings(*ids, num_groups)); + AssertArraysEqual(*expected, *actual, /*verbose=*/true); + + // validate ApplyGroupings + ASSERT_OK_AND_ASSIGN(auto grouped_ids, + 
internal::Grouper::ApplyGroupings(*actual, *ids)); + + for (uint32_t group = 0; group < num_groups; ++group) { + auto ids_slice = checked_pointer_cast(grouped_ids->value_slice(group)); + for (auto slot : *ids_slice) { + EXPECT_EQ(slot, group); + } + } + }; + + ExpectGroupings("[]", "[[]]"); + + ExpectGroupings("[0, 0, 0]", "[[0, 1, 2]]"); + + ExpectGroupings("[0, 0, 0, 1, 1, 2]", "[[0, 1, 2], [3, 4], [5], []]"); + + ExpectGroupings("[2, 1, 2, 1, 1, 2]", "[[], [1, 3, 4], [0, 2, 5], [], []]"); + + ExpectGroupings("[2, 2, 5, 5, 2, 3]", "[[], [], [0, 1, 4], [5], [], [2, 3], [], []]"); + + auto ids = checked_pointer_cast(ArrayFromJSON(uint32(), "[0, null, 1]")); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("MakeGroupings with null ids"), + internal::Grouper::MakeGroupings(*ids, 5)); +} + +TEST(GroupBy, Errors) { + auto batch = RecordBatchFromJSON( + schema({field("argument", float64()), field("group_id", uint32())}), R"([ + [1.0, 1], + [null, 1], + [0.0, 2], + [null, 3], + [4.0, 0], + [3.25, 1], + [0.125, 2], + [-0.25, 2], + [0.75, 0], + [null, 3] + ])"); + + EXPECT_RAISES_WITH_MESSAGE_THAT( + NotImplemented, HasSubstr("Direct execution of HASH_AGGREGATE functions"), + CallFunction("hash_sum", {batch->GetColumnByName("argument"), + batch->GetColumnByName("group_id"), Datum(uint32_t(4))})); +} + +TEST(GroupBy, SumOnly) { + auto batch = RecordBatchFromJSON( + schema({field("argument", float64()), field("key", int64())}), R"([ + [1.0, 1], + [null, 1], + [0.0, 2], + [null, 3], + [4.0, null], + [3.25, 1], + [0.125, 2], + [-0.25, 2], + [0.75, null], + [null, 3] + ])"); + + ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, + internal::GroupBy({batch->GetColumnByName("argument")}, + {batch->GetColumnByName("key")}, + { + {"hash_sum", nullptr}, + })); + + AssertDatumsEqual(ArrayFromJSON(struct_({ + field("hash_sum", float64()), + field("key_0", int64()), + }), + R"([ + [4.25, 1], + [-0.125, 2], + [null, 3], + [4.75, null] + ])"), + aggregated_and_grouped, + /*verbose=*/true); +} + +TEST(GroupBy, MinMaxOnly) { + auto batch = RecordBatchFromJSON( + schema({field("argument", float64()), field("key", int64())}), R"([ + [1.0, 1], + [null, 1], + [0.0, 2], + [null, 3], + [4.0, null], + [3.25, 1], + [0.125, 2], + [-0.25, 2], + [0.75, null], + [null, 3] + ])"); + + ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, + internal::GroupBy({batch->GetColumnByName("argument")}, + {batch->GetColumnByName("key")}, + { + {"hash_min_max", nullptr}, + })); + + AssertDatumsEqual(ArrayFromJSON(struct_({ + field("hash_min_max", struct_({ + field("min", float64()), + field("max", float64()), + })), + field("key_0", int64()), + }), + R"([ + [{"min": 1.0, "max": 3.25}, 1], + [{"min": -0.25, "max": 0.125}, 2], + [{"min": null, "max": null}, 3], + [{"min": 0.75, "max": 4.0}, null] + ])"), + aggregated_and_grouped, + /*verbose=*/true); +} + +TEST(GroupBy, CountAndSum) { + auto batch = RecordBatchFromJSON( + schema({field("argument", float64()), field("key", int64())}), R"([ + [1.0, 1], + [null, 1], + [0.0, 2], + [null, 3], + [4.0, null], + [3.25, 1], + [0.125, 2], + [-0.25, 2], + [0.75, null], + [null, 3] + ])"); + + CountOptions count_options; + ASSERT_OK_AND_ASSIGN( + Datum aggregated_and_grouped, + internal::GroupBy( + { + // NB: passing an argument twice or also using it as a key is legal + batch->GetColumnByName("argument"), + batch->GetColumnByName("argument"), + batch->GetColumnByName("key"), + }, + { + batch->GetColumnByName("key"), + }, + { + {"hash_count", &count_options}, + {"hash_sum", nullptr}, + {"hash_sum", 
nullptr}, + })); + + AssertDatumsEqual( + ArrayFromJSON(struct_({ + field("hash_count", int64()), + // NB: summing a float32 array results in float64 sums + field("hash_sum", float64()), + field("hash_sum", int64()), + field("key_0", int64()), + }), + R"([ + [2, 4.25, 3, 1], + [3, -0.125, 6, 2], + [0, null, 6, 3], + [2, 4.75, null, null] + ])"), + aggregated_and_grouped, + /*verbose=*/true); +} + +TEST(GroupBy, SumOnlyStringAndDictKeys) { + for (auto key_type : {utf8(), dictionary(int32(), utf8())}) { + SCOPED_TRACE("key type: " + key_type->ToString()); + + auto batch = RecordBatchFromJSON( + schema({field("argument", float64()), field("key", key_type)}), R"([ + [1.0, "alfa"], + [null, "alfa"], + [0.0, "beta"], + [null, "gama"], + [4.0, null ], + [3.25, "alfa"], + [0.125, "beta"], + [-0.25, "beta"], + [0.75, null ], + [null, "gama"] + ])"); + + ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, + internal::GroupBy({batch->GetColumnByName("argument")}, + {batch->GetColumnByName("key")}, + { + {"hash_sum", nullptr}, + })); + + AssertDatumsEqual(ArrayFromJSON(struct_({ + field("hash_sum", float64()), + field("key_0", key_type), + }), + R"([ + [4.25, "alfa"], + [-0.125, "beta"], + [null, "gama"], + [4.75, null ] + ])"), + aggregated_and_grouped, + /*verbose=*/true); + } +} + +TEST(GroupBy, ConcreteCaseWithValidateGroupBy) { + auto batch = RecordBatchFromJSON( + schema({field("argument", float64()), field("key", utf8())}), R"([ + [1.0, "alfa"], + [null, "alfa"], + [0.0, "beta"], + [null, "gama"], + [4.0, null ], + [3.25, "alfa"], + [0.125, "beta"], + [-0.25, "beta"], + [0.75, null ], + [null, "gama"] + ])"); + + CountOptions count_non_null{CountOptions::COUNT_NON_NULL}, + count_null{CountOptions::COUNT_NULL}; + + MinMaxOptions emit_null{MinMaxOptions::EMIT_NULL}; + + using internal::Aggregate; + for (auto agg : { + Aggregate{"hash_sum", nullptr}, + Aggregate{"hash_count", &count_non_null}, + Aggregate{"hash_count", &count_null}, + Aggregate{"hash_min_max", nullptr}, + Aggregate{"hash_min_max", &emit_null}, + }) { + SCOPED_TRACE(agg.function); + ValidateGroupBy({agg}, {batch->GetColumnByName("argument")}, + {batch->GetColumnByName("key")}); + } +} + +TEST(GroupBy, RandomArraySum) { + for (int64_t length : {1 << 10, 1 << 12, 1 << 15}) { + for (auto null_probability : {0.0, 0.01, 0.5, 1.0}) { + auto batch = random::GenerateBatch( + { + field("argument", float32(), + key_value_metadata( + {{"null_probability", std::to_string(null_probability)}})), + field("key", int64(), key_value_metadata({{"min", "0"}, {"max", "100"}})), + }, + length, 0xDEADBEEF); + + ValidateGroupBy( + { + {"hash_sum", nullptr}, + }, + {batch->GetColumnByName("argument")}, {batch->GetColumnByName("key")}); + } + } +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/registry.cc b/cpp/src/arrow/compute/registry.cc index 9385c5c2a16ee..3a8a3a0eb8530 100644 --- a/cpp/src/arrow/compute/registry.cc +++ b/cpp/src/arrow/compute/registry.cc @@ -126,18 +126,19 @@ static std::unique_ptr CreateBuiltInRegistry() { RegisterScalarValidity(registry.get()); RegisterScalarFillNull(registry.get()); + // Vector functions + RegisterVectorHash(registry.get()); + RegisterVectorSelection(registry.get()); + RegisterVectorNested(registry.get()); + RegisterVectorSort(registry.get()); + // Aggregate functions RegisterScalarAggregateBasic(registry.get()); RegisterScalarAggregateMode(registry.get()); RegisterScalarAggregateQuantile(registry.get()); RegisterScalarAggregateTDigest(registry.get()); 
RegisterScalarAggregateVariance(registry.get()); - - // Vector functions - RegisterVectorHash(registry.get()); - RegisterVectorSelection(registry.get()); - RegisterVectorNested(registry.get()); - RegisterVectorSort(registry.get()); + RegisterHashAggregateBasic(registry.get()); return registry; } diff --git a/cpp/src/arrow/compute/registry_internal.h b/cpp/src/arrow/compute/registry_internal.h index 3b0f4475328b4..e4008cf3f270f 100644 --- a/cpp/src/arrow/compute/registry_internal.h +++ b/cpp/src/arrow/compute/registry_internal.h @@ -47,6 +47,7 @@ void RegisterScalarAggregateMode(FunctionRegistry* registry); void RegisterScalarAggregateQuantile(FunctionRegistry* registry); void RegisterScalarAggregateTDigest(FunctionRegistry* registry); void RegisterScalarAggregateVariance(FunctionRegistry* registry); +void RegisterHashAggregateBasic(FunctionRegistry* registry); } // namespace internal } // namespace compute diff --git a/cpp/src/arrow/dataset/expression.cc b/cpp/src/arrow/dataset/expression.cc index 6e71aa17e7427..627477b3038c7 100644 --- a/cpp/src/arrow/dataset/expression.cc +++ b/cpp/src/arrow/dataset/expression.cc @@ -284,7 +284,7 @@ bool Identical(const Expression& l, const Expression& r) { return l.impl_ == r.i size_t Expression::hash() const { if (auto lit = literal()) { if (lit->is_scalar()) { - return Scalar::Hash::hash(*lit->scalar()); + return lit->scalar()->hash(); } return 0; } diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index ec4a28c8a0e98..43ccd777cf2e2 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -26,6 +26,7 @@ #include "arrow/array/array_dict.h" #include "arrow/array/array_nested.h" #include "arrow/array/builder_dict.h" +#include "arrow/compute/api_aggregate.h" #include "arrow/compute/api_scalar.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/cast.h" @@ -70,56 +71,89 @@ std::shared_ptr Partitioning::Default() { return std::make_shared(); } -inline Expression ConjunctionFromGroupingRow(Scalar* row) { - ScalarVector* values = &checked_cast(row)->value; - std::vector equality_expressions(values->size()); - for (size_t i = 0; i < values->size(); ++i) { - const std::string& name = row->type->field(static_cast(i))->name(); - if (values->at(i)->is_valid) { - equality_expressions[i] = equal(field_ref(name), literal(std::move(values->at(i)))); - } else { - equality_expressions[i] = is_null(field_ref(name)); - } +static Result ApplyGroupings( + const ListArray& groupings, const std::shared_ptr& batch) { + ARROW_ASSIGN_OR_RAISE(Datum sorted, + compute::Take(batch, groupings.data()->child_data[0])); + + const auto& sorted_batch = *sorted.record_batch(); + + RecordBatchVector out(static_cast(groupings.length())); + for (size_t i = 0; i < out.size(); ++i) { + out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i)); } - return and_(std::move(equality_expressions)); + + return out; } Result KeyValuePartitioning::Partition( const std::shared_ptr& batch) const { - FieldVector by_fields; - ArrayVector by_columns; + std::vector key_indices; + int num_keys = 0; - std::shared_ptr rest = batch; + // assemble vector of indices of fields in batch on which we'll partition for (const auto& partition_field : schema_->fields()) { ARROW_ASSIGN_OR_RAISE( - auto match, FieldRef(partition_field->name()).FindOneOrNone(*rest->schema())) - if (match.empty()) continue; + auto match, FieldRef(partition_field->name()).FindOneOrNone(*batch->schema())) - 
by_fields.push_back(partition_field); - by_columns.push_back(rest->column(match[0])); - ARROW_ASSIGN_OR_RAISE(rest, rest->RemoveColumn(match[0])); + if (match.empty()) continue; + key_indices.push_back(match[0]); + ++num_keys; } - if (by_fields.empty()) { + if (key_indices.empty()) { // no fields to group by; return the whole batch return PartitionedBatches{{batch}, {literal(true)}}; } - ARROW_ASSIGN_OR_RAISE(auto by, - StructArray::Make(std::move(by_columns), std::move(by_fields))); - ARROW_ASSIGN_OR_RAISE(auto groupings_and_values, MakeGroupings(*by)); - auto groupings = - checked_pointer_cast(groupings_and_values->GetFieldByName("groupings")); - auto unique_rows = groupings_and_values->GetFieldByName("values"); + // assemble an ExecBatch of the key columns + compute::ExecBatch key_batch({}, batch->num_rows()); + for (int i : key_indices) { + key_batch.values.emplace_back(batch->column_data(i)); + } + + ARROW_ASSIGN_OR_RAISE(auto grouper, + compute::internal::Grouper::Make(key_batch.GetDescriptors())); + + ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch)); + + auto ids = id_batch.array_as(); + ARROW_ASSIGN_OR_RAISE(auto groupings, compute::internal::Grouper::MakeGroupings( + *ids, grouper->num_groups())); + + ARROW_ASSIGN_OR_RAISE(auto uniques, grouper->GetUniques()); + ArrayVector unique_arrays(num_keys); + for (int i = 0; i < num_keys; ++i) { + unique_arrays[i] = uniques.values[i].make_array(); + } PartitionedBatches out; - ARROW_ASSIGN_OR_RAISE(out.batches, ApplyGroupings(*groupings, rest)); - out.expressions.resize(out.batches.size()); - for (size_t i = 0; i < out.batches.size(); ++i) { - ARROW_ASSIGN_OR_RAISE(auto row, unique_rows->GetScalar(i)); - out.expressions[i] = ConjunctionFromGroupingRow(row.get()); + // assemble partition expressions from the unique keys + out.expressions.resize(grouper->num_groups()); + for (uint32_t group = 0; group < grouper->num_groups(); ++group) { + std::vector exprs(num_keys); + + for (int i = 0; i < num_keys; ++i) { + ARROW_ASSIGN_OR_RAISE(auto val, unique_arrays[i]->GetScalar(group)); + const auto& name = batch->schema()->field(key_indices[i])->name(); + + exprs[i] = val->is_valid ? 
equal(field_ref(name), literal(std::move(val))) + : is_null(field_ref(name)); + } + out.expressions[group] = and_(std::move(exprs)); + } + + // remove key columns from batch to which we'll be applying the groupings + auto rest = batch; + std::sort(key_indices.begin(), key_indices.end(), std::greater()); + for (int i : key_indices) { + // indices are in descending order; indices larger than i (which would be invalidated + // here) have already been handled + ARROW_ASSIGN_OR_RAISE(rest, rest->RemoveColumn(i)); } + ARROW_ASSIGN_OR_RAISE(out.batches, ApplyGroupings(*groupings, rest)); + return out; } @@ -272,10 +306,11 @@ Result DirectoryPartitioning::FormatValues( return fs::internal::JoinAbstractPath(std::move(segments)); } +namespace { class KeyValuePartitioningFactory : public PartitioningFactory { protected: explicit KeyValuePartitioningFactory(PartitioningFactoryOptions options) - : options_(options) {} + : options_(std::move(options)) {} int GetOrInsertField(const std::string& name) { auto it_inserted = @@ -436,6 +471,8 @@ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory { std::vector field_names_; }; +} // namespace + std::shared_ptr DirectoryPartitioning::MakeFactory( std::vector field_names, PartitioningFactoryOptions options) { return std::shared_ptr( @@ -576,243 +613,5 @@ Result> PartitioningOrFactory::GetOrInferSchema( return factory()->Inspect(paths); } -// Transform an array of counts to offsets which will divide a ListArray -// into an equal number of slices with corresponding lengths. -inline Result> CountsToOffsets( - std::shared_ptr counts) { - TypedBufferBuilder offset_builder; - RETURN_NOT_OK(offset_builder.Resize(counts->length() + 1)); - - int32_t current_offset = 0; - offset_builder.UnsafeAppend(current_offset); - - for (int64_t i = 0; i < counts->length(); ++i) { - DCHECK_NE(counts->Value(i), 0); - current_offset += static_cast(counts->Value(i)); - offset_builder.UnsafeAppend(current_offset); - } - - std::shared_ptr offsets; - RETURN_NOT_OK(offset_builder.Finish(&offsets)); - return offsets; -} - -// Helper for simultaneous dictionary encoding of multiple arrays. -// -// The fused dictionary is the Cartesian product of the individual dictionaries. -// For example given two arrays A, B where A has unique values ["ex", "why"] -// and B has unique values [0, 1] the fused dictionary is the set of tuples -// [["ex", 0], ["ex", 1], ["why", 0], ["ex", 1]]. -// -// TODO(bkietz) this capability belongs in an Action of the hash kernels, where -// it can be used to group aggregates without materializing a grouped batch. -// For the purposes of writing we need the materialized grouped batch anyway -// since no Writers accept a selection vector. 
-class StructDictionary { - public: - struct Encoded { - std::shared_ptr indices; - std::shared_ptr dictionary; - }; - - static Result Encode(const ArrayVector& columns) { - Encoded out{nullptr, std::make_shared()}; - - for (const auto& column : columns) { - RETURN_NOT_OK(out.dictionary->AddOne(column, &out.indices)); - } - - return out; - } - - Result> Decode(std::shared_ptr fused_indices, - FieldVector fields) { - std::vector builders(dictionaries_.size()); - for (Int32Builder& b : builders) { - RETURN_NOT_OK(b.Resize(fused_indices->length())); - } - - std::vector codes(dictionaries_.size()); - for (int64_t i = 0; i < fused_indices->length(); ++i) { - Expand(fused_indices->Value(i), codes.data()); - - auto builder_it = builders.begin(); - for (int32_t index : codes) { - builder_it++->UnsafeAppend(index); - } - } - - ArrayVector columns(dictionaries_.size()); - for (size_t i = 0; i < dictionaries_.size(); ++i) { - std::shared_ptr indices; - RETURN_NOT_OK(builders[i].FinishInternal(&indices)); - - ARROW_ASSIGN_OR_RAISE(Datum column, compute::Take(dictionaries_[i], indices)); - - if (fields[i]->type()->id() == Type::DICTIONARY) { - RETURN_NOT_OK(RestoreDictionaryEncoding( - checked_pointer_cast(fields[i]->type()), &column)); - } - - columns[i] = column.make_array(); - } - - return StructArray::Make(std::move(columns), std::move(fields)); - } - - private: - Status AddOne(Datum column, std::shared_ptr* fused_indices) { - if (column.type()->id() == Type::DICTIONARY) { - if (column.null_count() != 0) { - // TODO(ARROW-11732) Optimize this by allowign DictionaryEncode to transfer a - // null-masked dictionary to a null-encoded dictionary. At the moment we decode - // and then encode causing one extra copy, and a potentially expansive decoding - // copy at that. - ARROW_ASSIGN_OR_RAISE( - auto decoded_dictionary, - compute::Cast( - column, - std::static_pointer_cast(column.type())->value_type(), - compute::CastOptions())); - column = decoded_dictionary; - } - } - if (column.type()->id() != Type::DICTIONARY) { - compute::DictionaryEncodeOptions options; - options.null_encoding_behavior = - compute::DictionaryEncodeOptions::NullEncodingBehavior::ENCODE; - ARROW_ASSIGN_OR_RAISE(column, - compute::DictionaryEncode(std::move(column), options)); - } - - auto dict_column = column.array_as(); - dictionaries_.push_back(dict_column->dictionary()); - ARROW_ASSIGN_OR_RAISE(auto indices, compute::Cast(*dict_column->indices(), int32())); - - if (*fused_indices == nullptr) { - *fused_indices = checked_pointer_cast(std::move(indices)); - return IncreaseSize(); - } - - // It's useful to think about the case where each of dictionaries_ has size 10. - // In this case the decimal digit in the ones place is the code in dictionaries_[0], - // the tens place corresponds to the code in dictionaries_[1], etc. - // The incumbent indices must be shifted to the hundreds place so as not to collide. 
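// Worked example of the fused encoding: with two columns whose dictionaries have
// sizes 2 and 3, codes (a, b) fuse to a + 2 * b; Expand() below recovers
// a = fused % 2 and then b = (fused / 2) % 3.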
- ARROW_ASSIGN_OR_RAISE(Datum new_fused_indices, - compute::Multiply(indices, MakeScalar(size_))); - - ARROW_ASSIGN_OR_RAISE(new_fused_indices, - compute::Add(new_fused_indices, *fused_indices)); - - *fused_indices = checked_pointer_cast(new_fused_indices.make_array()); - return IncreaseSize(); - } - - // expand a fused code into component dict codes, order is in order of addition - void Expand(int32_t fused_code, int32_t* codes) { - for (size_t i = 0; i < dictionaries_.size(); ++i) { - auto dictionary_size = static_cast(dictionaries_[i]->length()); - codes[i] = fused_code % dictionary_size; - fused_code /= dictionary_size; - } - } - - Status RestoreDictionaryEncoding(std::shared_ptr expected_type, - Datum* column) { - DCHECK_NE(column->type()->id(), Type::DICTIONARY); - ARROW_ASSIGN_OR_RAISE(*column, compute::DictionaryEncode(std::move(*column))); - - if (expected_type->index_type()->id() == Type::INT32) { - // dictionary_encode has already yielded the expected index_type - return Status::OK(); - } - - // cast the indices to the expected index type - auto dictionary = std::move(column->mutable_array()->dictionary); - column->mutable_array()->type = int32(); - - ARROW_ASSIGN_OR_RAISE(*column, - compute::Cast(std::move(*column), expected_type->index_type())); - - column->mutable_array()->dictionary = std::move(dictionary); - column->mutable_array()->type = expected_type; - return Status::OK(); - } - - Status IncreaseSize() { - auto factor = static_cast(dictionaries_.back()->length()); - - if (internal::MultiplyWithOverflow(size_, factor, &size_)) { - return Status::CapacityError("Max groups exceeded"); - } - return Status::OK(); - } - - int32_t size_ = 1; - ArrayVector dictionaries_; -}; - -Result> MakeGroupings(const StructArray& by) { - if (by.num_fields() == 0) { - return Status::Invalid("Grouping with no criteria"); - } - - if (by.null_count() != 0) { - return Status::Invalid("Grouping with null criteria"); - } - - ARROW_ASSIGN_OR_RAISE(auto fused, StructDictionary::Encode(by.fields())); - - ARROW_ASSIGN_OR_RAISE(auto sort_indices, compute::SortIndices(*fused.indices)); - ARROW_ASSIGN_OR_RAISE(Datum sorted, compute::Take(fused.indices, *sort_indices)); - fused.indices = checked_pointer_cast(sorted.make_array()); - - ARROW_ASSIGN_OR_RAISE(auto fused_counts_and_values, - compute::ValueCounts(fused.indices)); - fused.indices.reset(); - - auto unique_fused_indices = - checked_pointer_cast(fused_counts_and_values->GetFieldByName("values")); - ARROW_ASSIGN_OR_RAISE( - auto unique_rows, - fused.dictionary->Decode(std::move(unique_fused_indices), by.type()->fields())); - - auto counts = - checked_pointer_cast(fused_counts_and_values->GetFieldByName("counts")); - ARROW_ASSIGN_OR_RAISE(auto offsets, CountsToOffsets(std::move(counts))); - - auto grouped_sort_indices = - std::make_shared(list(sort_indices->type()), unique_rows->length(), - std::move(offsets), std::move(sort_indices)); - - return StructArray::Make( - ArrayVector{std::move(unique_rows), std::move(grouped_sort_indices)}, - std::vector{"values", "groupings"}); -} - -Result> ApplyGroupings(const ListArray& groupings, - const Array& array) { - ARROW_ASSIGN_OR_RAISE(Datum sorted, - compute::Take(array, groupings.data()->child_data[0])); - - return std::make_shared(list(array.type()), groupings.length(), - groupings.value_offsets(), sorted.make_array()); -} - -Result ApplyGroupings(const ListArray& groupings, - const std::shared_ptr& batch) { - ARROW_ASSIGN_OR_RAISE(Datum sorted, - compute::Take(batch, groupings.data()->child_data[0])); - 
- const auto& sorted_batch = *sorted.record_batch(); - - RecordBatchVector out(static_cast(groupings.length())); - for (size_t i = 0; i < out.size(); ++i) { - out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i)); - } - - return out; -} - } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index c49ac5e923e06..74e6c60710667 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -200,7 +200,7 @@ class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning { explicit HivePartitioning(std::shared_ptr schema, ArrayVector dictionaries = {}, std::string null_fallback = kDefaultHiveNullFallback) : KeyValuePartitioning(std::move(schema), std::move(dictionaries)), - null_fallback_(null_fallback) {} + null_fallback_(std::move(null_fallback)) {} std::string type_name() const override { return "hive"; } std::string null_fallback() const { return null_fallback_; } @@ -299,64 +299,5 @@ class ARROW_DS_EXPORT PartitioningOrFactory { std::shared_ptr partitioning_; }; -/// \brief Assemble lists of indices of identical rows. -/// -/// \param[in] by A StructArray whose columns will be used as grouping criteria. -/// Top level nulls are invalid, as are empty criteria (no grouping -/// columns). -/// \return A array of type `struct>`, -/// which is a mapping from unique rows (field "values") to lists of -/// indices into `by` where that row appears (field "groupings"). -/// -/// For example, -/// MakeGroupings([ -/// {"a": "ex", "b": 0}, -/// {"a": "ex", "b": 0}, -/// {"a": "why", "b": 0}, -/// {"a": "why", "b": 0}, -/// {"a": "ex", "b": 0}, -/// {"a": "why", "b": 1} -/// ]) == [ -/// {"values": {"a": "ex", "b": 0}, "groupings": [0, 1, 4]}, -/// {"values": {"a": "why", "b": 0}, "groupings": [2, 3]}, -/// {"values": {"a": "why", "b": 1}, "groupings": [5]} -/// ] -ARROW_DS_EXPORT -Result> MakeGroupings(const StructArray& by); - -/// \brief Produce a ListArray whose slots are selections of `array` which correspond to -/// the provided groupings. -/// -/// For example, -/// ApplyGroupings([[0, 1, 4], [2, 3], [5]], [ -/// {"a": "ex", "b": 0, "passenger": 0}, -/// {"a": "ex", "b": 0, "passenger": 1}, -/// {"a": "why", "b": 0, "passenger": 2}, -/// {"a": "why", "b": 0, "passenger": 3}, -/// {"a": "ex", "b": 0, "passenger": 4}, -/// {"a": "why", "b": 1, "passenger": 5} -/// ]) == [ -/// [ -/// {"a": "ex", "b": 0, "passenger": 0}, -/// {"a": "ex", "b": 0, "passenger": 1}, -/// {"a": "ex", "b": 0, "passenger": 4}, -/// ], -/// [ -/// {"a": "why", "b": 0, "passenger": 2}, -/// {"a": "why", "b": 0, "passenger": 3}, -/// ], -/// [ -/// {"a": "why", "b": 1, "passenger": 5} -/// ] -/// ] -ARROW_DS_EXPORT -Result> ApplyGroupings(const ListArray& groupings, - const Array& array); - -/// \brief Produce selections of a RecordBatch which correspond to the provided groupings. 
-ARROW_DS_EXPORT -Result ApplyGroupings(const ListArray& groupings, - const std::shared_ptr& batch); - } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index 456b285231186..06c3cc676742a 100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -138,9 +138,13 @@ class TestPartitioning : public ::testing::Test { }; TEST_F(TestPartitioning, Partition) { + auto dataset_schema = + schema({field("a", int32()), field("b", utf8()), field("c", uint32())}); + auto partition_schema = schema({field("a", int32()), field("b", utf8())}); - auto schema_ = schema({field("a", int32()), field("b", utf8()), field("c", uint32())}); - auto remaining_schema = schema({field("c", uint32())}); + + auto physical_schema = schema({field("c", uint32())}); + auto partitioning = std::make_shared(partition_schema); std::string json = R"([{"a": 3, "b": "x", "c": 0}, {"a": 3, "b": "x", "c": 1}, @@ -149,15 +153,22 @@ TEST_F(TestPartitioning, Partition) { {"a": null, "b": "z", "c": 4}, {"a": null, "b": null, "c": 5} ])"; - std::vector expected_batches = {R"([{"c": 0}, {"c": 1}])", R"([{"c": 2}])", - R"([{"c": 3}, {"c": 5}])", - R"([{"c": 4}])"}; + + std::vector expected_batches = { + R"([{"c": 0}, {"c": 1}])", + R"([{"c": 2}])", + R"([{"c": 3}, {"c": 5}])", + R"([{"c": 4}])", + }; + std::vector expected_expressions = { and_(equal(field_ref("a"), literal(3)), equal(field_ref("b"), literal("x"))), and_(equal(field_ref("a"), literal(1)), is_null(field_ref("b"))), and_(is_null(field_ref("a")), is_null(field_ref("b"))), - and_(is_null(field_ref("a")), equal(field_ref("b"), literal("z")))}; - AssertPartition(partitioning, schema_, json, remaining_schema, expected_batches, + and_(is_null(field_ref("a")), equal(field_ref("b"), literal("z"))), + }; + + AssertPartition(partitioning, dataset_schema, json, physical_schema, expected_batches, expected_expressions); } @@ -713,132 +724,5 @@ TEST(TestStripPrefixAndFilename, Basic) { "year=2019/month=12/day=01")); } -void AssertGrouping(const FieldVector& by_fields, const std::string& batch_json, - const std::string& expected_json) { - FieldVector fields_with_ids = by_fields; - fields_with_ids.push_back(field("ids", list(int32()))); - auto expected = ArrayFromJSON(struct_(fields_with_ids), expected_json); - - FieldVector fields_with_id = by_fields; - fields_with_id.push_back(field("id", int32())); - auto batch = RecordBatchFromJSON(schema(fields_with_id), batch_json); - - ASSERT_OK_AND_ASSIGN(auto by, batch->RemoveColumn(batch->num_columns() - 1) - .Map([](std::shared_ptr by) { - return by->ToStructArray(); - })); - - ASSERT_OK_AND_ASSIGN(auto groupings_and_values, MakeGroupings(*by)); - ASSERT_OK(groupings_and_values->ValidateFull()); - - auto groupings = - checked_pointer_cast(groupings_and_values->GetFieldByName("groupings")); - - ASSERT_OK_AND_ASSIGN(std::shared_ptr grouped_ids, - ApplyGroupings(*groupings, *batch->GetColumnByName("id"))); - ASSERT_OK(grouped_ids->ValidateFull()); - - ArrayVector columns = - checked_cast(*groupings_and_values->GetFieldByName("values")) - .fields(); - columns.push_back(grouped_ids); - - ASSERT_OK_AND_ASSIGN(auto actual, StructArray::Make(columns, fields_with_ids)); - ASSERT_OK(actual->ValidateFull()); - - AssertArraysEqual(*expected, *actual, /*verbose=*/true); -} - -TEST(GroupTest, Basics) { - AssertGrouping({field("a", utf8()), field("b", int32())}, R"([ - {"a": "ex", "b": 0, "id": 0}, - {"a": "ex", "b": 0, "id": 1}, - 
{"a": "why", "b": 0, "id": 2}, - {"a": "ex", "b": 1, "id": 3}, - {"a": "why", "b": 0, "id": 4}, - {"a": "ex", "b": 1, "id": 5}, - {"a": "ex", "b": 0, "id": 6}, - {"a": "why", "b": 1, "id": 7} - ])", - R"([ - {"a": "ex", "b": 0, "ids": [0, 1, 6]}, - {"a": "why", "b": 0, "ids": [2, 4]}, - {"a": "ex", "b": 1, "ids": [3, 5]}, - {"a": "why", "b": 1, "ids": [7]} - ])"); -} - -TEST(GroupTest, WithNulls) { - AssertGrouping({field("a", utf8()), field("b", int32())}, - R"([ - {"a": "ex", "b": 0, "id": 0}, - {"a": null, "b": 0, "id": 1}, - {"a": null, "b": 0, "id": 2}, - {"a": "ex", "b": 1, "id": 3}, - {"a": null, "b": null, "id": 4}, - {"a": "ex", "b": 1, "id": 5}, - {"a": "ex", "b": 0, "id": 6}, - {"a": "why", "b": null, "id": 7} - ])", - R"([ - {"a": "ex", "b": 0, "ids": [0, 6]}, - {"a": null, "b": 0, "ids": [1, 2]}, - {"a": "ex", "b": 1, "ids": [3, 5]}, - {"a": null, "b": null, "ids": [4]}, - {"a": "why", "b": null, "ids": [7]} - ])"); - - AssertGrouping({field("a", dictionary(int32(), utf8())), field("b", int32())}, - R"([ - {"a": "ex", "b": 0, "id": 0}, - {"a": null, "b": 0, "id": 1}, - {"a": null, "b": 0, "id": 2}, - {"a": "ex", "b": 1, "id": 3}, - {"a": null, "b": null, "id": 4}, - {"a": "ex", "b": 1, "id": 5}, - {"a": "ex", "b": 0, "id": 6}, - {"a": "why", "b": null, "id": 7} - ])", - R"([ - {"a": "ex", "b": 0, "ids": [0, 6]}, - {"a": null, "b": 0, "ids": [1, 2]}, - {"a": "ex", "b": 1, "ids": [3, 5]}, - {"a": null, "b": null, "ids": [4]}, - {"a": "why", "b": null, "ids": [7]} - ])"); - - auto has_nulls = checked_pointer_cast( - ArrayFromJSON(struct_({field("a", utf8()), field("b", int32())}), R"([ - {"a": "ex", "b": 0}, - null, - {"a": "why", "b": 0}, - {"a": "ex", "b": 1}, - {"a": "why", "b": 0}, - {"a": "ex", "b": 1}, - {"a": "ex", "b": 0}, - null - ])")); - ASSERT_RAISES(Invalid, MakeGroupings(*has_nulls)); -} - -TEST(GroupTest, GroupOnDictionary) { - AssertGrouping({field("a", dictionary(int32(), utf8())), field("b", int32())}, R"([ - {"a": "ex", "b": 0, "id": 0}, - {"a": "ex", "b": 0, "id": 1}, - {"a": "why", "b": 0, "id": 2}, - {"a": "ex", "b": 1, "id": 3}, - {"a": "why", "b": 0, "id": 4}, - {"a": "ex", "b": 1, "id": 5}, - {"a": "ex", "b": 0, "id": 6}, - {"a": "why", "b": 1, "id": 7} - ])", - R"([ - {"a": "ex", "b": 0, "ids": [0, 1, 6]}, - {"a": "why", "b": 0, "ids": [2, 4]}, - {"a": "ex", "b": 1, "ids": [3, 5]}, - {"a": "why", "b": 1, "ids": [7]} - ])"); -} - } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index 95229eb78d560..4650e8063608a 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -87,7 +87,7 @@ class ARROW_EXPORT RecordBatch { // \return the table's schema /// \return true if batches are equal - std::shared_ptr schema() const { return schema_; } + const std::shared_ptr& schema() const { return schema_; } /// \brief Retrieve all columns at once std::vector> columns() const; diff --git a/cpp/src/arrow/scalar.cc b/cpp/src/arrow/scalar.cc index 399eac675f4df..56a36114e49f7 100644 --- a/cpp/src/arrow/scalar.cc +++ b/cpp/src/arrow/scalar.cc @@ -145,7 +145,7 @@ struct ScalarHashImpl { size_t hash_; }; -size_t Scalar::Hash::hash(const Scalar& scalar) { return ScalarHashImpl(scalar).hash_; } +size_t Scalar::hash() const { return ScalarHashImpl(*this).hash_; } StringScalar::StringScalar(std::string s) : StringScalar(Buffer::FromString(std::move(s))) {} diff --git a/cpp/src/arrow/scalar.h b/cpp/src/arrow/scalar.h index e84e3fab90061..2474485968633 100644 --- a/cpp/src/arrow/scalar.h +++ 
b/cpp/src/arrow/scalar.h @@ -69,15 +69,15 @@ struct ARROW_EXPORT Scalar : public util::EqualityComparable { const EqualOptions& options = EqualOptions::Defaults()) const; struct ARROW_EXPORT Hash { - size_t operator()(const Scalar& scalar) const { return hash(scalar); } + size_t operator()(const Scalar& scalar) const { return scalar.hash(); } size_t operator()(const std::shared_ptr& scalar) const { - return hash(*scalar); + return scalar->hash(); } - - static size_t hash(const Scalar& scalar); }; + size_t hash() const; + std::string ToString() const; static Result> Parse(const std::shared_ptr& type, diff --git a/cpp/src/arrow/testing/generator.cc b/cpp/src/arrow/testing/generator.cc index 41c1f752160f3..71fad394d0014 100644 --- a/cpp/src/arrow/testing/generator.cc +++ b/cpp/src/arrow/testing/generator.cc @@ -95,4 +95,88 @@ std::shared_ptr ConstantArrayGenerator::String(int64_t size, return ConstantArray(size, value); } +struct ScalarVectorToArrayImpl { + template ::BuilderType, + typename ScalarType = typename TypeTraits::ScalarType> + Status UseBuilder(const AppendScalar& append) { + BuilderType builder(type_, default_memory_pool()); + for (const auto& s : scalars_) { + if (s->is_valid) { + RETURN_NOT_OK(append(internal::checked_cast(*s), &builder)); + } else { + RETURN_NOT_OK(builder.AppendNull()); + } + } + return builder.FinishInternal(&data_); + } + + struct AppendValue { + template + Status operator()(const ScalarType& s, BuilderType* builder) const { + return builder->Append(s.value); + } + }; + + struct AppendBuffer { + template + Status operator()(const ScalarType& s, BuilderType* builder) const { + const Buffer& buffer = *s.value; + return builder->Append(util::string_view{buffer}); + } + }; + + template + enable_if_primitive_ctype Visit(const T&) { + return UseBuilder(AppendValue{}); + } + + template + enable_if_has_string_view Visit(const T&) { + return UseBuilder(AppendBuffer{}); + } + + Status Visit(const StructType& type) { + data_ = ArrayData::Make(type_, static_cast(scalars_.size()), + {/*null_bitmap=*/nullptr}); + data_->child_data.resize(type_->num_fields()); + + ScalarVector field_scalars(scalars_.size()); + + for (int field_index = 0; field_index < type.num_fields(); ++field_index) { + for (size_t i = 0; i < scalars_.size(); ++i) { + field_scalars[i] = + internal::checked_cast(scalars_[i].get())->value[field_index]; + } + + ARROW_ASSIGN_OR_RAISE(data_->child_data[field_index], + ScalarVectorToArrayImpl{}.Convert(field_scalars)); + } + return Status::OK(); + } + + Status Visit(const DataType& type) { + return Status::NotImplemented("ScalarVectorToArray for type ", type); + } + + Result> Convert(const ScalarVector& scalars) && { + if (scalars.size() == 0) { + return Status::NotImplemented("ScalarVectorToArray with no scalars"); + } + scalars_ = std::move(scalars); + type_ = scalars_[0]->type; + RETURN_NOT_OK(VisitTypeInline(*type_, this)); + return std::move(data_); + } + + std::shared_ptr type_; + ScalarVector scalars_; + std::shared_ptr data_; +}; + +Result> ScalarVectorToArray(const ScalarVector& scalars) { + ARROW_ASSIGN_OR_RAISE(auto data, ScalarVectorToArrayImpl{}.Convert(scalars)); + return MakeArray(std::move(data)); +} + } // namespace arrow diff --git a/cpp/src/arrow/testing/generator.h b/cpp/src/arrow/testing/generator.h index 9188dca570999..c300022432a94 100644 --- a/cpp/src/arrow/testing/generator.h +++ b/cpp/src/arrow/testing/generator.h @@ -255,4 +255,7 @@ class ARROW_TESTING_EXPORT ConstantArrayGenerator { } }; +ARROW_TESTING_EXPORT +Result> 
ScalarVectorToArray(const ScalarVector& scalars); + } // namespace arrow diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index 462a5237921f2..1e366539cbec7 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -131,20 +131,21 @@ void AssertArraysEqualWith(const Array& expected, const Array& actual, bool verb } } -void AssertArraysEqual(const Array& expected, const Array& actual, bool verbose) { +void AssertArraysEqual(const Array& expected, const Array& actual, bool verbose, + const EqualOptions& options) { return AssertArraysEqualWith( expected, actual, verbose, - [](const Array& expected, const Array& actual, std::stringstream* diff) { - return expected.Equals(actual, EqualOptions().diff_sink(diff)); + [&](const Array& expected, const Array& actual, std::stringstream* diff) { + return expected.Equals(actual, options.diff_sink(diff)); }); } void AssertArraysApproxEqual(const Array& expected, const Array& actual, bool verbose, - const EqualOptions& option) { + const EqualOptions& options) { return AssertArraysEqualWith( expected, actual, verbose, - [&option](const Array& expected, const Array& actual, std::stringstream* diff) { - return expected.ApproxEquals(actual, option.diff_sink(diff)); + [&](const Array& expected, const Array& actual, std::stringstream* diff) { + return expected.ApproxEquals(actual, options.diff_sink(diff)); }); } diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index 744af0e0f751f..c5b1fece078b8 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -173,10 +173,12 @@ std::vector AllTypeIds(); // If verbose is true, then the arrays will be pretty printed ARROW_TESTING_EXPORT void AssertArraysEqual(const Array& expected, const Array& actual, - bool verbose = false); -ARROW_TESTING_EXPORT void AssertArraysApproxEqual( - const Array& expected, const Array& actual, bool verbose = false, - const EqualOptions& option = EqualOptions::Defaults()); + bool verbose = false, + const EqualOptions& options = {}); +ARROW_TESTING_EXPORT void AssertArraysApproxEqual(const Array& expected, + const Array& actual, + bool verbose = false, + const EqualOptions& options = {}); // Returns true when values are both null ARROW_TESTING_EXPORT void AssertScalarsEqual( const Scalar& expected, const Scalar& actual, bool verbose = false, diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index 00016330075ad..8b4f2219989f0 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -86,20 +86,20 @@ Many compute functions are also available directly as concrete APIs, here Some functions accept or require an options structure that determines the exact semantics of the function:: - MinMaxOptions options; - options.null_handling = MinMaxOptions::EMIT_NULL; + MinMaxOptions min_max_options; + min_max_options.null_handling = MinMaxOptions::EMIT_NULL; std::shared_ptr array = ...; - arrow::Datum min_max_datum; + arrow::Datum min_max; - ARROW_ASSIGN_OR_RAISE(min_max_datum, - arrow::compute::CallFunction("min_max", {array}, &options)); + ARROW_ASSIGN_OR_RAISE(min_max, + arrow::compute::CallFunction("min_max", {array}, + &min_max_options)); // Unpack struct scalar result (a two-field {"min", "max"} scalar) - const auto& min_max_scalar = \ - static_cast(*min_max_datum.scalar()); - const auto min_value = min_max_scalar.value[0]; - const auto max_value = min_max_scalar.value[1]; + std::shared_ptr min_value, max_value; + 
min_value = min_max.scalar_as().value[0]; + max_value = min_max.scalar_as().value[1]; .. seealso:: :doc:`Compute API reference ` diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index 3cb152aa381eb..f3a8eb860d42b 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -58,6 +58,17 @@ cdef wrap_scalar_aggregate_function(const shared_ptr[CFunction]& sp_func): return func +cdef wrap_hash_aggregate_function(const shared_ptr[CFunction]& sp_func): + """ + Wrap a C++ aggregate Function in a HashAggregateFunction object. + """ + cdef HashAggregateFunction func = ( + HashAggregateFunction.__new__(HashAggregateFunction) + ) + func.init(sp_func) + return func + + cdef wrap_meta_function(const shared_ptr[CFunction]& sp_func): """ Wrap a C++ meta Function in a MetaFunction object. @@ -85,6 +96,8 @@ cdef wrap_function(const shared_ptr[CFunction]& sp_func): return wrap_vector_function(sp_func) elif c_kind == FunctionKind_SCALAR_AGGREGATE: return wrap_scalar_aggregate_function(sp_func) + elif c_kind == FunctionKind_HASH_AGGREGATE: + return wrap_hash_aggregate_function(sp_func) elif c_kind == FunctionKind_META: return wrap_meta_function(sp_func) else: @@ -117,6 +130,16 @@ cdef wrap_scalar_aggregate_kernel(const CScalarAggregateKernel* c_kernel): return kernel +cdef wrap_hash_aggregate_kernel(const CHashAggregateKernel* c_kernel): + if c_kernel == NULL: + raise ValueError('Kernel was NULL') + cdef HashAggregateKernel kernel = ( + HashAggregateKernel.__new__(HashAggregateKernel) + ) + kernel.init(c_kernel) + return kernel + + cdef class Kernel(_Weakrefable): """ A kernel object. @@ -165,6 +188,18 @@ cdef class ScalarAggregateKernel(Kernel): .format(frombytes(self.kernel.signature.get().ToString()))) +cdef class HashAggregateKernel(Kernel): + cdef: + const CHashAggregateKernel* kernel + + cdef void init(self, const CHashAggregateKernel* kernel) except *: + self.kernel = kernel + + def __repr__(self): + return ("HashAggregateKernel<{}>" + .format(frombytes(self.kernel.signature.get().ToString()))) + + FunctionDoc = namedtuple( "FunctionDoc", ("summary", "description", "arg_names", "options_class")) @@ -190,8 +225,12 @@ cdef class Function(_Weakrefable): in each input. Examples: dictionary encoding, sorting, extracting unique values... - * "aggregate" functions reduce the dimensionality of the inputs by - applying a reduction function. Examples: sum, minmax, mode... + * "scalar_aggregate" functions reduce the dimensionality of the inputs by + applying a reduction function. Examples: sum, min_max, mode... + + * "hash_aggregate" functions apply a reduction function to an input + subdivided by grouping criteria. They may not be directly called. + Examples: hash_sum, hash_min_max... * "meta" functions dispatch to other functions. """ @@ -249,6 +288,8 @@ cdef class Function(_Weakrefable): return 'vector' elif c_kind == FunctionKind_SCALAR_AGGREGATE: return 'scalar_aggregate' + elif c_kind == FunctionKind_HASH_AGGREGATE: + return 'hash_aggregate' elif c_kind == FunctionKind_META: return 'meta' else: @@ -351,6 +392,25 @@ cdef class ScalarAggregateFunction(Function): return [wrap_scalar_aggregate_kernel(k) for k in kernels] +cdef class HashAggregateFunction(Function): + cdef: + const CHashAggregateFunction* func + + cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: + Function.init(self, sp_func) + self.func = sp_func.get() + + @property + def kernels(self): + """ + The kernels implementing this function. 
+ """ + cdef vector[const CHashAggregateKernel*] kernels = ( + self.func.kernels() + ) + return [wrap_hash_aggregate_kernel(k) for k in kernels] + + cdef class MetaFunction(Function): cdef: const CMetaFunction* func diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index 3d7f5ecb4c3d1..2cdd843d81a68 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -19,6 +19,8 @@ Function, FunctionOptions, FunctionRegistry, + HashAggregateFunction, + HashAggregateKernel, Kernel, ScalarAggregateFunction, ScalarAggregateKernel, diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 9afe4d1e720d1..61deb658b0ccd 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1729,6 +1729,10 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: " arrow::compute::ScalarAggregateKernel"(CKernel): pass + cdef cppclass CHashAggregateKernel \ + " arrow::compute::HashAggregateKernel"(CKernel): + pass + cdef cppclass CArity" arrow::compute::Arity": int num_args c_bool is_varargs @@ -1738,6 +1742,8 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: FunctionKind_VECTOR" arrow::compute::Function::VECTOR" FunctionKind_SCALAR_AGGREGATE \ " arrow::compute::Function::SCALAR_AGGREGATE" + FunctionKind_HASH_AGGREGATE \ + " arrow::compute::Function::HASH_AGGREGATE" FunctionKind_META \ " arrow::compute::Function::META" @@ -1771,6 +1777,11 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: (CFunction): vector[const CScalarAggregateKernel*] kernels() const + cdef cppclass CHashAggregateFunction\ + " arrow::compute::HashAggregateFunction"\ + (CFunction): + vector[const CHashAggregateKernel*] kernels() const + cdef cppclass CMetaFunction" arrow::compute::MetaFunction"(CFunction): pass diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index 673c1387c4749..112629fc702bd 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -129,11 +129,16 @@ def test_get_function_vector(): _check_get_function("unique", pc.VectorFunction, pc.VectorKernel, 8) -def test_get_function_aggregate(): +def test_get_function_scalar_aggregate(): _check_get_function("mean", pc.ScalarAggregateFunction, pc.ScalarAggregateKernel, 8) +def test_get_function_hash_aggregate(): + _check_get_function("hash_sum", pc.HashAggregateFunction, + pc.HashAggregateKernel, 1) + + def test_call_function_with_memory_pool(): arr = pa.array(["foo", "bar", "baz"]) indices = np.array([2, 2, 1]) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 2c13537f0c0da..86ee1303d1c3c 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -284,6 +284,10 @@ compute__CallFunction <- function(func_name, args, options){ .Call(`_arrow_compute__CallFunction`, func_name, args, options) } +compute__GroupBy <- function(arguments, keys, options){ + .Call(`_arrow_compute__GroupBy`, arguments, keys, options) +} + list_compute_functions <- function(){ .Call(`_arrow_list_compute_functions`) } diff --git a/r/R/dplyr.R b/r/R/dplyr.R index e6fdba4d4d4fc..2745f69d90ca4 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -470,21 +470,65 @@ pull.arrow_dplyr_query <- function(.data, var = -1) { pull.Dataset <- pull.ArrowTabular <- pull.arrow_dplyr_query summarise.arrow_dplyr_query <- function(.data, ...) 
diff --git a/r/R/dplyr.R b/r/R/dplyr.R
index e6fdba4d4d4fc..2745f69d90ca4 100644
--- a/r/R/dplyr.R
+++ b/r/R/dplyr.R
@@ -470,21 +470,65 @@ pull.arrow_dplyr_query <- function(.data, var = -1) {
 pull.Dataset <- pull.ArrowTabular <- pull.arrow_dplyr_query

 summarise.arrow_dplyr_query <- function(.data, ...) {
+  call <- match.call()
   .data <- arrow_dplyr_query(.data)
   if (query_on_dataset(.data)) {
     not_implemented_for_dataset("summarize()")
   }
+  exprs <- quos(...)
   # Only retain the columns we need to do our aggregations
   vars_to_keep <- unique(c(
-    unlist(lapply(quos(...), all.vars)), # vars referenced in summarise
+    unlist(lapply(exprs, all.vars)), # vars referenced in summarise
     dplyr::group_vars(.data)             # vars needed for grouping
   ))
   .data <- dplyr::select(.data, vars_to_keep)
-  # TODO: determine whether work can be pushed down to Arrow
-  dplyr::summarise(dplyr::collect(.data), ...)
+  if (isTRUE(getOption("arrow.summarize", FALSE))) {
+    # Try stuff, if successful return()
+    out <- try(do_arrow_group_by(.data, ...), silent = TRUE)
+    if (inherits(out, "try-error")) {
+      return(abandon_ship(call, .data, format(out)))
+    } else {
+      return(out)
+    }
+  } else {
+    # If unsuccessful or if option not set, do the work in R
+    dplyr::summarise(dplyr::collect(.data), ...)
+  }
 }
 summarise.Dataset <- summarise.ArrowTabular <- summarise.arrow_dplyr_query

+do_arrow_group_by <- function(.data, ...) {
+  exprs <- quos(...)
+  mask <- arrow_mask(.data)
+  # Add aggregation wrappers to arrow_mask somehow
+  # (this is not ideal, would overwrite same-named objects)
+  mask$sum <- function(x, na.rm = FALSE) {
+    list(
+      fun = "sum",
+      data = x,
+      options = list(na.rm = na.rm)
+    )
+  }
+  results <- list()
+  for (i in seq_along(exprs)) {
+    # Iterate over the indices and not the names because names may be repeated
+    # (which overwrites the previous name)
+    new_var <- names(exprs)[i]
+    results[[new_var]] <- arrow_eval(exprs[[i]], mask)
+    if (inherits(results[[new_var]], "try-error")) {
+      msg <- paste('Expression', as_label(exprs[[i]]), 'not supported in Arrow')
+      stop(msg, call. = FALSE)
+    }
+    # Put it in the data mask too?
+    #mask[[new_var]] <- mask$.data[[new_var]] <- results[[new_var]]
+  }
+  # Now, from that, split out the array (expressions) and options
+  opts <- lapply(results, function(x) x[c("fun", "options")])
+  inputs <- lapply(results, function(x) eval_array_expression(x$data, .data$.data))
+  grouping_vars <- lapply(.data$group_by_vars, function(x) eval_array_expression(.data$selected_columns[[x]], .data$.data))
+  compute__GroupBy(inputs, grouping_vars, opts)
+}
+
 group_by.arrow_dplyr_query <- function(.data,
                                        ...,
                                        .add = FALSE,
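
The `do_arrow_group_by()` helper above only assembles function names, options, and evaluated arrays; the actual grouping is done by `arrow::compute::internal::GroupBy`, which the `compute__GroupBy` binding below calls. A rough C++ sketch of that call, assuming the signature used by the binding (the `SumByKey` wrapper name and the null options pointer are illustrative, not part of this patch):

```cpp
#include <memory>
#include <vector>

#include "arrow/compute/api.h"

// Group `values` by `keys` and sum each group, mirroring what the R binding
// below does for a single sum() aggregate.
arrow::Result<arrow::Datum> SumByKey(std::shared_ptr<arrow::Array> values,
                                     std::shared_ptr<arrow::Array> keys) {
  namespace internal = arrow::compute::internal;
  // One Aggregate per output column: function name plus (optional) options;
  // options are left null here for brevity.
  std::vector<internal::Aggregate> aggregates = {{"hash_sum", nullptr}};
  // The result Datum pairs the aggregated column(s) with the distinct keys.
  return internal::GroupBy({arrow::Datum(std::move(values))},
                           {arrow::Datum(std::move(keys))}, aggregates);
}
```
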
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 697451d0dd9b8..35dab553fbdf9 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -616,6 +616,16 @@ BEGIN_CPP11
 END_CPP11
 }
 // compute.cpp
+SEXP compute__GroupBy(cpp11::list arguments, cpp11::list keys, cpp11::list options);
+extern "C" SEXP _arrow_compute__GroupBy(SEXP arguments_sexp, SEXP keys_sexp, SEXP options_sexp){
+BEGIN_CPP11
+  arrow::r::Input<cpp11::list>::type arguments(arguments_sexp);
+  arrow::r::Input<cpp11::list>::type keys(keys_sexp);
+  arrow::r::Input<cpp11::list>::type options(options_sexp);
+  return cpp11::as_sexp(compute__GroupBy(arguments, keys, options));
+END_CPP11
+}
+// compute.cpp
 std::vector<std::string> list_compute_functions();
 extern "C" SEXP _arrow_list_compute_functions(){
 BEGIN_CPP11
@@ -4236,6 +4246,7 @@ static const R_CallMethodDef CallEntries[] = {
     { "_arrow_RecordBatch__cast", (DL_FUNC) &_arrow_RecordBatch__cast, 3},
     { "_arrow_Table__cast", (DL_FUNC) &_arrow_Table__cast, 3},
     { "_arrow_compute__CallFunction", (DL_FUNC) &_arrow_compute__CallFunction, 3},
+    { "_arrow_compute__GroupBy", (DL_FUNC) &_arrow_compute__GroupBy, 3},
    { "_arrow_list_compute_functions", (DL_FUNC) &_arrow_list_compute_functions, 0},
    { "_arrow_csv___ReadOptions__initialize", (DL_FUNC) &_arrow_csv___ReadOptions__initialize, 1},
    { "_arrow_csv___ParseOptions__initialize", (DL_FUNC) &_arrow_csv___ParseOptions__initialize, 1},
diff --git a/r/src/compute.cpp b/r/src/compute.cpp
index 7bcded78f0d8c..07380354b123f 100644
--- a/r/src/compute.cpp
+++ b/r/src/compute.cpp
@@ -199,6 +199,29 @@ SEXP compute__CallFunction(std::string func_name, cpp11::list args, cpp11::list
   return from_datum(std::move(out));
 }

+// [[arrow::export]]
+SEXP compute__GroupBy(cpp11::list arguments, cpp11::list keys, cpp11::list options) {
+  // options is a list of pairs: string function name, list of options
+
+  std::vector<std::shared_ptr<arrow::compute::FunctionOptions>> keep_alives;
+  std::vector<arrow::compute::internal::Aggregate> aggregates;
+
+  for (cpp11::list name_opts : options) {
+    auto name = cpp11::as_cpp<std::string>(name_opts[0]);
+    auto opts = make_compute_options(name, name_opts[1]);
+
+    aggregates.push_back(
+        arrow::compute::internal::Aggregate{std::move(name), opts.get()});
+    keep_alives.push_back(std::move(opts));
+  }
+
+  auto datum_arguments = arrow::r::from_r_list<arrow::Datum>(arguments);
+  auto datum_keys = arrow::r::from_r_list<arrow::Datum>(keys);
+  auto out = ValueOrStop(arrow::compute::internal::GroupBy(datum_arguments, datum_keys,
+                                                           aggregates, gc_context()));
+  return from_datum(std::move(out));
+}
+
 // [[arrow::export]]
 std::vector<std::string> list_compute_functions() {
   return arrow::compute::GetFunctionRegistry()->GetFunctionNames();