Skip to content

Commit

Permalink
KUDU-2483 Integrate BlockBloomFilter with ColumnPredicate on server side
Browse files Browse the repository at this point in the history
This change switches the implementation of the ColumnPredicate to use the
BlockBloomFilter for the BloomFilter predicate on the server side.

Earlier implementation was still experimental and didn't provide public client
APIs that actually use this BloomFilter predicate so taken the liberty to make
incompatible wire protocol changes.

Updated BlockBloomFilter to take hash_algorithm and hash_seed.
This make serializing and deserializing the BlockBloomFilter convenient and
removes the need of BloomFilterInner in ColumnPredicate.
Added overloaded Insert()/Find() functions to BlockBloomFilter that take Slice
parameter and hashes the key before insertion/lookup.

Most of the change involves refactoring the implementation including the
unit tests.

Currently only FAST_HASH algorithm is supported since 32-bit versions of
MURMUR2 and CITY_HASH are not currently implemented.

Change-Id: I7ecfd67e9c5fbe459c5b4aed91e0be2a194d433a
Reviewed-on: http://gerrit.cloudera.org:8080/15034
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Adar Dembo <[email protected]>
Reviewed-by: helifu <[email protected]>
  • Loading branch information
bbhavsar authored and alexeyserbin committed Feb 19, 2020
1 parent ba7c64f commit 961888d
Show file tree
Hide file tree
Showing 15 changed files with 491 additions and 376 deletions.
164 changes: 72 additions & 92 deletions src/kudu/common/column_predicate-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <initializer_list>
#include <string>
#include <utility>
#include <vector>

#include <boost/optional/optional.hpp>
Expand All @@ -37,13 +39,15 @@
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/bloom_filter.h"
#include "kudu/util/block_bloom_filter.h"
#include "kudu/util/hash.pb.h"
#include "kudu/util/hash_util.h"
#include "kudu/util/int128.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

