Skip to content

Commit

Permalink
Merge pull request databendlabs#8825 from lichuang/column_ndv
Browse files Browse the repository at this point in the history
feat: add distinct count aggregator and column distinct count
  • Loading branch information
BohuTANG authored Nov 21, 2022
2 parents b6623b7 + b4afcbc commit 8cd7530
Show file tree
Hide file tree
Showing 24 changed files with 313 additions and 19 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ sha1 = "0.10.5"
sha2 = "0.10.6"
simdutf8 = "0.1.4"
siphasher = "0.3"
streaming_algorithms = { git = "https://github.com/datafuse-extras/streaming_algorithms", tag = "hyperloglog_del_op_fix_overflow_bug" }
strength_reduce = "0.2.3"
twox-hash = "1.6.3"
uuid = { version = "1.1.2", features = ["v4"] }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::alloc::Layout;
use std::fmt;
use std::sync::Arc;

use common_arrow::arrow::bitmap::Bitmap;
use common_datavalues::prelude::*;
use common_exception::Result;
use common_io::prelude::*;
use streaming_algorithms::HyperLogLog;

use super::aggregate_function::AggregateFunction;
use super::aggregate_function_factory::AggregateFunctionDescription;
use super::StateAddr;
use crate::aggregates::aggregator_common::assert_unary_arguments;

/// Aggregation state for the approximate distinct count: a HyperLogLog sketch
/// over the `DataValue`s that have been pushed into it.
///
/// The serde derives are what the aggregate's `serialize` / `deserialize`
/// methods rely on to move this state through a byte buffer.
#[derive(serde::Serialize, serde::Deserialize)]
pub struct AggregateApproximateDistinctCountState {
// Sketch holding everything accumulated so far.
hll: HyperLogLog<DataValue>,
}

/// Aggregate function that estimates how many distinct values its argument
/// has; the factory registers it under the name `approx_count_distinct`.
///
/// The type is not generic: the only data it carries is the name that its
/// `Display` implementation prints.
#[derive(Clone)]
pub struct AggregateApproximateDistinctCountFunction {
// Name given at creation time; echoed by `Display`.
display_name: String,
}

impl AggregateApproximateDistinctCountFunction {
    /// Factory entry point: checks the argument count through
    /// `assert_unary_arguments` and hands the function back as a shared
    /// `AggregateFunction` trait object.
    ///
    /// `_params` is accepted to fit the creator signature and is not read.
    ///
    /// # Errors
    /// Propagates whatever `assert_unary_arguments` reports.
    pub fn try_create(
        display_name: &str,
        _params: Vec<DataValue>,
        arguments: Vec<DataField>,
    ) -> Result<Arc<dyn AggregateFunction>> {
        assert_unary_arguments(display_name, arguments.len())?;

        let function: Arc<dyn AggregateFunction> = Arc::new(Self {
            display_name: String::from(display_name),
        });
        Ok(function)
    }

    /// Builds the factory description for this aggregate: `try_create` as the
    /// creator, with `returns_default_when_only_null` switched on and every
    /// other feature left at its default.
    pub fn desc() -> AggregateFunctionDescription {
        use super::aggregate_function_factory::AggregateFunctionFeatures;

        AggregateFunctionDescription::creator_with_features(
            Box::new(Self::try_create),
            AggregateFunctionFeatures {
                returns_default_when_only_null: true,
                ..Default::default()
            },
        )
    }
}

/// `AggregateFunction` implementation backed by a HyperLogLog sketch.
///
/// NOTE(review): the state owns a `HyperLogLog` and nothing in this impl drops
/// it in place. If the `AggregateFunction` trait offers a manual drop hook for
/// states, it probably needs to be implemented here — confirm against the trait.
impl AggregateFunction for AggregateApproximateDistinctCountFunction {
    fn name(&self) -> &str {
        "AggregateApproximateDistinctCountFunction"
    }

