Skip to content

Commit

Permalink
ARROW-10607: [C++][Parquet] Add parquet support for decimal256.
Browse files Browse the repository at this point in the history
- Refactor common code (DecimalSize, FromBigEndian) to places in arrow
- Support writing Decimal256 as FLBA
- Support reading Decimal256 from bytes and FLBA.  Integer types
  don't seem like they would be worthwhile to ever convert to Decimal256
  and the code path is hard to test.
- Adds addition and shift operators to Decimal256 to support testing.

Closes apache#8897 from emkornfield/parquet_bigdecimal

Authored-by: Micah Kornfield <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
emkornfield authored and pitrou committed Dec 15, 2020
1 parent 2081762 commit 0e8de08
Show file tree
Hide file tree
Showing 21 changed files with 612 additions and 429 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/kernels/codegen_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ const std::vector<std::shared_ptr<DataType>>& PrimitiveTypes() {

const std::vector<std::shared_ptr<DataType>>& ExampleParametricTypes() {
static DataTypeVector example_parametric_types = {
decimal(12, 2),
decimal128(12, 2),
duration(TimeUnit::SECOND),
timestamp(TimeUnit::SECOND),
time32(TimeUnit::SECOND),
Expand Down
24 changes: 18 additions & 6 deletions cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc
Original file line number Diff line number Diff line change
Expand Up @@ -591,11 +591,11 @@ std::shared_ptr<CastFunction> GetCastToFloating(std::string name) {
return func;
}

std::shared_ptr<CastFunction> GetCastToDecimal() {
std::shared_ptr<CastFunction> GetCastToDecimal128() {
OutputType sig_out_ty(ResolveOutputFromOptions);

auto func = std::make_shared<CastFunction>("cast_decimal", Type::DECIMAL);
AddCommonCasts(Type::DECIMAL, sig_out_ty, func.get());
auto func = std::make_shared<CastFunction>("cast_decimal", Type::DECIMAL128);
AddCommonCasts(Type::DECIMAL128, sig_out_ty, func.get());

// Cast from floating point
DCHECK_OK(func->AddKernel(Type::FLOAT, {float32()}, sig_out_ty,
Expand All @@ -606,8 +606,19 @@ std::shared_ptr<CastFunction> GetCastToDecimal() {
// Cast from other decimal
auto exec = CastFunctor<Decimal128Type, Decimal128Type>::Exec;
// We resolve the output type of this kernel from the CastOptions
DCHECK_OK(func->AddKernel(Type::DECIMAL, {InputType::Array(Type::DECIMAL)}, sig_out_ty,
exec));
DCHECK_OK(func->AddKernel(Type::DECIMAL128, {InputType::Array(Type::DECIMAL128)},
sig_out_ty, exec));
return func;
}

std::shared_ptr<CastFunction> GetCastToDecimal256() {
OutputType sig_out_ty(ResolveOutputFromOptions);

auto func = std::make_shared<CastFunction>("cast_decimal256", Type::DECIMAL256);
// Needed for Parquet conversion. Full implementation is ARROW-10606
// tracks full implementation.
AddCommonCasts(Type::DECIMAL256, sig_out_ty, func.get());

return func;
}

Expand Down Expand Up @@ -654,7 +665,8 @@ std::vector<std::shared_ptr<CastFunction>> GetNumericCasts() {
functions.push_back(GetCastToFloating<FloatType>("cast_float"));
functions.push_back(GetCastToFloating<DoubleType>("cast_double"));

functions.push_back(GetCastToDecimal());
functions.push_back(GetCastToDecimal128());
functions.push_back(GetCastToDecimal256());

return functions;
}
Expand Down
8 changes: 5 additions & 3 deletions cpp/src/arrow/compute/kernels/vector_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -620,9 +620,11 @@ void AddHashKernels(VectorFunction* func, VectorKernel base, OutputType out_ty)
DCHECK_OK(func->AddKernel(base));
}

base.init = GetHashInit<Action>(Type::DECIMAL);
base.signature = KernelSignature::Make({InputType::Array(Type::DECIMAL)}, out_ty);
DCHECK_OK(func->AddKernel(base));
for (auto t : {Type::DECIMAL128, Type::DECIMAL256}) {
base.init = GetHashInit<Action>(t);
base.signature = KernelSignature::Make({InputType::Array(t)}, out_ty);
DCHECK_OK(func->AddKernel(base));
}
}

const FunctionDoc unique_doc(
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/kernels/vector_selection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2151,7 +2151,8 @@ void RegisterVectorSelection(FunctionRegistry* registry) {
TakeExec<VarBinaryImpl<LargeBinaryType>>},
{InputType::Array(Type::FIXED_SIZE_BINARY), TakeExec<FSBImpl>},
{InputType::Array(null()), NullTake},
{InputType::Array(Type::DECIMAL), TakeExec<FSBImpl>},
{InputType::Array(Type::DECIMAL128), TakeExec<FSBImpl>},
{InputType::Array(Type::DECIMAL256), TakeExec<FSBImpl>},
{InputType::Array(Type::DICTIONARY), DictionaryTake},
{InputType::Array(Type::EXTENSION), ExtensionTake},
{InputType::Array(Type::LIST), TakeExec<ListImpl<ListType>>},
Expand Down
71 changes: 2 additions & 69 deletions cpp/src/arrow/testing/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#endif

#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/testing/random.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -78,78 +79,10 @@ std::string random_string(int64_t n, uint32_t seed) {
return s;
}

// Returns the number of bytes needed to hold a decimal of the given precision
// (1 through 38) as a two's complement integer. If the precision is outside
// that range and the DCHECKs do not abort, -1 is returned.
int32_t DecimalSize(int32_t precision) {
  DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
                          << precision;
  DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got "
                           << precision;

  // kBytesForPrecision[p] is the byte width for precision p; index 0 is a
  // placeholder. For reference, the largest signed value that fits in a given
  // width is 127 for 1 byte, 32,767 for 2 bytes, 2,147,483,647 for 4 bytes and
  // 9,223,372,036,854,775,807 for 8 bytes.
  constexpr int32_t kBytesForPrecision[] = {
      -1, 1,  1,  2,  2,  3,  3,  4,  4,  4,  5,  5,  6,  6,  6,  7,  7,  8,  8,  9,
      9,  9,  10, 10, 11, 11, 11, 12, 12, 13, 13, 13, 14, 14, 15, 15, 16, 16, 16};

  if (precision < 1 || precision > 38) {
    DCHECK(false);
    return -1;
  }
  return kBytesForPrecision[precision];
}

void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t* out) {
std::default_random_engine gen(seed);
std::uniform_int_distribution<uint32_t> d(0, std::numeric_limits<uint8_t>::max());
const int32_t required_bytes = DecimalSize(precision);
const int32_t required_bytes = DecimalType::DecimalSize(precision);
constexpr int32_t byte_width = 16;
std::fill(out, out + byte_width * n, '\0');

Expand Down
22 changes: 22 additions & 0 deletions cpp/src/arrow/type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,28 @@ std::vector<std::shared_ptr<Field>> StructType::GetAllFieldsByName(
return result;
}

// Returns the minimum number of bytes needed to store a decimal value with
// the given precision as a two's complement integer.
//
// Precisions up to 76 are answered from a precomputed table; larger
// precisions evaluate the same formula the table was generated from.
// precision must be >= 1; for smaller values -1 is returned if the DCHECK
// below does not abort.
int32_t DecimalType::DecimalSize(int32_t precision) {
  DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
                          << precision;

  // Generated in python with:
  // >>> decimal_size = lambda prec: int(math.ceil((prec * math.log2(10) + 1) / 8))
  // >>> [-1] + [decimal_size(i) for i in range(1, 77)]
  constexpr int32_t kBytes[] = {
      -1, 1,  1,  2,  2,  3,  3,  4,  4,  4,  5,  5,  6,  6,  6,  7,  7,  8,  8,  9,
      9,  9,  10, 10, 11, 11, 11, 12, 12, 13, 13, 13, 14, 14, 15, 15, 16, 16, 16, 17,
      17, 18, 18, 18, 19, 19, 20, 20, 21, 21, 21, 22, 22, 23, 23, 23, 24, 24, 25, 25,
      26, 26, 26, 27, 27, 28, 28, 28, 29, 29, 30, 30, 31, 31, 31, 32, 32};
  static_assert(sizeof(kBytes) / sizeof(kBytes[0]) == 77,
                "kBytes must cover precisions 0 through 76");

  if (precision < 1) {
    // Never index the table with a negative value in case the DCHECK above
    // is not active. Precision 0 already mapped to -1 through kBytes[0].
    return -1;
  }
  if (precision <= 76) {
    return kBytes[precision];
  }
  // Same formula as the table generator: bits for the digits plus one sign
  // bit, rounded up to whole bytes.
  return static_cast<int32_t>(std::ceil((precision * std::log2(10) + 1) / 8.0));
}

// ----------------------------------------------------------------------
// Decimal128 type

Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,11 @@ class ARROW_EXPORT DecimalType : public FixedSizeBinaryType {
int32_t precision() const { return precision_; }
int32_t scale() const { return scale_; }

/// \brief Returns the number of bytes needed for precision.
///
/// precision must be >= 1
static int32_t DecimalSize(int32_t precision);

protected:
std::string ComputeFingerprint() const override;

Expand All @@ -905,6 +910,7 @@ class ARROW_EXPORT Decimal128Type : public DecimalType {

static constexpr int32_t kMinPrecision = 1;
static constexpr int32_t kMaxPrecision = 38;
static constexpr int32_t kByteWidth = 16;
};

/// \brief Concrete type class for 256-bit decimal data
Expand All @@ -925,6 +931,7 @@ class ARROW_EXPORT Decimal256Type : public DecimalType {

static constexpr int32_t kMinPrecision = 1;
static constexpr int32_t kMaxPrecision = 76;
static constexpr int32_t kByteWidth = 32;
};

/// \brief Concrete type class for union data
Expand Down
67 changes: 67 additions & 0 deletions cpp/src/arrow/util/basic_decimal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,50 @@ BasicDecimal256 BasicDecimal256::Abs(const BasicDecimal256& in) {
return result.Abs();
}

// Adds `right` to this value in place, one 64-bit word at a time starting at
// index 0 of the little-endian word array and propagating the carry upward.
// A carry out of the last word is discarded.
BasicDecimal256& BasicDecimal256::operator+=(const BasicDecimal256& right) {
  const size_t num_words = little_endian_array_.size();
  uint64_t carry_in = 0;
  for (size_t word = 0; word != num_words; ++word) {
    const uint64_t addend = right.little_endian_array_[word];
    const uint64_t augend = little_endian_array_[word];

    const uint64_t addend_plus_carry = addend + carry_in;
    const uint64_t total = addend_plus_carry + augend;

    // Unsigned wrap-around shows up as a result smaller than an operand.
    // At most one of the two additions above can wrap.
    carry_in = 0;
    if (addend_plus_carry < addend) {
      ++carry_in;
    }
    if (total < augend) {
      ++carry_in;
    }
    little_endian_array_[word] = total;
  }
  return *this;
}

// Shifts this value left by `bits` bits in place. Bits shifted past the last
// word are dropped; shifting by the full width or more yields zero.
BasicDecimal256& BasicDecimal256::operator<<=(uint32_t bits) {
  if (bits == 0) {
    return *this;
  }
  const int num_words = static_cast<int>(little_endian_array_.size());
  // Split the shift into whole 64-bit words plus a remainder within a word.
  const int word_offset = static_cast<int>(bits / 64);
  if (word_offset >= num_words) {
    little_endian_array_ = {0, 0, 0, 0};
    return *this;
  }
  const uint32_t bit_offset = bits % 64;

  // Walk destination words from the highest index down, so every source word
  // is read before it is overwritten.
  for (int dst = num_words - 1; dst >= 0; --dst) {
    const int src = dst - word_offset;
    if (src < 0) {
      // No source word exists this far down; the vacated word becomes zero.
      little_endian_array_[dst] = 0;
      continue;
    }
    uint64_t shifted = little_endian_array_[src] << bit_offset;
    // Pull in the bits spilling over from the next lower word. Skipped when
    // bit_offset is 0 because a 64-bit shift by 64 is not valid.
    if (bit_offset != 0 && src >= 1) {
      shifted |= little_endian_array_[src - 1] >> (64 - bit_offset);
    }
    little_endian_array_[dst] = shifted;
  }
  return *this;
}

std::array<uint8_t, 32> BasicDecimal256::ToBytes() const {
std::array<uint8_t, 32> out{{0}};
ToBytes(out.data());
Expand Down Expand Up @@ -1091,6 +1135,12 @@ DecimalStatus BasicDecimal256::Rescale(int32_t original_scale, int32_t new_scale
return DecimalRescale(*this, original_scale, new_scale, out);
}

bool BasicDecimal256::FitsInPrecision(int32_t precision) const {
DCHECK_GT(precision, 0);
DCHECK_LE(precision, 76);
return BasicDecimal256::Abs(*this) < ScaleMultipliersDecimal256[precision];
}

const BasicDecimal256& BasicDecimal256::GetScaleMultiplier(int32_t scale) {
DCHECK_GE(scale, 0);
DCHECK_LE(scale, 76);
Expand All @@ -1113,13 +1163,30 @@ bool operator<(const BasicDecimal256& left, const BasicDecimal256& right) {
: lhs[1] != rhs[1] ? lhs[1] < rhs[1] : lhs[0] < rhs[0];
}

// Unary minus: applies Negate() to a copy of `operand` and returns the
// outcome, leaving `operand` itself untouched.
BasicDecimal256 operator-(const BasicDecimal256& operand) {
  BasicDecimal256 negated = operand;
  return negated.Negate();
}

// Bitwise complement: returns a value whose four 64-bit words are the
// complements of the corresponding words of `operand`.
BasicDecimal256 operator~(const BasicDecimal256& operand) {
  std::array<uint64_t, 4> flipped = operand.little_endian_array();
  for (uint64_t& word : flipped) {
    word = ~word;
  }
  return BasicDecimal256(flipped);
}

// In-place division: stores the quotient of this value and `right` back into
// this object. The remainder is computed but discarded. A non-success status
// from Divide is only caught by the DCHECK.
BasicDecimal256& BasicDecimal256::operator/=(const BasicDecimal256& right) {
  BasicDecimal256 discarded_remainder;
  const DecimalStatus status = Divide(right, this, &discarded_remainder);
  DCHECK_EQ(status, DecimalStatus::kSuccess);
  return *this;
}

// Binary addition, implemented by copying `left` and applying operator+=
// with `right`.
BasicDecimal256 operator+(const BasicDecimal256& left, const BasicDecimal256& right) {
  BasicDecimal256 result(left);
  result += right;
  return result;
}

BasicDecimal256 operator/(const BasicDecimal256& left, const BasicDecimal256& right) {
BasicDecimal256 remainder;
BasicDecimal256 result;
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/arrow/util/basic_decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ class ARROW_EXPORT BasicDecimal256 {
/// \brief Absolute value
static BasicDecimal256 Abs(const BasicDecimal256& left);

/// \brief Add a number to this one. The result is truncated to 256 bits.
BasicDecimal256& operator+=(const BasicDecimal256& right);

/// \brief Get the bits of the two's complement representation of the number. The 4
/// elements are in little endian order. The bits within each uint64_t element are in
/// native endian order. For example,
Expand All @@ -245,6 +248,11 @@ class ARROW_EXPORT BasicDecimal256 {
DecimalStatus Rescale(int32_t original_scale, int32_t new_scale,
BasicDecimal256* out) const;

/// \brief Whether this number fits in the given precision
///
/// Return true if the number of significant digits is less than or equal to
/// `precision`.
bool FitsInPrecision(int32_t precision) const;

/// \brief Sign of the number, derived from the top bit of the last
/// (index 3) word.
///
/// Yields 1 when that bit is clear and, assuming an arithmetic right shift
/// on signed values, -1 when it is set.
inline int64_t Sign() const {
  const int64_t top_word = static_cast<int64_t>(little_endian_array_[3]);
  return 1 | (top_word >> 63);
}
Expand All @@ -269,6 +277,9 @@ class ARROW_EXPORT BasicDecimal256 {
/// \param[out] remainder the remainder after the division
DecimalStatus Divide(const BasicDecimal256& divisor, BasicDecimal256* result,
BasicDecimal256* remainder) const;
/// \brief Shift left by the given number of bits.
BasicDecimal256& operator<<=(uint32_t bits);

/// \brief In-place division.
BasicDecimal256& operator/=(const BasicDecimal256& right);

Expand Down Expand Up @@ -303,6 +314,10 @@ ARROW_EXPORT inline bool operator>=(const BasicDecimal256& left,
return !operator<(left, right);
}

ARROW_EXPORT BasicDecimal256 operator-(const BasicDecimal256& operand);
ARROW_EXPORT BasicDecimal256 operator~(const BasicDecimal256& operand);
ARROW_EXPORT BasicDecimal256 operator+(const BasicDecimal256& left,
const BasicDecimal256& right);
ARROW_EXPORT BasicDecimal256 operator*(const BasicDecimal256& left,
const BasicDecimal256& right);
ARROW_EXPORT BasicDecimal256 operator/(const BasicDecimal256& left,
Expand Down
Loading

0 comments on commit 0e8de08

Please sign in to comment.