Skip to content

Commit 9c46b1f

Browse files
alambviirya
andauthored
Refactor Decimal128 averaging code to be vectorizable (and easier to read) (apache#6810)
* Refactor Decimal128 averaging code to be vectorizable (and easier to read) * Update datafusion/physical-expr/src/aggregate/utils.rs Co-authored-by: Liang-Chi Hsieh <[email protected]> --------- Co-authored-by: Liang-Chi Hsieh <[email protected]>
1 parent ea71acf commit 9c46b1f

File tree

1 file changed

+96
-34
lines changed
  • datafusion/physical-expr/src/aggregate

1 file changed

+96
-34
lines changed

datafusion/physical-expr/src/aggregate/utils.rs

+96-34
Original file line numberDiff line numberDiff line change
@@ -37,45 +37,107 @@ pub fn get_accum_scalar_values_as_arrays(
3737
.collect::<Vec<_>>())
3838
}
3939

40-
pub fn calculate_result_decimal_for_avg(
41-
lit_value: i128,
42-
count: i128,
43-
scale: i8,
44-
target_type: &DataType,
45-
) -> Result<ScalarValue> {
46-
match target_type {
47-
DataType::Decimal128(p, s) => {
48-
// Different precision for decimal128 can store different range of value.
49-
// For example, the precision is 3, the max of value is `999` and the min
50-
// value is `-999`
51-
let (target_mul, target_min, target_max) = (
52-
10_i128.pow(*s as u32),
53-
MIN_DECIMAL_FOR_EACH_PRECISION[*p as usize - 1],
54-
MAX_DECIMAL_FOR_EACH_PRECISION[*p as usize - 1],
55-
);
56-
let lit_scale_mul = 10_i128.pow(scale as u32);
57-
if target_mul >= lit_scale_mul {
58-
if let Some(value) = lit_value.checked_mul(target_mul / lit_scale_mul) {
59-
let new_value = value / count;
60-
if new_value >= target_min && new_value <= target_max {
61-
Ok(ScalarValue::Decimal128(Some(new_value), *p, *s))
62-
} else {
63-
Err(DataFusionError::Execution(
64-
"Arithmetic Overflow in AvgAccumulator".to_string(),
65-
))
66-
}
67-
} else {
68-
// can't convert the lit decimal to the returned data type
69-
Err(DataFusionError::Execution(
70-
"Arithmetic Overflow in AvgAccumulator".to_string(),
71-
))
72-
}
40+
/// Computes averages for `Decimal128` values, checking for overflow
41+
///
42+
/// This is needed because different precisions for Decimal128 can
43+
/// store different ranges of values and thus sum/count may not fit in
44+
/// the target type.
45+
///
46+
/// For example, the precision is 3, the max of value is `999` and the min
47+
/// value is `-999`
48+
pub(crate) struct Decimal128Averager {
49+
/// scale factor for sum values (10^sum_scale)
50+
sum_mul: i128,
51+
/// scale factor for target (10^target_scale)
52+
target_mul: i128,
53+
/// The minimum output value possible to represent with the target precision
54+
target_min: i128,
55+
/// The maximum output value possible to represent with the target precision
56+
target_max: i128,
57+
}
58+
59+
impl Decimal128Averager {
60+
/// Create a new `Decimal128Averager`:
61+
///
62+
/// * sum_scale: the scale of `sum` values passed to [`Self::avg`]
63+
/// * target_precision: the output precision
64+
/// * target_scale: the output scale
65+
///
66+
/// Errors if the resulting data can not be stored
67+
pub fn try_new(
68+
sum_scale: i8,
69+
target_precision: u8,
70+
target_scale: i8,
71+
) -> Result<Self> {
72+
let sum_mul = 10_i128.pow(sum_scale as u32);
73+
let target_mul = 10_i128.pow(target_scale as u32);
74+
let target_min = MIN_DECIMAL_FOR_EACH_PRECISION[target_precision as usize - 1];
75+
let target_max = MAX_DECIMAL_FOR_EACH_PRECISION[target_precision as usize - 1];
76+
77+
if target_mul >= sum_mul {
78+
Ok(Self {
79+
sum_mul,
80+
target_mul,
81+
target_min,
82+
target_max,
83+
})
84+
} else {
85+
// can't convert the lit decimal to the returned data type
86+
Err(DataFusionError::Execution(
87+
"Arithmetic Overflow in AvgAccumulator".to_string(),
88+
))
89+
}
90+
}
91+
92+
/// Returns the `sum`/`count` as a i128 Decimal128 with
93+
/// target_scale and target_precision and reporting overflow.
94+
///
95+
/// * sum: The total sum value stored as Decimal128 with sum_scale
96+
/// (passed to `Self::try_new`)
97+
/// * count: total count, stored as a i128 (*NOT* a Decimal128 value)
98+
#[inline(always)]
99+
pub fn avg(&self, sum: i128, count: i128) -> Result<i128> {
100+
if let Some(value) = sum.checked_mul(self.target_mul / self.sum_mul) {
101+
let new_value = value / count;
102+
if new_value >= self.target_min && new_value <= self.target_max {
103+
Ok(new_value)
73104
} else {
74-
// can't convert the lit decimal to the returned data type
75105
Err(DataFusionError::Execution(
76106
"Arithmetic Overflow in AvgAccumulator".to_string(),
77107
))
78108
}
109+
} else {
110+
// can't convert the lit decimal to the returned data type
111+
Err(DataFusionError::Execution(
112+
"Arithmetic Overflow in AvgAccumulator".to_string(),
113+
))
114+
}
115+
}
116+
}
117+
118+
/// Returns `sum`/`count` for decimal values, detecting and reporting overflow.
119+
///
120+
/// * sum: stored as Decimal128 with `sum_scale` scale
121+
/// * count: stored as a i128 (*NOT* a Decimal128 value)
122+
/// * sum_scale: the scale of `sum`
123+
/// * target_type: the output decimal type
124+
pub fn calculate_result_decimal_for_avg(
125+
sum: i128,
126+
count: i128,
127+
sum_scale: i8,
128+
target_type: &DataType,
129+
) -> Result<ScalarValue> {
130+
match target_type {
131+
DataType::Decimal128(target_precision, target_scale) => {
132+
let new_value =
133+
Decimal128Averager::try_new(sum_scale, *target_precision, *target_scale)?
134+
.avg(sum, count)?;
135+
136+
Ok(ScalarValue::Decimal128(
137+
Some(new_value),
138+
*target_precision,
139+
*target_scale,
140+
))
79141
}
80142
other => Err(DataFusionError::Internal(format!(
81143
"Invalid target type in AvgAccumulator {other:?}"

0 commit comments

Comments
 (0)