    /// The result is always an unsigned 64-bit count.
    fn return_type(&self) -> Result<DataTypeImpl> {
        Ok(u64::to_data_type())
    }

    /// Writes a fresh, empty sketch into `place`.
    fn init_state(&self, place: StateAddr) {
        place.write(|| AggregateApproximateDistinctCountState {
            // 0.04 is the error tolerance handed to the sketch constructor.
            hll: HyperLogLog::new(0.04),
        });
    }

    fn state_layout(&self) -> Layout {
        Layout::new::<AggregateApproximateDistinctCountState>()
    }

    /// Pushes every non-null value of `columns[0]` into the state at `place`.
    ///
    /// A row counts as null when either the column's own validity or the
    /// caller-supplied `validity` bitmap marks it so.
    fn accumulate(
        &self,
        place: StateAddr,
        columns: &[ColumnRef],
        validity: Option<&Bitmap>,
        _input_rows: usize,
    ) -> Result<()> {
        let state = place.get::<AggregateApproximateDistinctCountState>();
        let column = &columns[0];

        let (_, bm) = column.validity();
        match combine_validities(bm, validity) {
            Some(bitmap) => {
                // Every row is null: skip materializing the values at all.
                if bitmap.unset_bits() == column.len() {
                    return Ok(());
                }
                for (row, value) in column.to_values().iter().enumerate() {
                    if bitmap.get_bit(row) {
                        state.hll.push(value);
                    }
                }
            }
            None => {
                for value in column.to_values().iter() {
                    state.hll.push(value);
                }
            }
        }

        Ok(())
    }

    /// Pushes the single value found at `row` of `columns[0]` into the state
    /// at `place`.
    ///
    /// Only the addressed row is added: feeding the whole column here would
    /// mix every row's value into whichever state `place` points at. Null
    /// values are skipped so this path agrees with `accumulate`.
    fn accumulate_row(&self, place: StateAddr, columns: &[ColumnRef], row: usize) -> Result<()> {
        let state = place.get::<AggregateApproximateDistinctCountState>();
        let value = columns[0].get(row);
        if !matches!(value, DataValue::Null) {
            state.hll.push(&value);
        }
        Ok(())
    }

    /// Serializes the state at `place` into `writer`.
    fn serialize(&self, place: StateAddr, writer: &mut Vec<u8>) -> Result<()> {
        let state = place.get::<AggregateApproximateDistinctCountState>();
        serialize_into_buf(writer, state)
    }

    /// Replaces the state at `place` with one decoded from `reader`.
    ///
    /// NOTE(review): the assignment drops the value previously stored at
    /// `place`, so this assumes `place` already holds an initialized state —
    /// confirm against the callers.
    fn deserialize(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> {
        let state = place.get::<AggregateApproximateDistinctCountState>();
        *state = deserialize_from_slice(reader)?;
        Ok(())
    }

    /// Folds the sketch at `rhs` into the sketch at `place`.
    fn merge(&self, place: StateAddr, rhs: StateAddr) -> Result<()> {
        let state = place.get::<AggregateApproximateDistinctCountState>();
        let rhs = rhs.get::<AggregateApproximateDistinctCountState>();
        state.hll.union(&rhs.hll);

        Ok(())
    }

    /// Appends the sketch's current estimate to `array`, which must be a
    /// mutable `u64` column.
    fn merge_result(&self, place: StateAddr, array: &mut dyn MutableColumn) -> Result<()> {
        let builder: &mut MutablePrimitiveColumn<u64> = Series::check_get_mutable_column(array)?;
        let state = place.get::<AggregateApproximateDistinctCountState>();
        // `len()` is the sketch's estimate; `as u64` converts it to the output type.
        builder.append_value(state.hll.len() as u64);

        Ok(())
    }
}

/// Prints the name the function was created with, exactly as stored.
impl fmt::Display for AggregateApproximateDistinctCountFunction {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // The name goes out verbatim; width/alignment flags are not applied.
        f.write_str(&self.display_name)
    }
}
5 changes: 5 additions & 0 deletions src/query/functions/src/aggregates/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::aggregate_min_max_any::aggregate_max_function_desc;
use super::aggregate_min_max_any::aggregate_min_function_desc;
use super::aggregate_stddev_pop::aggregate_stddev_pop_function_desc;
use super::aggregate_window_funnel::aggregate_window_funnel_function_desc;
use super::AggregateApproximateDistinctCountFunction;
use super::AggregateCountFunction;
use super::AggregateFunctionFactory;
use super::AggregateIfCombinator;
Expand Down Expand Up @@ -56,6 +57,10 @@ impl Aggregators {
factory.register("uniq", aggregate_combinator_uniq_desc());

factory.register("retention", aggregate_retention_function_desc());
factory.register(
"approx_count_distinct",
AggregateApproximateDistinctCountFunction::desc(),
);
}