using std::vector;
Expand Down Expand Up @@ -82,8 +86,8 @@ class TestColumnPredicate : public KuduTest {

void FillBloomFilterAndValues(int n_keys,
vector<uint64_t>* values,
BloomFilterBuilder* bfb1,
BloomFilterBuilder* bfb2) {
BlockBloomFilter* b1,
BlockBloomFilter* b2) {
uint64_t current = 0;
for (int i = 0; i < 2; ++i) {
while (true) {
Expand All @@ -93,9 +97,9 @@ class TestColumnPredicate : public KuduTest {
}
current = key;
Slice key_slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key));
BloomKeyProbe probe(key_slice, MURMUR_HASH_2);
bfb1->AddKey(probe);
bfb2->AddKey(probe);
uint32_t hash_val = HashUtil::ComputeHash32(key_slice, FAST_HASH, 0);
b1->Insert(hash_val);
b2->Insert(hash_val);
values->emplace_back(key);
break;
}
Expand All @@ -104,12 +108,11 @@ class TestColumnPredicate : public KuduTest {
while (true) {
uint64_t key = rand_.Next();
Slice key_slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key));
BloomKeyProbe probe(key_slice, MURMUR_HASH_2);
BloomFilter bf(bfb1->slice(), bfb1->n_hashes());
if (!bf.MayContainKey(probe) && key > current) {
uint32_t hash_val = HashUtil::ComputeHash32(key_slice, FAST_HASH, 0);
if (!b1->Find(hash_val) && key > current) {
current = key;
values->emplace_back(key);
bfb2->AddKey(probe);
b2->Insert(hash_val);
break;
}
}
Expand Down Expand Up @@ -795,9 +798,8 @@ class TestColumnPredicate : public KuduTest {

template <typename T>
void TestMergeBloomFilterCombinations(const ColumnSchema& column,
vector<ColumnPredicate::BloomFilterInner>* bf,
const vector<BlockBloomFilter*>& bf,
vector<T> values) {
vector<ColumnPredicate::BloomFilterInner> orig_bloom_filters = *bf;
// BloomFilter AND
// NONE
// =
Expand All @@ -811,7 +813,6 @@ class TestColumnPredicate : public KuduTest {
// Equality
// =
// Equality
*bf = orig_bloom_filters;
TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
ColumnPredicate::Equality(column, &values[0]),
ColumnPredicate::Equality(column, &values[0]),
Expand All @@ -821,8 +822,8 @@ class TestColumnPredicate : public KuduTest {
// Equality
// =
// None
*bf = orig_bloom_filters;
TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
// Value in index 2 is not present in one of the bloom filters.
ColumnPredicate::Equality(column, &values[2]),
ColumnPredicate::None(column),
PredicateType::None);
Expand All @@ -831,18 +832,15 @@ class TestColumnPredicate : public KuduTest {
// IS NOT NULL
// =
// BloomFilter
*bf = orig_bloom_filters;
vector<ColumnPredicate::BloomFilterInner> bf_copy = *bf;
TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
ColumnPredicate::IsNotNull(column),
ColumnPredicate::InBloomFilter(column, &bf_copy, nullptr, nullptr),
ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
PredicateType::InBloomFilter);

// BloomFilter AND
// IS NULL
// =
// None
*bf = orig_bloom_filters;
TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
ColumnPredicate::IsNull(column),
ColumnPredicate::None(column),
Expand All @@ -852,9 +850,7 @@ class TestColumnPredicate : public KuduTest {
// InList
// =
// None(the value in list can not hit bloom filter)
*bf = orig_bloom_filters;
vector<const void*> in_list = { &values[2], &values[3], &values[4] };
vector<const void*> hit_list;
TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
ColumnPredicate::InList(column, &in_list),
ColumnPredicate::None(column),
Expand All @@ -865,8 +861,7 @@ class TestColumnPredicate : public KuduTest {
// =
// InList(the value in list all hits bloom filter)
in_list = { &values[0], &values[1] };
hit_list = { &values[0], &values[1] };
*bf = orig_bloom_filters;
vector<const void*> hit_list = { &values[0], &values[1] };
TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
ColumnPredicate::InList(column, &in_list),
ColumnPredicate::InList(column, &hit_list),
Expand All @@ -878,7 +873,6 @@ class TestColumnPredicate : public KuduTest {
// InList(only the some values in list hits bloom filter)
in_list = { &values[0], &values[1], &values[2], &values[3] };
hit_list = { &values[0], &values[1]};
*bf = orig_bloom_filters;
TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
ColumnPredicate::InList(column, &in_list),
ColumnPredicate::InList(column, &hit_list),
Expand All @@ -889,7 +883,6 @@ class TestColumnPredicate : public KuduTest {
// =
// Equality(only the first value in list hits bloom filter, so it simplify to Equality)
in_list = { &values[0], &values[2], &values[3] };
*bf = orig_bloom_filters;
TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
ColumnPredicate::InList(column, &in_list),
ColumnPredicate::Equality(column, &values[0]),
Expand All @@ -899,30 +892,26 @@ class TestColumnPredicate : public KuduTest {
// BloomFilter
// =
// BloomFilter with lower and upper bound
*bf = orig_bloom_filters;
bf_copy = *bf;
TestMerge(ColumnPredicate::Range(column, &values[0], &values[4]),
ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]),
ColumnPredicate::InBloomFilter(column, bf, &values[0],
&values[4]),
PredicateType::InBloomFilter);

// BloomFilter with lower and upper bound AND
// Range
// =
// BloomFilter with lower and upper bound
*bf = orig_bloom_filters;
bf_copy = *bf;
TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[4]),
ColumnPredicate::Range(column, &values[1], &values[3]),
ColumnPredicate::InBloomFilter(column, &bf_copy, &values[1], &values[3]),
ColumnPredicate::InBloomFilter(column, bf, &values[1],
&values[3]),
PredicateType::InBloomFilter);

// BloomFilter with lower and upper bound AND
// Range
// =
// None
*bf = orig_bloom_filters;
bf_copy = *bf;
TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[2]),
ColumnPredicate::Range(column, &values[2], &values[4]),
ColumnPredicate::None(column),
Expand All @@ -932,38 +921,35 @@ class TestColumnPredicate : public KuduTest {
// BloomFilter with lower and upper bound
// =
// BloomFilter with lower and upper bound
*bf = orig_bloom_filters;
bf_copy = *bf;
vector<ColumnPredicate::BloomFilterInner> collect = *bf;
collect.insert(collect.end(), bf->begin(), bf->end());
auto collect = bf;
collect.insert(collect.end(), bf.begin(), bf.end());
TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]),
ColumnPredicate::InBloomFilter(column, &collect, &values[0], &values[4]),
ColumnPredicate::InBloomFilter(column, bf, &values[0],
&values[4]),
ColumnPredicate::InBloomFilter(column, collect, &values[0],
&values[4]),
PredicateType::InBloomFilter);

// BloomFilter with lower and upper bound AND
// BloomFilter with lower and upper bound
// =
// BloomFilter with lower and upper bound
*bf = orig_bloom_filters;
collect = *bf;
bf_copy = *bf;
collect.insert(collect.end(), bf->begin(), bf->end());
collect = bf;
collect.insert(collect.end(), bf.begin(), bf.end());
TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[1], &values[3]),
ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]),
ColumnPredicate::InBloomFilter(column, &collect, &values[1], &values[3]),
ColumnPredicate::InBloomFilter(column, bf, &values[0],
&values[4]),
ColumnPredicate::InBloomFilter(column, collect, &values[1],
&values[3]),
PredicateType::InBloomFilter);

