Skip to content

Commit

Permalink
distinct_count_estimator aka HyperLogLog++ (#429)
Browse files Browse the repository at this point in the history
This PR introduces a new data structure `cuco::distinct_count_estimator`
which implements the famous HyperLogLog++ algorithm for accurately
approximating the number of distinct items in a multiset (see
[paper](https://static.googleusercontent.com/media/research.google.com/de//pubs/archive/40671.pdf)).
This PR will ultimately solve rapidsai/cudf#10652.

---------

Co-authored-by: Yunsong Wang <[email protected]>
  • Loading branch information
sleeepyjack and PointKernel authored Apr 4, 2024
1 parent c620ce3 commit 446af27
Show file tree
Hide file tree
Showing 21 changed files with 3,048 additions and 0 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,12 @@ We plan to add many GPU-accelerated, concurrent data structures to `cuCollection
#### Examples:
- [Host-bulk APIs (TODO)]()

### `distinct_count_estimator`

`cuco::distinct_count_estimator` implements the well-established [HyperLogLog++ algorithm](https://static.googleusercontent.com/media/research.google.com/de//pubs/archive/40671.pdf) for approximating the count of distinct items in a multiset/stream.

#### Examples:
- [Host-bulk APIs](https://github.com/NVIDIA/cuCollections/blob/dev/examples/distinct_count_estimator/host_bulk_example.cu) (see [live example in godbolt](https://godbolt.org/z/ahjEoWM1E))
- [Device-ref APIs](https://github.com/NVIDIA/cuCollections/blob/dev/examples/distinct_count_estimator/device_ref_example.cu) (see [live example in godbolt](https://godbolt.org/z/qebYY8Goj))


5 changes: 5 additions & 0 deletions benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,8 @@ ConfigureBench(DYNAMIC_MAP_BENCH
# - hash function benchmarks ----------------------------------------------------------------------
ConfigureBench(HASH_BENCH
hash_bench.cu)

###################################################################################################
# - distinct_count_estimator benchmarks -----------------------------------------------------------
# Throughput and accuracy benchmarks for the HyperLogLog++-based
# `cuco::distinct_count_estimator` (implemented in distinct_count_estimator_bench.cu).
ConfigureBench(DISTINCT_COUNT_ESTIMATOR_BENCH
  distinct_count_estimator_bench.cu)
164 changes: 164 additions & 0 deletions benchmarks/distinct_count_estimator_bench.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <defaults.hpp>
#include <utils.hpp>

#include <cuco/distinct_count_estimator.cuh>
#include <cuco/static_set.cuh>
#include <cuco/utility/key_generator.cuh>

#include <nvbench/nvbench.cuh>

#include <thrust/device_vector.h>
#include <thrust/iterator/transform_iterator.h>

#include <cuda/functional>

#include <cmath>
#include <cstddef>

using namespace cuco::benchmark;
using namespace cuco::utility;

template <typename InputIt>
[[nodiscard]] std::size_t exact_distinct_count(InputIt first, std::size_t n)
{
// TODO static_set currently only supports types up-to 8-bytes in size.
// Casting is valid since the keys generated are representable in int64_t.
using T = std::int64_t;

auto cast_iter = thrust::make_transform_iterator(
first, cuda::proclaim_return_type<T>([] __device__(auto i) { return static_cast<T>(i); }));

auto set = cuco::static_set{n, 0.8, cuco::empty_key<T>{-1}};
set.insert(cast_iter, cast_iter + n);
return set.size();
}

/**
 * @brief Measures the mean relative approximation error of an `Estimator` over
 * `num_samples` independently generated input sets.
 *
 * @param state Benchmark state providing the "NumInputs" and "SketchSizeKB" axes
 * @param num_samples Number of input sets to average over
 * @return Mean of `|estimate / exact - 1|` across all samples
 */
template <class Estimator, class Dist>
[[nodiscard]] double relative_error(nvbench::state& state, std::size_t num_samples)
{
  using item_type = typename Estimator::value_type;

  auto const num_items      = state.get_int64("NumInputs");
  auto const sketch_size_kb = state.get_int64("SketchSizeKB");

  thrust::device_vector<item_type> items(num_items);

  key_generator gen;
  Estimator estimator{cuco::sketch_size_kb(sketch_size_kb)};

  double accumulated_error = 0.0;
  for (std::size_t sample = 0; sample < num_samples; ++sample) {
    // Fresh input set for each sample
    gen.generate(dist_from_state<Dist>(state), items.begin(), items.end());
    estimator.add(items.begin(), items.end());
    auto const approx = static_cast<double>(estimator.estimate());
    auto const exact  = static_cast<double>(exact_distinct_count(items.begin(), num_items));
    accumulated_error += std::abs(approx / exact - 1.0);
    estimator.clear();  // reset the sketch before the next sample
  }

  return accumulated_error / num_samples;
}

/**
 * @brief A benchmark evaluating `cuco::distinct_count_estimator` end-to-end performance
 *
 * Times the combined `add_async` + `estimate` path and additionally reports the mean
 * relative approximation error as a custom summary column.
 */
template <typename T, typename Dist>
void distinct_count_estimator_e2e(nvbench::state& state, nvbench::type_list<T, Dist>)
{
  using estimator_type = cuco::distinct_count_estimator<T>;

  auto const num_items      = state.get_int64("NumInputs");
  auto const sketch_size_kb = state.get_int64("SketchSizeKB");

  state.add_element_count(num_items);
  state.add_global_memory_reads<T>(num_items, "InputSize");

  // A single error sample suffices for the `unique` distribution since its input is
  // deterministic; random distributions are averaged over multiple samples.
  auto const err_samples = (cuda::std::is_same_v<Dist, distribution::unique>) ? 1 : 5;
  auto const err         = relative_error<estimator_type, Dist>(state, err_samples);
  auto& summ             = state.add_summary("MeanRelativeError");
  summ.set_string("hint", "MRelErr");
  summ.set_string("short_name", "MeanRelativeError");
  // Fixed typo in the user-visible description: "relatve" -> "relative"
  summ.set_string("description", "Mean relative approximation error.");
  summ.set_float64("value", err);

  thrust::device_vector<T> items(num_items);

  key_generator gen;
  gen.generate(dist_from_state<Dist>(state), items.begin(), items.end());

  estimator_type estimator{cuco::sketch_size_kb(sketch_size_kb)};
  std::size_t estimated_cardinality = 0;
  state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
             [&](nvbench::launch& launch, auto& timer) {
               timer.start();
               estimator.add_async(items.begin(), items.end(), {launch.get_stream()});
               estimated_cardinality = estimator.estimate({launch.get_stream()});
               timer.stop();

               // Reset the sketch outside the timed region
               estimator.clear_async({launch.get_stream()});
             });
}

/**
 * @brief A benchmark evaluating `cuco::distinct_count_estimator::add` performance
 */
template <typename T, typename Dist>
void distinct_count_estimator_add(nvbench::state& state, nvbench::type_list<T, Dist>)
{
  using estimator_type = cuco::distinct_count_estimator<T>;

  auto const num_items      = state.get_int64("NumInputs");
  auto const sketch_size_kb = state.get_int64("SketchSizeKB");

  // Input data is generated once, outside the timed region
  thrust::device_vector<T> input(num_items);
  key_generator generator;
  generator.generate(dist_from_state<Dist>(state), input.begin(), input.end());

  state.add_element_count(num_items);
  state.add_global_memory_reads<T>(num_items, "InputSize");

  estimator_type estimator{cuco::sketch_size_kb(sketch_size_kb)};
  state.exec(nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
    // Only the `add_async` call is timed; the sketch reset happens after `timer.stop()`
    timer.start();
    estimator.add_async(input.begin(), input.end(), {launch.get_stream()});
    timer.stop();

    estimator.clear_async({launch.get_stream()});
  });
}

// Key types exercised by both benchmarks; `__int128_t` additionally covers a 16-byte key type.
using TYPE_RANGE = nvbench::type_list<nvbench::int32_t, nvbench::int64_t, __int128_t>;

// End-to-end benchmark: timed `add_async` + `estimate` per iteration, plus a
// mean-relative-error summary column.
NVBENCH_BENCH_TYPES(distinct_count_estimator_e2e,
                    NVBENCH_TYPE_AXES(TYPE_RANGE, nvbench::type_list<distribution::uniform>))
  .set_name("distinct_count_estimator_e2e")
  .set_type_axes_names({"T", "Distribution"})
  .add_int64_power_of_two_axis("NumInputs", {28, 29, 30})
  .add_int64_axis("SketchSizeKB", {8, 16, 32, 64, 128, 256})  // 256KB uses gmem fallback kernel
  .add_int64_axis("Multiplicity", {1})
  .set_max_noise(defaults::MAX_NOISE);

// Insertion-only benchmark: measures `add_async` throughput in isolation.
NVBENCH_BENCH_TYPES(distinct_count_estimator_add,
                    NVBENCH_TYPE_AXES(TYPE_RANGE, nvbench::type_list<distribution::uniform>))
  .set_name("distinct_count_estimator::add_async")
  .set_type_axes_names({"T", "Distribution"})
  .add_int64_power_of_two_axis("NumInputs", {28, 29, 30})
  .add_int64_axis("SketchSizeKB", {8, 16, 32, 64, 128, 256})
  .add_int64_axis("Multiplicity", {1})
  .set_max_noise(defaults::MAX_NOISE);
2 changes: 2 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@ ConfigureExample(STATIC_MAP_DEVICE_SIDE_EXAMPLE "${CMAKE_CURRENT_SOURCE_DIR}/sta
ConfigureExample(STATIC_MAP_CUSTOM_TYPE_EXAMPLE "${CMAKE_CURRENT_SOURCE_DIR}/static_map/custom_type_example.cu")
ConfigureExample(STATIC_MAP_COUNT_BY_KEY_EXAMPLE "${CMAKE_CURRENT_SOURCE_DIR}/static_map/count_by_key_example.cu")
ConfigureExample(STATIC_MULTIMAP_HOST_BULK_EXAMPLE "${CMAKE_CURRENT_SOURCE_DIR}/static_multimap/host_bulk_example.cu")
# distinct_count_estimator examples (host-bulk and device-ref APIs)
ConfigureExample(DISTINCT_COUNT_ESTIMATOR_HOST_BULK_EXAMPLE "${CMAKE_CURRENT_SOURCE_DIR}/distinct_count_estimator/host_bulk_example.cu")
ConfigureExample(DISTINCT_COUNT_ESTIMATOR_DEVICE_REF_EXAMPLE "${CMAKE_CURRENT_SOURCE_DIR}/distinct_count_estimator/device_ref_example.cu")
164 changes: 164 additions & 0 deletions examples/distinct_count_estimator/device_ref_example.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cuco/distinct_count_estimator.cuh>

#include <thrust/device_vector.h>
#include <thrust/sequence.h>

#include <cstddef>
#include <iostream>

/**
* @file device_ref_example.cu
* @brief Demonstrates usage of `cuco::distinct_count_estimator` device-side APIs.
*
* This example demonstrates how the non-owning reference type `cuco::distinct_count_estimator_ref`
* can be used to implement a custom kernel that fuses the cardinality estimation step with any
* other workload that traverses the input data.
*/

/**
 * @brief Kernel fusing block-local cardinality estimation with an arbitrary per-item workload.
 *
 * Each thread block maintains its own sketch in dynamic shared memory and merges it into
 * the global (device-scoped) estimator at the end.
 *
 * @param ref Non-owning, device-scoped reference to the global estimator
 * @param first Iterator to the first input item
 * @param n Number of input items
 */
template <class RefType, class InputIt>
__global__ void fused_kernel(RefType ref, InputIt first, std::size_t n)
{
  // Narrow the device-scoped reference type down to block scope
  using local_ref_type = typename RefType::with_scope<cuda::thread_scope_block>;

  // Per-block sketch storage in dynamic shared memory
  extern __shared__ std::byte local_sketch[];

  // Optional sanity check — the base address of dynamic shared memory is guaranteed to
  // meet the sketch's alignment requirement, so this stays disabled:
  /*
  auto const alignment =
    1ull << cuda::std::countr_zero(reinterpret_cast<std::uintptr_t>(local_sketch));
  assert(alignment >= local_ref_type::sketch_alignment());
  */

  auto const block = cooperative_groups::this_thread_block();

  // Wrap the shared-memory storage in a block-local estimator and reset it
  local_ref_type local_ref(cuda::std::span{local_sketch, ref.sketch_bytes()});
  local_ref.clear(block);
  block.sync();

  // Grid-stride traversal over the input
  for (auto tid = blockDim.x * blockIdx.x + threadIdx.x; tid < n;
       tid += gridDim.x * blockDim.x) {
    auto const& item = *(first + tid);

    // Register the item with the block-local estimator
    local_ref.add(item);

    /*
    Any additional per-item workload can be placed here: cardinality estimation can be
    fused with whatever computation already traverses the data. Since `local_ref.add` can
    run close to the SOL of the DRAM bandwidth, the estimate comes essentially "for free".
    */
  }
  block.sync();

  // The per-block estimate could also be computed right here on the device:
  // auto const local_estimate = local_ref.estimate(block);
  if (block.thread_rank() == 0) {
    // Should be roughly `n` / `gridDim.x` per block
    // printf("Estimate for block %d = %llu\n", blockIdx.x, local_estimate);
  }

  // Fold the block-local sketch into the global estimator to produce the final result
  ref.merge(block, local_ref);
}

/**
 * @brief Kernel computing a cardinality estimate entirely on the device.
 *
 * Only block 0 participates: it builds a block-local sketch in dynamic shared memory,
 * adds all `n` input items, and writes the final estimate through `out`.
 *
 * @param sketch_size_kb Sketch size in KB; determines the required shared-memory bytes
 * @param in Iterator to the first input item
 * @param n Number of input items
 * @param out Output iterator receiving the single estimate value
 */
template <typename Ref, typename InputIt, typename OutputIt>
__global__ void device_estimate_kernel(cuco::sketch_size_kb sketch_size_kb,
                                       InputIt in,
                                       size_t n,
                                       OutputIt out)
{
  extern __shared__ std::byte local_sketch[];

  auto const block = cooperative_groups::this_thread_block();

  // only a single block computes the estimate
  if (block.group_index().x == 0) {
    Ref estimator(cuda::std::span(local_sketch, Ref::sketch_bytes(sketch_size_kb)));

    estimator.clear(block);
    block.sync();

    // Use an unsigned index type wide enough for `n`: the previous `int` index mixed
    // signed/unsigned in the comparison and would overflow for n > INT_MAX.
    for (std::size_t i = block.thread_rank(); i < n; i += block.num_threads()) {
      estimator.add(*(in + i));
    }
    block.sync();
    // we can compute the final estimate on the device and return the result to the host
    auto const estimate = estimator.estimate(block);

    if (block.thread_rank() == 0) { *out = estimate; }
  }
}

/**
 * @brief Example driver: computes the cardinality estimate three ways (host bulk API,
 * custom fused kernel, fully on-device kernel) and verifies that all three agree.
 *
 * @return 0 on success, 1 if the estimates diverge
 */
int main(void)
{
  using T              = int;
  using estimator_type = cuco::distinct_count_estimator<T>;
  constexpr std::size_t num_items = 1ull << 28;  // 2^28 4-byte items -> 1GB of input
  auto const sketch_size_kb       = 32_KB;

  thrust::device_vector<T> items(num_items);

  // Generate `num_items` distinct items
  thrust::sequence(items.begin(), items.end(), 0);

  // Initialize the estimator
  estimator_type estimator(sketch_size_kb);

  // Add all items to the estimator
  estimator.add(items.begin(), items.end());

  // Calculate the cardinality estimate from the bulk operation
  std::size_t const estimated_cardinality_bulk = estimator.estimate();

  // Clear the estimator so it can be reused
  estimator.clear();

  // Number of dynamic shared memory bytes required to store a CTA-local sketch
  auto const sketch_bytes = estimator.sketch_bytes();

  // Call the custom kernel and pass a non-owning reference to the estimator to the GPU
  fused_kernel<<<10, 512, sketch_bytes>>>(estimator.ref(), items.begin(), num_items);

  // Calculate the cardinality estimate from the custom kernel
  std::size_t const estimated_cardinality_custom = estimator.estimate();

  thrust::device_vector<std::size_t> device_estimate(1);
  device_estimate_kernel<typename estimator_type::ref_type<cuda::thread_scope_block>>
    <<<1, 512, sketch_bytes>>>(sketch_size_kb, items.begin(), num_items, device_estimate.begin());

  std::size_t const estimated_cardinality_device = device_estimate[0];

  if (estimated_cardinality_custom == estimated_cardinality_bulk and
      estimated_cardinality_device == estimated_cardinality_bulk) {
    std::cout << "Success! Cardinality estimates are identical" << std::endl;
    return 0;
  }

  // Previously the example was silent on a mismatch; report the diverging values and
  // signal failure to the caller instead.
  std::cerr << "Error: cardinality estimates differ (bulk=" << estimated_cardinality_bulk
            << ", custom=" << estimated_cardinality_custom
            << ", device=" << estimated_cardinality_device << ")" << std::endl;
  return 1;
}
Loading

0 comments on commit 446af27

Please sign in to comment.