Skip to content

Commit

Permalink
ARROW-10304: [C++][Compute] Optimize variance kernel for integers
Browse files Browse the repository at this point in the history
Improve variance kernel performance for integers by leveraging
textbook one pass algorithm and integer arithmetic.

Closes apache#8466 from cyb70289/variance-integer

Authored-by: Yibo Cai <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
cyb70289 authored and pitrou committed Oct 22, 2020
1 parent 625bf3f commit e2d8dc3
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 21 deletions.
100 changes: 87 additions & 13 deletions cpp/src/arrow/compute/kernels/aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1058,23 +1058,57 @@ TYPED_TEST(TestNumericVarStdKernel, Basics) {
this->AssertVarStdIsInvalid(chunks, options);
}

class TestVarStdKernelStability : public TestPrimitiveVarStdKernel<DoubleType> {};

// Test numerical stability
TEST_F(TestVarStdKernelStability, Basics) {
template <typename ArrowType>
class TestVarStdKernelStability : public TestPrimitiveVarStdKernel<ArrowType> {};

typedef ::testing::Types<Int32Type, UInt32Type, Int64Type, UInt64Type, DoubleType>
VarStdStabilityTypes;

TYPED_TEST_SUITE(TestVarStdKernelStability, VarStdStabilityTypes);
TYPED_TEST(TestVarStdKernelStability, Basics) {
VarianceOptions options{1}; // ddof = 1
this->AssertVarStdIs("[100000004, 100000007, 100000013, 100000016]", options, 30.0);
this->AssertVarStdIs("[1000000004, 1000000007, 1000000013, 1000000016]", options, 30.0);
if (!is_unsigned_integer_type<TypeParam>::value) {
this->AssertVarStdIs("[-1000000016, -1000000013, -1000000007, -1000000004]", options,
30.0);
}
}

// Test numerical stability of variance merging code
class TestVarStdKernelMergeStability : public TestPrimitiveVarStdKernel<DoubleType> {};

TEST_F(TestVarStdKernelMergeStability, Basics) {
VarianceOptions options{1}; // ddof = 1

#ifndef __MINGW32__ // MinGW has precision issues
// This test is to make sure our variance combining method is stable.
// XXX: The reference value from numpy is actually wrong due to floating
// point limits. The correct result should equals variance(90, 0) = 4050.
std::vector<std::string> chunks = {"[40000008000000490]", "[40000008000000400]"};
this->AssertVarStdIs(chunks, options, 3904.0);
#endif
}

// Test integer arithmetic code
class TestVarStdKernelInt32 : public TestPrimitiveVarStdKernel<Int32Type> {};

TEST_F(TestVarStdKernelInt32, Basics) {
VarianceOptions options{1};
this->AssertVarStdIs("[-2147483648, -2147483647, -2147483646]", options, 1.0);
this->AssertVarStdIs("[2147483645, 2147483646, 2147483647]", options, 1.0);
this->AssertVarStdIs("[-2147483648, -2147483648, 2147483647]", options,
6.148914688373205e+18);
}

class TestVarStdKernelUInt32 : public TestPrimitiveVarStdKernel<UInt32Type> {};

TEST_F(TestVarStdKernelUInt32, Basics) {
VarianceOptions options{1};
this->AssertVarStdIs("[4294967293, 4294967294, 4294967295]", options, 1.0);
this->AssertVarStdIs("[0, 0, 4294967295]", options, 6.148914688373205e+18);
}

// https://en.wikipedia.org/wiki/Kahan_summation_algorithm
void KahanSum(double& sum, double& adjust, double addend) {
double y = addend - adjust;
Expand All @@ -1085,35 +1119,51 @@ void KahanSum(double& sum, double& adjust, double addend) {

// Calculate reference variance with Welford's online algorithm + Kahan summation
// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
std::pair<double, double> WelfordVar(const Array& array) {
const auto& array_numeric = reinterpret_cast<const DoubleArray&>(array);
const auto values = array_numeric.raw_values();
// XXX: not stable for long array with very small `stddev / average`
template <typename ArrayType>
std::pair<double, double> WelfordVar(const ArrayType& array) {
const auto values = array.raw_values();
internal::BitmapReader reader(array.null_bitmap_data(), array.offset(), array.length());
double count = 0, mean = 0, m2 = 0;
double mean_adjust = 0, m2_adjust = 0;
for (int64_t i = 0; i < array.length(); ++i) {
if (reader.IsSet()) {
++count;
double delta = values[i] - mean;
double delta = static_cast<double>(values[i]) - mean;
KahanSum(mean, mean_adjust, delta / count);
double delta2 = values[i] - mean;
double delta2 = static_cast<double>(values[i]) - mean;
KahanSum(m2, m2_adjust, delta * delta2);
}
reader.Next();
}
return std::make_pair(m2 / count, m2 / (count - 1));
}

class TestVarStdKernelRandom : public TestPrimitiveVarStdKernel<DoubleType> {};
// Test random chunked array
template <typename ArrowType>
class TestVarStdKernelRandom : public TestPrimitiveVarStdKernel<ArrowType> {};

typedef ::testing::Types<Int32Type, UInt32Type, Int64Type, UInt64Type, FloatType,
DoubleType>
VarStdRandomTypes;

TEST_F(TestVarStdKernelRandom, Basics) {
TYPED_TEST_SUITE(TestVarStdKernelRandom, VarStdRandomTypes);
TYPED_TEST(TestVarStdKernelRandom, Basics) {
// Cut array into small chunks
constexpr int array_size = 5000;
constexpr int chunk_size_max = 50;
constexpr int chunk_count = array_size / chunk_size_max;

std::shared_ptr<Array> array;
auto rand = random::RandomArrayGenerator(0x5487656);
auto array = rand.Numeric<DoubleType>(array_size, -10000.0, 100000.0, 0.1);
if (is_floating_type<TypeParam>::value) {
array = rand.Numeric<TypeParam>(array_size, -10000.0, 100000.0, 0.1);
} else {
using CType = typename TypeParam::c_type;
constexpr CType min = std::numeric_limits<CType>::min();
constexpr CType max = std::numeric_limits<CType>::max();
array = rand.Numeric<TypeParam>(array_size, min, max, 0.1);
}
auto chunk_size_array = rand.Numeric<Int32Type>(chunk_count, 0, chunk_size_max);
const int* chunk_size = chunk_size_array->data()->GetValues<int>(1);
int total_size = 0;
Expand All @@ -1126,11 +1176,35 @@ TEST_F(TestVarStdKernelRandom, Basics) {
auto chunked = *ChunkedArray::Make(array_vector);

double var_population, var_sample;
std::tie(var_population, var_sample) = WelfordVar(*(array->Slice(0, total_size)));
using ArrayType = typename TypeTraits<TypeParam>::ArrayType;
auto typed_array = std::static_pointer_cast<ArrayType>(array->Slice(0, total_size));
std::tie(var_population, var_sample) = WelfordVar(*typed_array);

this->AssertVarStdIs(chunked, VarianceOptions{0}, var_population);
this->AssertVarStdIs(chunked, VarianceOptions{1}, var_sample);
}

// This test is too heavy to run in CI, should be checked manually
#if 0
class TestVarStdKernelIntegerLength : public TestPrimitiveVarStdKernel<Int32Type> {};

TEST_F(TestVarStdKernelIntegerLength, Basics) {
constexpr int32_t min = std::numeric_limits<int32_t>::min();
constexpr int32_t max = std::numeric_limits<int32_t>::max();
auto rand = random::RandomArrayGenerator(0x5487657);
// large data volume
auto array = rand.Numeric<Int32Type>(4000000000, min, max, 0.1);
// biased distribution
// auto array = rand.Numeric<Int32Type>(4000000000, min, min + 100000, 0.1);

double var_population, var_sample;
auto int32_array = std::static_pointer_cast<Int32Array>(array);
std::tie(var_population, var_sample) = WelfordVar(*int32_array);

this->AssertVarStdIs(*array, VarianceOptions{0}, var_population);
this->AssertVarStdIs(*array, VarianceOptions{1}, var_sample);
}
#endif

} // namespace compute
} // namespace arrow
70 changes: 62 additions & 8 deletions cpp/src/arrow/compute/kernels/aggregate_var_std.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,43 @@
// under the License.

#include "arrow/compute/kernels/aggregate_basic_internal.h"
#include "arrow/util/int128_internal.h"

namespace arrow {
namespace compute {
namespace aggregate {

namespace {

using arrow::internal::int128_t;

template <typename ArrowType>
struct VarStdState {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
using c_type = typename ArrowType::c_type;
using CType = typename ArrowType::c_type;
using ThisType = VarStdState<ArrowType>;

// Calculate `m2` (sum((X-mean)^2)) of one chunk with `two pass algorithm`
// float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Two-pass_algorithm
// Always use `double` to calculate variance for any array type
void Consume(const ArrayType& array) {
template <typename T = ArrowType>
enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4)> Consume(
const ArrayType& array) {
int64_t count = array.length() - array.null_count();
if (count == 0) {
return;
}

double sum = 0;
using SumType =
typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
SumType sum = 0;
VisitArrayDataInline<ArrowType>(
*array.data(), [&sum](c_type value) { sum += static_cast<double>(value); },
*array.data(), [&sum](CType value) { sum += static_cast<SumType>(value); },
[]() {});

double mean = sum / count, m2 = 0;
double mean = static_cast<double>(sum) / count, m2 = 0;
VisitArrayDataInline<ArrowType>(
*array.data(),
[mean, &m2](c_type value) {
[mean, &m2](CType value) {
double v = static_cast<double>(value);
m2 += (v - mean) * (v - mean);
},
Expand All @@ -57,6 +63,54 @@ struct VarStdState {
this->m2 = m2;
}

// int32/16/8: textbook one pass algorithm with integer arithmetic
template <typename T = ArrowType>
enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4)> Consume(
const ArrayType& array) {
// max number of elements that sum will not overflow int64 (2Gi int32 elements)
// for uint32: 0 <= sum < 2^63 (int64 >= 0)
// for int32: -2^62 <= sum < 2^62
constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);

int64_t start_index = 0;
int64_t valid_count = array.length() - array.null_count();

while (valid_count > 0) {
// process in chunks that overflow will never happen
const auto slice = array.Slice(start_index, max_length);
const int64_t count = slice->length() - slice->null_count();
start_index += max_length;
valid_count -= count;

if (count > 0) {
int64_t sum = 0;
int128_t square_sum = 0;
VisitArrayDataInline<ArrowType>(
*slice->data(),
[&sum, &square_sum](CType value) {
sum += value;
square_sum += static_cast<uint64_t>(value) * value;
},
[]() {});

const double mean = static_cast<double>(sum) / count;
// calculate m2 = square_sum - sum * sum / count
// decompose `sum * sum / count` into integers and fractions
const int128_t sum_square = static_cast<int128_t>(sum) * sum;
const int128_t integers = sum_square / count;
const double fractions = static_cast<double>(sum_square % count) / count;
const double m2 = static_cast<double>(square_sum - integers) - fractions;

// merge variance
ThisType state;
state.count = count;
state.mean = mean;
state.m2 = m2;
this->MergeFrom(state);
}
}
}

// Combine `m2` from two chunks (m2 = n*s2)
// https://www.emathzone.com/tutorials/basic-statistics/combined-variance.html
void MergeFrom(const ThisType& state) {
Expand Down

0 comments on commit e2d8dc3

Please sign in to comment.