// BloomFilter with lower and upper bound AND
// BloomFilter with lower and upper bound
// =
// None
*bf = orig_bloom_filters;
collect = *bf;
bf_copy = *bf;
collect.insert(collect.end(), bf->begin(), bf->end());
TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[2]),
ColumnPredicate::InBloomFilter(column, &bf_copy, &values[2], &values[4]),
ColumnPredicate::InBloomFilter(column, bf, &values[2],
&values[4]),
ColumnPredicate::None(column),
PredicateType::None);
}
Expand Down Expand Up @@ -1398,73 +1384,67 @@ TEST_F(TestColumnPredicate, TestRedaction) {
}

TEST_F(TestColumnPredicate, TestBloomFilterMerge) {
Arena arena(1024);
ArenaBlockBloomFilterBufferAllocator allocator(&arena);

int n_keys = 5; // 0 1 both hit bf1 and bf2, 2 3 4 only hit bf2.

// Test for UINT64 type.
BloomFilterBuilder bfb1(
BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
double expected_fp_rate1 = bfb1.false_positive_rate();
ASSERT_NEAR(expected_fp_rate1, 0.01, 0.002);
ASSERT_EQ(9, bfb1.n_bits() / n_keys);
BloomFilterBuilder bfb2(
BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
double expected_fp_rate2 = bfb2.false_positive_rate();
ASSERT_NEAR(expected_fp_rate2, 0.01, 0.002);
ASSERT_EQ(9, bfb2.n_bits() / n_keys);
BlockBloomFilter b1(&allocator);
int log_space_bytes1 = BlockBloomFilter::MinLogSpace(n_keys, 0.01);
ASSERT_OK(b1.Init(log_space_bytes1, FAST_HASH, 0));
double expected_fp_rate1 = BlockBloomFilter::FalsePositiveProb(n_keys, log_space_bytes1);
ASSERT_LE(expected_fp_rate1, 0.01);

BlockBloomFilter b2(&allocator);
int log_space_bytes2 = BlockBloomFilter::MinLogSpace(n_keys, 0.01);
ASSERT_OK(b2.Init(log_space_bytes2, FAST_HASH, 0));
double expected_fp_rate2 = BlockBloomFilter::FalsePositiveProb(n_keys, log_space_bytes2);
ASSERT_LE(expected_fp_rate2, 0.01);

vector<uint64_t> values_int;
FillBloomFilterAndValues(n_keys, &values_int, &bfb1, &bfb2);
const Slice slice1 = bfb1.slice();
const Slice slice2 = bfb2.slice();
ColumnPredicate::BloomFilterInner bf1(slice1, bfb1.n_hashes(), MURMUR_HASH_2);
ColumnPredicate::BloomFilterInner bf2(slice2, bfb2.n_hashes(), MURMUR_HASH_2);
vector<ColumnPredicate::BloomFilterInner> bfs;
bfs.emplace_back(bf1);
TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), &bfs, values_int);
bfs.clear();
bfs.emplace_back(bf1);
bfs.emplace_back(bf2);
TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), &bfs, values_int);
FillBloomFilterAndValues(n_keys, &values_int, &b1, &b2);
TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), {&b1}, values_int);
TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), {&b1, &b2},
values_int);

