Skip to content

Commit

Permalink
[perf] KUDU-3140 Heuristics to disable predicate evaluation for Bloom…
Browse files Browse the repository at this point in the history
… filter

Column predicate evaluation can be expensive and ineffective column predicates
can waste CPU. TPCH Q9 exhibits significant regression of 50-96% on enabling
Bloom filter predicates. See KUDU-3140 for details.

Excerpt from TPCH run exhibiting regression:
https://gist.github.com/bbhavsar/943cf8ebbab63f598353efef8f87db32
TPCH Q9 specific info:
https://gist.github.com/bbhavsar/811ccbe0cd144090f82bdabcd801f827

This change adds simple heuristic taken from HDFS scanner in Impala
that basically checks for every 16 blocks and if a predicate has rejected
less than 10% of the rows scanned then disables the predicate.

To match the equivalent number of rows in Kudu, the check is made
every 128 blocks by default.

The stats collection and enforcement is enabled only for disableable
predicate types, Bloom filter for now.

With Bloom filter predicate type, false positives are expected so
client is expected to do further filtering to remove false positives.
Kudu makes the decision to disable the predicate independently and doesn't
inform the client in this change which is okay for Bloom filter given
the rationale above. Client API docs have been updated accordingly.

Added a tablet level metric to track disabled column predicates.

Tests with PS6:
- TPCH no longer reports regression with Q9. With multiple runs,
  the delta are +1.95%, -24.67%, +2.67%, -17.09%, -14.59% with a std dev
  of 17% - 38% to report it neither as improvement nor as regression.
  https://gist.github.com/bbhavsar/0a773359b9225f014d353759a535c5be
- Improvements with other queries reported before this change remain intact.

Change-Id: I10197800a01a1b34c7821ac879caf8d272cab8dd
Reviewed-on: http://gerrit.cloudera.org:8080/16036
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
  • Loading branch information
bbhavsar committed Jun 26, 2020
1 parent 5190073 commit f1905eb
Show file tree
Hide file tree
Showing 17 changed files with 768 additions and 75 deletions.
15 changes: 3 additions & 12 deletions src/kudu/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,6 @@ class ClientTest : public KuduTest {
return resp.tablet_locations(0).tablet_id();
}

void FlushTablet(const string& tablet_id) {
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
scoped_refptr<TabletReplica> tablet_replica;
ASSERT_TRUE(cluster_->mini_tablet_server(i)->server()->tablet_manager()->LookupTablet(
tablet_id, &tablet_replica));
ASSERT_OK(tablet_replica->tablet()->Flush());
}
}