pub fn register_combinator(factory: &mut AggregateFunctionFactory) {
Expand Down
2 changes: 2 additions & 0 deletions src/query/functions/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod adaptors;

#[macro_use]
mod macros;
mod aggregate_approximate_distinct_count;
mod aggregate_arg_min_max;
mod aggregate_avg;
mod aggregate_combinator_distinct;
Expand All @@ -40,6 +41,7 @@ mod aggregate_stddev_pop;
mod aggregate_window_funnel;

pub use adaptors::*;
pub use aggregate_approximate_distinct_count::AggregateApproximateDistinctCountFunction;
pub use aggregate_arg_min_max::AggregateArgMinMaxFunction;
pub use aggregate_avg::AggregateAvgFunction;
pub use aggregate_combinator_distinct::AggregateDistinctCombinator;
Expand Down
28 changes: 28 additions & 0 deletions src/query/functions/tests/it/aggregates/aggregate_combinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ fn test_aggregate_combinator_function() -> Result<()> {
Vec::from([4u64]),
)),
},
Test {
name: "approx_count_distinct",
params: vec![],
args: vec![args[0].clone()],
display: "approx_count_distinct",
func_name: "approx_count_distinct",
arrays: vec![arrays[0].clone()],
error: "",
input_array: Box::<common_datavalues::MutablePrimitiveColumn<u64>>::default(),
expect_array: Box::new(MutablePrimitiveColumn::<u64>::from_data(
u64::to_data_type(),
Vec::from([4u64]),
)),
},
Test {
name: "sum-distinct-passed",
params: vec![],
Expand Down Expand Up @@ -237,6 +251,20 @@ fn test_aggregate_combinator_function_on_empty_data() -> Result<()> {
Vec::from([0u64]),
)),
},
Test {
name: "approx_count_distinct",
params: vec![],
args: vec![args[0].clone()],
display: "approx_count_distinct",
func_name: "approx_count_distinct",
arrays: vec![arrays[0].clone()],
error: "",
input_array: Box::<common_datavalues::MutablePrimitiveColumn<u64>>::default(),
expect_array: Box::new(MutablePrimitiveColumn::<u64>::from_data(
u64::to_data_type(),
Vec::from([0u64]),
)),
},
Test {
name: "sum-distinct-passed",
params: vec![],
Expand Down
1 change: 1 addition & 0 deletions src/query/service/tests/it/storages/fuse/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ fn test_issue_6556_column_statistics_ser_de_compatability_null_count_alias()
max: DataValue::Null,
null_count: 0,
in_memory_size: 0,
distinct_of_values: None,
};

let mut json_value = serde_json::to_value(&col_stats)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ fn test_to_partitions() -> Result<()> {
max: DataValue::Int64(2),
null_count: 0,
in_memory_size: col_size as u64,
distinct_of_values: None,
};

