Skip to content

Commit

Permalink
[client] Remove allocator param for direct Bloom filter predicate
Browse files Browse the repository at this point in the history
Direct Bloom filter predicate in kudu client API is meant for consumption by
Impala.

Supplying the allocator that's associated with Bloom filter is turning out to
be tricky for Impala. For instance, it doesn't want to support Clone() and
concerned with the allocator being used by kudu client for
allocation/deallocation of memory.

Allocator associated with the Bloom filter is only used for cloning a predicate
and the support for cloning is only used for internal testing purposes.

So this change removes the allocator parameter from the direct Bloom filter
predicate API. For cloning the direct Bloom filter, it uses the default
allocator instead.

Since this API is marked as advanced/unstable, no compatibility concerns.

Change-Id: I7b57a6cacc8a5b6243787df8f896b74aa04f3916
Reviewed-on: http://gerrit.cloudera.org:8080/15947
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
  • Loading branch information
bbhavsar committed May 21, 2020
1 parent 8a45bee commit d652cab
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 40 deletions.
28 changes: 9 additions & 19 deletions src/kudu/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/async_util.h"
#include "kudu/util/block_bloom_filter.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/init.h"
#include "kudu/util/logging.h"
Expand All @@ -100,6 +99,10 @@
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/version_info.h"

namespace kudu {
class BlockBloomFilter;
} // namespace kudu

using kudu::consensus::RaftPeerPB;
using kudu::master::AlterTableRequestPB;
using kudu::master::AlterTableResponsePB;
Expand Down Expand Up @@ -1006,43 +1009,30 @@ KuduPredicate* KuduTable::NewInBloomFilterPredicate(const Slice& col_name,
}

KuduPredicate* KuduTable::NewInBloomFilterPredicate(const Slice& col_name,
const Slice& allocator,
const vector<Slice>& bloom_filters) {
// Empty vector of bloom filters will select all rows. Hence disallowed.
if (bloom_filters.empty()) {
return new KuduPredicate(
new ErrorPredicateData(Status::InvalidArgument("No Bloom filters supplied")));
}

// In this case, the Block Bloom filters and allocator are supplied as opaque pointers
// and the predicate will convert them to well-defined pointer types
// but will NOT take ownership of those pointers. Hence a custom deleter,
// DirectBloomFilterDataDeleter, is used that gives control over ownership.

// Extract the allocator.
auto* bbf_allocator =
// const_cast<> can be avoided with mutable_data() on a Slice. However mutable_data() is a
// non-const function which can't be used with the const allocator Slice.
// Same for BlockBloomFilter below.
reinterpret_cast<BlockBloomFilterBufferAllocatorIf*>(const_cast<uint8_t*>(allocator.data()));
std::shared_ptr<BlockBloomFilterBufferAllocatorIf> bbf_allocator_shared(
bbf_allocator,
DirectBloomFilterDataDeleter<BlockBloomFilterBufferAllocatorIf>(false /*owned*/));

// Extract the Block Bloom filters.
vector<DirectBlockBloomFilterUniqPtr> bbf_vec;
for (const auto& bf_slice : bloom_filters) {
auto* bbf =
reinterpret_cast<BlockBloomFilter*>(const_cast<uint8_t*>(bf_slice.data()));
// In this case, the Block Bloom filters are supplied as opaque pointers
// and the predicate will convert them to well-defined pointer types
// but will NOT take ownership of those pointers. Hence a custom deleter,
// DirectBloomFilterDataDeleter, is used that gives control over ownership.
DirectBlockBloomFilterUniqPtr bf_uniq_ptr(
bbf, DirectBloomFilterDataDeleter<BlockBloomFilter>(false /*owned*/));
bbf_vec.emplace_back(std::move(bf_uniq_ptr));
}

return data_->MakePredicate(col_name, [&](const ColumnSchema& col_schema) {
return new KuduPredicate(
new InDirectBloomFilterPredicateData(col_schema, std::move(bbf_allocator_shared),
std::move(bbf_vec)));
new InDirectBloomFilterPredicateData(col_schema, std::move(bbf_vec)));
});
}

Expand Down
4 changes: 0 additions & 4 deletions src/kudu/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -1137,9 +1137,6 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
///
/// @param [in] col_name
/// Name of the column to which the predicate applies.
/// @param [in] allocator
/// Pointer to the BlockBloomFilterBufferAllocatorIf allocator used with
/// Bloom filters.
/// @param bloom_filters
/// Vector of BlockBloomFilter pointers that contain the values inserted to
/// match against the column. The column value must match against all the
Expand All @@ -1155,7 +1152,6 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
/// a non-NULL value is still returned. The error will be returned when
/// attempting to add this predicate to a KuduScanner.
KuduPredicate* NewInBloomFilterPredicate(const Slice& col_name,
const Slice& allocator,
const std::vector<Slice>& bloom_filters);
///@}