void CheckNoRpcOverflow() {
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
MiniTabletServer* server = cluster_->mini_tablet_server(i);
Expand Down Expand Up @@ -451,7 +442,7 @@ class ClientTest : public KuduTest {
KuduScanner scanner(client_table_.get());
string tablet_id = GetFirstTabletId(client_table_.get());
// Flush to ensure we scan disk later
FlushTablet(tablet_id);
ASSERT_OK(cluster_->FlushTablet(tablet_id));
ASSERT_OK(scanner.SetProjectedColumnNames({ "key" }));
LOG_TIMING(INFO, "Scanning disk with no predicates") {
ASSERT_OK(scanner.Open());
Expand Down Expand Up @@ -1551,7 +1542,7 @@ TEST_F(ClientTest, TestScanFaultTolerance) {
// disk.
if (with_flush) {
string tablet_id = GetFirstTabletId(table.get());
FlushTablet(tablet_id);
ASSERT_OK(cluster_->FlushTablet(tablet_id));
}

// Test a few different recoverable server-side error conditions.
Expand Down Expand Up @@ -6619,7 +6610,7 @@ TEST_F(ClientTest, TestProjectionPredicatesFuzz) {
// Leave one row in the tablet's MRS so that the scan includes one rowset
// without bounds. This affects the behavior of FT scans.
if (i < kNumRows - 1 && rng.OneIn(2)) {
NO_FATALS(FlushTablet(GetFirstTabletId(table.get())));
ASSERT_OK(cluster_->FlushTablet(GetFirstTabletId(table.get())));
}
}

Expand Down
14 changes: 14 additions & 0 deletions src/kudu/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,13 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
/// hand with IN Bloom filter predicate large number of values can be tested
/// for membership in a space-efficient manner.
///
/// IN Bloom filter predicate may be automatically disabled if determined to
/// be ineffective in filtering rows during scan requests.
///
/// Users are expected to perform further filtering to guard against false
/// positives and automatic disablement of an ineffective Bloom filter
/// predicate to get precise set membership information.
///
/// @param [in] col_name
/// Name of the column to which the predicate applies.
/// @param [in] bloom_filters
Expand Down Expand Up @@ -1135,6 +1142,13 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
/// hand with IN Bloom filter predicate large number of values can be tested
/// for membership in a space-efficient manner.
///
/// IN Bloom filter predicate may be automatically disabled if determined to
/// be ineffective in filtering rows during scan requests.
///
/// Users are expected to perform further filtering to guard against false
/// positives and automatic disablement of an ineffective Bloom filter
/// predicate to get precise set membership information.
///
/// @param [in] col_name
/// Name of the column to which the predicate applies.
/// @param bloom_filters
Expand Down
179 changes: 156 additions & 23 deletions src/kudu/client/predicate-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <initializer_list>
#include <iterator>
#include <limits>
Expand All @@ -30,6 +31,7 @@
#include <utility>
#include <vector>

#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

Expand All @@ -44,11 +46,15 @@
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/strings/escaping.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/util/block_bloom_filter.h"
#include "kudu/util/decimal_util.h"
#include "kudu/util/hash.pb.h"
#include "kudu/util/int128.h"
#include "kudu/util/metrics.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/slice.h"
Expand All @@ -57,7 +63,12 @@
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

DECLARE_int32(predicate_effectivess_num_skip_blocks);
METRIC_DECLARE_counter(scanner_predicates_disabled);
METRIC_DECLARE_entity(tablet);

using std::count_if;
using std::deque;
using std::numeric_limits;
using std::string;
using std::unique_ptr;
Expand Down Expand Up @@ -341,6 +352,27 @@ class PredicateTest : public KuduTest {
return bf;
}

template<typename BloomFilterType, typename Collection>
static void InsertValuesInBloomFilter(BloomFilterType* bloom_filter, const Collection& values) {
for (const auto& v : values) {
bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&v), sizeof(v)));
}
}

template<class Collection>
static KuduBloomFilter* CreateBloomFilterWithValues(const Collection& values) {
KuduBloomFilter* bloom_filter = CreateBloomFilter(values.size());
InsertValuesInBloomFilter(bloom_filter, values);
return bloom_filter;
}

template<class Collection>
static unique_ptr<BlockBloomFilter> CreateDirectBloomFilterWithValues(const Collection& values) {
unique_ptr<BlockBloomFilter> bloom_filter = CreateDirectBloomFilter(values.size());
InsertValuesInBloomFilter(bloom_filter.get(), values);
return bloom_filter;
}

void CheckInBloomFilterPredicate(const shared_ptr<KuduTable>& table,
KuduBloomFilter* in_bloom_filter,
int expected_count) {
Expand Down Expand Up @@ -584,6 +616,30 @@ class PredicateTest : public KuduTest {
ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
}

string GetTheOnlyTabletId() {
// Cluster is setup with single tablet server and a single table.
CHECK_EQ(1, cluster_->num_tablet_servers());
const auto* mini_tserver = cluster_->mini_tablet_server(0);
const auto& tablets = mini_tserver->ListTablets();
CHECK_EQ(1, tablets.size());
return tablets[0];
}

// Function to verify ineffective disabled predicates on specified tablet.
void CheckDisabledPredicates(const string& tablet_id, int64_t expected_disabled_predicates) {
// Cluster is setup with single tablet server and a single table.
ASSERT_EQ(1, cluster_->num_tablet_servers());
const auto* mini_tserver = cluster_->mini_tablet_server(0);
int64_t predicates_disabled = 0;
ASSERT_OK(itest::GetInt64Metric(HostPort(mini_tserver->bound_http_addr()),
&METRIC_ENTITY_tablet,
tablet_id.c_str(),
&METRIC_scanner_predicates_disabled,
"value",
&predicates_disabled));
ASSERT_EQ(expected_disabled_predicates, predicates_disabled);
}

shared_ptr<KuduClient> client_;
unique_ptr<InternalMiniCluster> cluster_;
};
Expand Down Expand Up @@ -1295,8 +1351,7 @@ class BloomFilterPredicateTest : public PredicateTest {
// one with 'num_included_values' values from the table, and one with 'num_excluded_values'
// values that aren't present in the table.
void Init(int num_all_values, int num_included_values, int num_excluded_values) {
ASSERT_LT(num_included_values, num_all_values);
ASSERT_LT(num_excluded_values, num_all_values);
ASSERT_LE(num_included_values, num_all_values);

table_ = CreateAndOpenTable(KuduColumnSchema::INT32);
session_ = CreateSession();
Expand All @@ -1313,27 +1368,6 @@ class BloomFilterPredicateTest : public PredicateTest {
num_false_positive_values_ = num_all_values * kBloomFilterFalsePositiveProb;
}

template<typename BloomFilterType, typename Collection>
static void InsertValues(BloomFilterType* bloom_filter, const Collection& values) {
for (const auto& v : values) {
bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&v), sizeof(v)));
}
}