// Test for STRING type.
BloomFilterBuilder bfb3(
BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
double expected_fp_rate3 = bfb3.false_positive_rate();
ASSERT_NEAR(expected_fp_rate3, 0.01, 0.002);
ASSERT_EQ(9, bfb3.n_bits() / n_keys);
// 0 1 both hit bf1 and bf2, 2 3 4 only hit bf2.
BlockBloomFilter b3(&allocator);
int log_space_bytes3 = BlockBloomFilter::MinLogSpace(n_keys, 0.01);
ASSERT_OK(b3.Init(log_space_bytes3, FAST_HASH, 0));
double expected_fp_rate3 = BlockBloomFilter::FalsePositiveProb(n_keys, log_space_bytes3);
ASSERT_LE(expected_fp_rate3, 0.01);

vector<std::string> keys = {"0", "00", "10", "100", "1100"};
vector<Slice> keys_slice;
for (int i = 0; i < keys.size(); ++i) {
Slice key_slice(keys[i]);
BloomKeyProbe probe(key_slice, MURMUR_HASH_2);
if (i < 2) {
bfb3.AddKey(probe);
b3.Insert(key_slice);
}
keys_slice.emplace_back(key_slice);
}
bfs.clear();
bfs.emplace_back(bfb3.slice(), bfb3.n_hashes(), MURMUR_HASH_2);
TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), &bfs, keys_slice);

TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), {&b3}, keys_slice);

// Test for BINARY type
BloomFilterBuilder bfb4(
BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
double expected_fp_rate4 = bfb4.false_positive_rate();
ASSERT_NEAR(expected_fp_rate4, 0.01, 0.002);
ASSERT_EQ(9, bfb4.n_bits() / n_keys);
BlockBloomFilter b4(&allocator);
int log_space_bytes4 = BlockBloomFilter::MinLogSpace(n_keys, 0.01);
ASSERT_OK(b4.Init(log_space_bytes4, FAST_HASH, 0));
double expected_fp_rate4 = BlockBloomFilter::FalsePositiveProb(n_keys, log_space_bytes4);
ASSERT_LE(expected_fp_rate4, 0.01);

vector<Slice> binary_keys = { Slice("", 0),
Slice("\0", 1),
Slice("\0\0", 2),
Slice("\0\0\0", 3),
Slice("\0\0\0\0", 4) };
for (int i = 0; i < binary_keys.size(); ++i) {
BloomKeyProbe probe(binary_keys[i], MURMUR_HASH_2);
if (i < 2) {
bfb4.AddKey(probe);
b4.Insert(binary_keys[i]);
}
}
bfs.clear();
bfs.emplace_back(bfb4.slice(), bfb4.n_hashes(), MURMUR_HASH_2);
TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), &bfs, binary_keys);
TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), {&b4}, binary_keys);
}

// Test ColumnPredicate operator (in-)equality.
Expand Down
Loading

0 comments on commit 961888d

Please sign in to comment.