let col_metas_gen = |col_size| ColumnMeta {
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/tests/it/storages/fuse/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> {
.iter()
.map(|b| gen_columns_statistics(&b.clone().unwrap()))
.collect::<common_exception::Result<Vec<_>>>()?;
let r = reducers::reduce_block_statistics(&col_stats);
let r = reducers::reduce_block_statistics(&col_stats, None);
assert!(r.is_ok());
let r = r.unwrap();
assert_eq!(3, r.len());
Expand Down Expand Up @@ -124,6 +124,7 @@ fn test_reduce_block_statistics_in_memory_size() -> common_exception::Result<()>
max: DataValue::Null,
null_count: 1,
in_memory_size: 1,
distinct_of_values: Some(1),
}))
})
};
Expand All @@ -132,7 +133,7 @@ fn test_reduce_block_statistics_in_memory_size() -> common_exception::Result<()>
// combine two statistics
let col_stats_left = HashMap::from_iter(iter(0).take(num_of_cols));
let col_stats_right = HashMap::from_iter(iter(0).take(num_of_cols));
let r = reducers::reduce_block_statistics(&[col_stats_left, col_stats_right])?;
let r = reducers::reduce_block_statistics(&[col_stats_left, col_stats_right], None)?;
assert_eq!(num_of_cols, r.len());
// there should be 100 columns in the result
for idx in 1..=100 {
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/tests/it/storages/index/range_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,21 @@ async fn test_range_filter() -> Result<()> {
max: DataValue::Int64(20),
null_count: 1,
in_memory_size: 0,
distinct_of_values: None,
});
stats.insert(1u32, ColumnStatistics {
min: DataValue::Int64(3),
max: DataValue::Int64(10),
null_count: 0,
in_memory_size: 0,
distinct_of_values: None,
});
stats.insert(2u32, ColumnStatistics {
min: DataValue::String("abc".as_bytes().to_vec()),
max: DataValue::String("bcd".as_bytes().to_vec()),
null_count: 0,
in_memory_size: 0,
distinct_of_values: None,
});

let tests: Vec<Test> = vec![
Expand Down
10 changes: 6 additions & 4 deletions src/query/storages/fuse/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,8 @@ impl Table for FuseTable {
let stats = &snapshot.summary.col_stats;
FakedColumnStatisticsProvider {
column_stats: stats.clone(),
faked_ndv: snapshot.summary.row_count,
// fall back to the row count for columns that have no recorded distinct count
distinct_count: snapshot.summary.row_count,
}
} else {
FakedColumnStatisticsProvider::default()
Expand Down Expand Up @@ -478,8 +479,7 @@ impl Table for FuseTable {
#[derive(Default)]
struct FakedColumnStatisticsProvider {
column_stats: HashMap<ColumnId, FuseColumnStatistics>,
// faked value, just the row number
faked_ndv: u64,
distinct_count: u64,
}

impl ColumnStatisticsProvider for FakedColumnStatisticsProvider {
Expand All @@ -489,7 +489,9 @@ impl ColumnStatisticsProvider for FakedColumnStatisticsProvider {
min: s.min.clone(),
max: s.max.clone(),
null_count: s.null_count,
number_of_distinct_values: self.faked_ndv,
number_of_distinct_values: s
.distinct_of_values
.map_or_else(|| self.distinct_count, |n| n),
})
}
}
2 changes: 1 addition & 1 deletion src/query/storages/fuse/fuse/src/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ impl FuseTable {
acc.col_stats = if acc.col_stats.is_empty() {
stats.col_stats.clone()
} else {
statistics::reduce_block_statistics(&[&acc.col_stats, &stats.col_stats])?
statistics::reduce_block_statistics(&[&acc.col_stats, &stats.col_stats], None)?
};
seg_acc.push(location.clone());
Ok::<_, ErrorCode>((acc, seg_acc))
Expand Down
Loading

0 comments on commit 8cd7530

Please sign in to comment.