template<class Collection>
static KuduBloomFilter* CreateBloomFilterWithValues(const Collection& values) {
KuduBloomFilter* bloom_filter = CreateBloomFilter(values.size());
InsertValues(bloom_filter, values);
return bloom_filter;
}

template<class Collection>
static unique_ptr<BlockBloomFilter> CreateDirectBloomFilterWithValues(const Collection& values) {
unique_ptr<BlockBloomFilter> bloom_filter = CreateDirectBloomFilter(values.size());
InsertValues(bloom_filter.get(), values);
return bloom_filter;
}

void InsertAllValuesInTable() {
int i = 0;
for (auto value : all_values_) {
Expand Down Expand Up @@ -1442,6 +1476,28 @@ TEST_F(BloomFilterPredicateTest, TestDirectBlockBloomFilterPredicate) {
TestWithRangePredicate(included_predicate_clone1, included_predicate_clone2);
}

// Test to verify that an ineffective Bloom filter predicate will be disabled.
TEST_F(BloomFilterPredicateTest, TestDisabledBloomFilterPredicate) {
// Insert all values from the table in Bloom filter so that the predicate
// is determined to be ineffective.
Init(10000/*num_all_values*/, 10000/*num_included_values*/, 0/*num_excluded_values*/);
KuduBloomFilter* included_bf = CreateBloomFilterWithValues(included_values_);

InsertAllValuesInTable();

vector<KuduBloomFilter*> included_bf_vec = { included_bf };
auto* included_predicate =
table_->NewInBloomFilterPredicate("value", &included_bf_vec);

ASSERT_TRUE(included_bf_vec.empty());
FLAGS_predicate_effectivess_num_skip_blocks = 4;
int actual_count_included = CountRows(table_, { included_predicate });
ASSERT_EQ(included_values_.size(), actual_count_included);

// CountRows() runs 2 scans using a cloned predicate.
CheckDisabledPredicates(GetTheOnlyTabletId(), 2 /* expected_disabled_predicates */);
}

// Benchmark test that combines Bloom filter predicate with range predicate.
TEST_F(BloomFilterPredicateTest, TestKuduBloomFilterPredicateBenchmark) {
SKIP_IF_SLOW_NOT_ALLOWED();
Expand All @@ -1459,6 +1515,83 @@ TEST_F(BloomFilterPredicateTest, TestKuduBloomFilterPredicateBenchmark) {
TestWithRangePredicate(included_predicate, included_predicate_clone);
}

class ParameterizedBloomFilterPredicateTest :
public PredicateTest,
public ::testing::WithParamInterface<std::tuple<bool, bool>> {};

INSTANTIATE_TEST_CASE_P(, ParameterizedBloomFilterPredicateTest,
::testing::Combine(::testing::Bool(),
::testing::Bool()));

// Test to verify that an ineffective Bloom filter predicate will be disabled
// using a pattern of repeated strings.
TEST_P(ParameterizedBloomFilterPredicateTest, TestDisabledBloomFilterWithRepeatedStrings) {
// Combination of following 2 flags help test cases with MRSs flushed, DRS and delta files.
const bool flush_tablet = std::get<0>(GetParam());
const bool update_rows = std::get<1>(GetParam());

shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::STRING);
shared_ptr<KuduSession> session = CreateSession();

// Use a set of predetermined strings and populate the table.
// Create a BF predicate with same set of strings.
deque<string> values = {"Alice", "Bob", "Charlie", "Doug", "Elizabeth", "Frank", "George",
"Harry"};

// Populate table with a small set of strings that are repeated.
static constexpr int kNumRows = 10000;
auto upsert_rows = [&]() {
int i = 0;
while (i < kNumRows) {
for (int j = 0; j < values.size() && i < kNumRows; j++, i++) {
const string &value = values[j];
unique_ptr<KuduUpsert> upsert(table->NewUpsert());
ASSERT_OK(upsert->mutable_row()->SetInt64("key", i));
ASSERT_OK(upsert->mutable_row()->SetStringNoCopy("value", value));
ASSERT_OK(session->Apply(upsert.release()));
}
// TSAN builds timeout and fail on flushing with large number of rows.
if ((i % (kNumRows / 10)) == 0) {
ASSERT_OK(session->Flush());
}
}
ASSERT_OK(session->Flush());
};

upsert_rows();
const string tablet_id = GetTheOnlyTabletId();

if (flush_tablet) {
ASSERT_OK(cluster_->FlushTablet(tablet_id));
}

if (update_rows) {
string first_name = values.front();
values.pop_front();
values.push_back(first_name);

upsert_rows();
if (flush_tablet) {
ASSERT_OK(cluster_->FlushTablet(tablet_id));
}
}

// Create Bloom filter predicate with string values.
auto* bf = CreateBloomFilter(values.size());
for (const auto& value : values) {
bf->Insert(Slice(value));
}
vector<KuduBloomFilter*> bf_vec = { bf };
auto* bf_predicate = table->NewInBloomFilterPredicate("value", &bf_vec);
ASSERT_TRUE(bf_vec.empty());

FLAGS_predicate_effectivess_num_skip_blocks = 4;
int actual_count_included = DoCountRows(table, { bf_predicate });
ASSERT_EQ(kNumRows, actual_count_included);

CheckDisabledPredicates(tablet_id, 1 /* expected_disabled_predicates */);
}

class ParameterizedPredicateTest : public PredicateTest,
public ::testing::WithParamInterface<KuduColumnSchema::DataType> {};

Expand Down
1 change: 1 addition & 0 deletions src/kudu/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ set(COMMON_SRCS
partial_row.cc
partition.cc
partition_pruner.cc
predicate_effectiveness.cc
rowblock.cc
row_changelist.cc
row_operations.cc
Expand Down
5 changes: 3 additions & 2 deletions src/kudu/common/column_materialization_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ class ColumnMaterializationContext {
// correct status will be set.
kNotSet = 0,

// May be set before scanning if the decoder eval flag is set to false or
// if iterator has deltas associated with it.
// May be set before scanning if the decoder eval flag is set to false
// or if iterator has deltas associated with it or if the predicate is
// determined to be ineffective.
// May be set by decoder during scan if decoder eval is not supported.
// Once set, scanning will materialize the entire column into the block,
// leaving evaluation for after the scan.
Expand Down
Loading

0 comments on commit f1905eb

Please sign in to comment.