Skip to content

Commit

Permalink
KUDU-3384: Handling IncrementEncodedKey failure
Browse files Browse the repository at this point in the history
While using lower/upper of a DRS to optimize scan at DRS level, we
should handle the cases where an upper bound key cannot be incremented.
This patch fixes it, enables the reproduction scenario and also adds
a regression test.

Change-Id: I22c48f9893364c3fad5597431e4acfbcb2a4b43d
Reviewed-on: http://gerrit.cloudera.org:8080/18761
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
  • Loading branch information
zhangyifan27 authored and alexeyserbin committed Jul 21, 2022
1 parent 7618d34 commit 1bde51f
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 6 deletions.
4 changes: 1 addition & 3 deletions src/kudu/client/flex_partitioning_client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1970,9 +1970,7 @@ class FlexPartitioningScanTest : public FlexPartitioningTest {
};

// This scenario is to reproduce the issue described in KUDU-3384.
//
// TODO(aserbin): enable the scenario once KUDU-3384 is fixed
TEST_F(FlexPartitioningScanTest, DISABLED_MaxKeyValue) {
TEST_F(FlexPartitioningScanTest, MaxKeyValue) {
static constexpr const char* const kTableName = "max_key_value";

unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
Expand Down
55 changes: 54 additions & 1 deletion src/kudu/tablet/cfile_set-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class TestCFileSet : public KuduRowSetTest {
FLAGS_cfile_default_block_size = 512;
}

// Write out a test rowset with two int columns.
// Write out a test rowset with three int columns.
// The first column contains the row index * 2.
// The second contains the row index * 10.
// The third column contains index * 100, but is never read.
Expand All @@ -121,6 +121,30 @@ class TestCFileSet : public KuduRowSetTest {
ASSERT_OK(rsw.Finish());
}

// Write out a test rowset whose key column runs all the way up to INT32_MAX.
// The 'nrows' rows carry the values (INT32_MAX - nrows + 1) .. INT32_MAX in
// each of the three int32 columns, so the rowset's maximum encoded key cannot
// be incremented any further.
void WriteTestRowSetWithMaxValue(int nrows) {
  DiskRowSetWriter rsw(
      rowset_meta_.get(), &schema_, BloomFilterSizing::BySizeAndFPRate(32 * 1024, 0.01F));

  ASSERT_OK(rsw.Open());

  RowBuilder rb(&schema_);
  // Iterate by offset rather than incrementing the key value itself: bumping
  // an int past INT32_MAX would be undefined behavior (signed overflow).
  const int first_key = INT32_MAX - nrows + 1;
  for (int offset = 0; offset < nrows; ++offset) {
    const int key = first_key + offset;
    rb.Reset();
    rb.AddInt32(key);
    rb.AddInt32(key);
    rb.AddInt32(key);
    ASSERT_OK_FAST(WriteRow(rb.data(), &rsw));
  }
  ASSERT_OK(rsw.Finish());
}

// Int32 type add probe to the bloom filter.
// bf1_contain: 0 2 4 6 8 ... (2n)th key for column 1 to form bloom filter.
// bf1_exclude: 1 3 5 7 9 ... (2n + 1)th key for column 1 to form bloom filter.
Expand Down Expand Up @@ -628,6 +652,35 @@ TEST_F(TestCFileSet, TestInListPredicates) {
DoTestInListScan(fileset, 1000, 10);
}

// Regression test for KUDU-3384: scans over a rowset whose maximum encoded
// key cannot be incremented (the key column holds INT32_MAX) must not fail.
TEST_F(TestCFileSet, TestKUDU3384) {
  constexpr int kNumRows = 10;
  WriteTestRowSetWithMaxValue(kNumRows);

  shared_ptr<CFileSet> fileset;
  ASSERT_OK(CFileSet::Open(
      rowset_meta_, MemTracker::GetRootTracker(), MemTracker::GetRootTracker(), nullptr, &fileset));

  // Build a materializing row-wise iterator over the rowset.
  unique_ptr<CFileSet::Iterator> base_iter(fileset->NewIterator(&schema_, nullptr));
  unique_ptr<RowwiseIterator> row_iter(NewMaterializingIterator(std::move(base_iter)));

  // A full scan should succeed and return every row exactly once.
  ScanSpec spec;
  ASSERT_OK(row_iter->Init(&spec));
  RowBlockMemory arena(1024);
  RowBlock block(&schema_, 100, &arena);
  int rows_seen = 0;
  while (row_iter->HasNext()) {
    arena.Reset();
    ASSERT_OK_FAST(row_iter->NextBlock(&block));
    rows_seen += block.selection_vector()->CountSelected();
  }
  ASSERT_EQ(kNumRows, rows_seen);
  // A range scan reaching up to INT32_MAX should succeed as well.
  DoTestRangeScan(fileset, INT32_MAX - kNumRows, INT32_MAX);
}

class InListPredicateBenchmark : public KuduRowSetTest {
public:
InListPredicateBenchmark()
Expand Down
8 changes: 6 additions & 2 deletions src/kudu/tablet/cfile_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,12 @@ Status CFileSet::Iterator::OptimizePKPredicates(ScanSpec* spec) {

RETURN_NOT_OK(EncodedKey::DecodeEncodedString(
tablet_schema, &arena_, base_data_->max_encoded_key_, &implicit_ub_key));
RETURN_NOT_OK(EncodedKey::IncrementEncodedKey(tablet_schema, &implicit_ub_key, &arena_));
if (!ub_key || ub_key->encoded_key() > implicit_ub_key->encoded_key()) {
Status s = EncodedKey::IncrementEncodedKey(tablet_schema, &implicit_ub_key, &arena_);
// Reset the exclusive_upper_bound_key only when we can get a valid and smaller upper bound key.
// In case IncrementEncodedKey returns an error status (because allocation failed or no
// lexicographically greater key exists), we fall back to scanning the rowset without optimizing
// the upper bound PK; we may scan more rows, but we will still get the right result.
if (s.ok() && (!ub_key || ub_key->encoded_key() > implicit_ub_key->encoded_key())) {
spec->SetExclusiveUpperBoundKey(implicit_ub_key);
modify_upper_bound_key = true;
}
Expand Down

0 comments on commit 1bde51f

Please sign in to comment.