Expand Down
7 changes: 2 additions & 5 deletions src/kudu/client/predicate-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1413,15 +1413,12 @@ TEST_F(BloomFilterPredicateTest, TestDirectBlockBloomFilterPredicate) {

InsertAllValuesInTable();

auto* allocator = DefaultBlockBloomFilterBufferAllocator::GetSingleton();
Slice allocator_slice(reinterpret_cast<const uint8_t*>(allocator), sizeof(*allocator));

vector<Slice> included_bf_vec =
{ Slice(reinterpret_cast<const uint8_t*>(included_bf.get()), sizeof(*included_bf)) };
const size_t included_bf_vec_size = included_bf_vec.size();

auto* included_predicate =
table_->NewInBloomFilterPredicate("value", allocator_slice, included_bf_vec);
table_->NewInBloomFilterPredicate("value", included_bf_vec);
auto* included_predicate_clone1 = included_predicate->Clone();
auto* included_predicate_clone2 = included_predicate->Clone();

Expand All @@ -1434,7 +1431,7 @@ TEST_F(BloomFilterPredicateTest, TestDirectBlockBloomFilterPredicate) {
{ Slice(reinterpret_cast<const uint8_t*>(excluded_bf.get()), sizeof(*excluded_bf)) };
const size_t excluded_bf_vec_size = excluded_bf_vec.size();
auto* excluded_predicate =
table_->NewInBloomFilterPredicate("value", allocator_slice, excluded_bf_vec);
table_->NewInBloomFilterPredicate("value", excluded_bf_vec);
ASSERT_EQ(excluded_bf_vec_size, excluded_bf_vec.size());

int actual_count_excluded = CountRows(table_, { excluded_predicate });
Expand Down
3 changes: 0 additions & 3 deletions src/kudu/client/scan_predicate-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,8 @@ class InDirectBloomFilterPredicateData : public KuduPredicate::Data {
public:
InDirectBloomFilterPredicateData(
ColumnSchema col,
std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator,
std::vector<DirectBlockBloomFilterUniqPtr> bloom_filters)
: col_(std::move(col)),
allocator_(std::move(allocator)),
bloom_filters_(std::move(bloom_filters)) {
}

Expand All @@ -152,7 +150,6 @@ class InDirectBloomFilterPredicateData : public KuduPredicate::Data {
// have worked fine. However for the case when predicate data is Clone()'d the internal data
// is owned by the instance of this class. Hence using smart pointers with custom deleter
// DirectBloomFilterDataDeleter to keep track of ownership.
std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator_;
std::vector<DirectBlockBloomFilterUniqPtr> bloom_filters_;
};

Expand Down
16 changes: 7 additions & 9 deletions src/kudu/client/scan_predicate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,25 +195,23 @@ Status InDirectBloomFilterPredicateData::AddToScanSpec(ScanSpec* spec, Arena* /*
}

InDirectBloomFilterPredicateData* InDirectBloomFilterPredicateData::Clone() const {
// In the clone case, the objects are owned by InDirectBloomFilterPredicateData
// and hence use the default deleter with shared_ptr.
shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator_clone =
CHECK_NOTNULL(allocator_->Clone());

vector<DirectBlockBloomFilterUniqPtr> bloom_filter_clones;
bloom_filter_clones.reserve(bloom_filters_.size());

// For the case of direct Bloom filter, we don't have access to the allocator
// so use the default allocator for cloning it.
auto* allocator = DefaultBlockBloomFilterBufferAllocator::GetSingleton();
for (const auto& bf : bloom_filters_) {
unique_ptr<BlockBloomFilter> bf_clone;
CHECK_OK(bf->Clone(allocator_clone.get(), &bf_clone));
CHECK_OK(bf->Clone(allocator, &bf_clone));

// Similarly for unique_ptr, specify that the BlockBloomFilter is owned by
// For unique_ptr, specify that the BlockBloomFilter is owned by
// InDirectBloomFilterPredicateData.
DirectBlockBloomFilterUniqPtr direct_bf_clone(
bf_clone.release(), DirectBloomFilterDataDeleter<BlockBloomFilter>(true /*owned*/));
bloom_filter_clones.emplace_back(std::move(direct_bf_clone));
}
return new InDirectBloomFilterPredicateData(col_, std::move(allocator_clone),
std::move(bloom_filter_clones));
return new InDirectBloomFilterPredicateData(col_, std::move(bloom_filter_clones));
}

KuduBloomFilter::KuduBloomFilter() {
Expand Down

0 comments on commit d652cab

Please sign in to comment.