Skip to content

Commit

Permalink
KUDU-2980: fix ORDERED scans when key columns are misordered in proje…
Browse files Browse the repository at this point in the history
…ction

This patch fixes KUDU-2980 by taking a stricter approach to how key columns
appear in ORDERED scan projections. Any key columns already in the
client-provided projection are completely ignored. Instead, when we rebuild
the projection server-side, the first section (i.e. the part with the key
columns) is built using the tablet's schema. After that, we incorporate any
remaining columns from the client-provided projection, dropping any key
columns. Finally, we add any columns necessary to satisfy predicates.

The new fuzz test reproduces the bug very easily, and is expansive enough to
provide additional coverage for funky projection and predicate ordering. I
looped it 1000 times in dist-test without failure.

Change-Id: Idb213f466c7d01b6953a863d8aa53a34b5ac8893
Reviewed-on: http://gerrit.cloudera.org:8080/14503
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <[email protected]>
(cherry picked from commit e23e52a)
Reviewed-on: http://gerrit.cloudera.org:8080/14537
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Adar Dembo <[email protected]>
  • Loading branch information
adembo committed Oct 23, 2019
1 parent 3fa5b11 commit 08db97c
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 32 deletions.
126 changes: 126 additions & 0 deletions src/kudu/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <set>
#include <string>
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -75,7 +76,9 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/data_gen_util.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
Expand Down Expand Up @@ -104,6 +107,8 @@
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/semaphore.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
Expand Down Expand Up @@ -177,6 +182,7 @@ using std::set;
using std::string;
using std::thread;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using strings::Substitute;

Expand Down Expand Up @@ -6376,5 +6382,125 @@ TEST_F(ClientWithLocationTest, LocationCacheMetricsOnClientConnectToCluster) {
ASSERT_EQ(hits_before + kIterNum, hits_after);
}

// Regression test for KUDU-2980.
//
// Builds a table whose primary key spans half of its columns, scans it with a
// randomly chosen and randomly ordered projection plus a random set of
// IS NOT NULL predicates (fault tolerant half of the time), and verifies that
// the scan returns exactly the rows that were inserted, rendered according to
// the projection.
TEST_F(ClientTest, TestProjectionPredicatesFuzz) {
  const int kNumColumns = 20;
  const int kNumPKColumns = kNumColumns / 2;
  const char* const kTableName = "test";

  // Create a schema with half of the columns in the primary key.
  //
  // Use a smattering of different physical data types to increase the
  // likelihood of a size transition between primary key components.
  KuduSchemaBuilder b;
  vector<string> pk_col_names;
  vector<string> all_col_names;
  KuduColumnSchema::DataType data_type = KuduColumnSchema::STRING;
  for (int i = 0; i < kNumColumns; i++) {
    string col_name = std::to_string(i);
    b.AddColumn(col_name)->Type(data_type)->NotNull();
    if (i < kNumPKColumns) {
      pk_col_names.emplace_back(col_name);
    }
    all_col_names.emplace_back(std::move(col_name));

    // Rotate to next data type.
    switch (data_type) {
      case KuduColumnSchema::INT8: data_type = KuduColumnSchema::INT16; break;
      case KuduColumnSchema::INT16: data_type = KuduColumnSchema::INT32; break;
      case KuduColumnSchema::INT32: data_type = KuduColumnSchema::INT64; break;
      case KuduColumnSchema::INT64: data_type = KuduColumnSchema::STRING; break;
      case KuduColumnSchema::STRING: data_type = KuduColumnSchema::INT8; break;
      default: LOG(FATAL) << "Unexpected data type " << data_type;
    }
  }
  b.SetPrimaryKey(pk_col_names);
  KuduSchema schema;
  ASSERT_OK(b.Build(&schema));

  // Create and open the table. We don't care about replication or partitioning.
  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
  ASSERT_OK(table_creator->table_name(kTableName)
            .schema(&schema)
            .set_range_partition_columns({})
            .num_replicas(1)
            .Create());
  shared_ptr<KuduTable> table;
  ASSERT_OK(client_->OpenTable(kTableName, &table));

  unique_ptr<KuduScanner> scanner(new KuduScanner(table.get()));

  // Pick a random selection of columns to project.
  //
  // Done before inserting data so the projection can be used to determine the
  // expected scan results.
  Random rng(SeedRandom());
  vector<string> projected_col_names =
      SelectRandomSubset<vector<string>, string, Random>(all_col_names, 0, &rng);

  // Shuffle the projection with a Fisher-Yates pass driven by 'rng'.
  //
  // std::random_shuffle() is deprecated in C++14 and removed in C++17, and it
  // draws from an unspecified global random source (typically std::rand())
  // rather than from the test's seeded generator, so the column order it
  // produces is not tied to the seed used for the rest of this test.
  for (size_t i = projected_col_names.size(); i > 1; i--) {
    const size_t j = rng.Uniform(static_cast<uint32_t>(i));
    std::swap(projected_col_names[i - 1], projected_col_names[j]);
  }
  ASSERT_OK(scanner->SetProjectedColumnNames(projected_col_names));

  // Insert some rows with randomized keys, flushing the tablet periodically.
  const int kNumRows = 20;
  shared_ptr<KuduSession> session = client_->NewSession();
  vector<string> expected_rows;
  for (int i = 0; i < kNumRows; i++) {
    unique_ptr<KuduInsert> insert(table->NewInsert());
    KuduPartialRow* row = insert->mutable_row();
    GenerateDataForRow(schema, &rng, row);

    // Store a copy of the row for later, to compare with the scan results.
    //
    // The copy should look like KuduScanBatch::RowPtr::ToString() and must
    // conform to the projection schema.
    ConstContiguousRow ccr(row->schema(), row->row_data_);
    string row_str = "(";
    row_str += JoinMapped(projected_col_names, [&](const string& col_name) {
      int col_idx = row->schema()->find_column(col_name);
      // Validate the index before it is used to look up the column; checking
      // afterwards would be too late to catch a missing column.
      DCHECK_NE(Schema::kColumnNotFound, col_idx);
      const auto& col = row->schema()->column(col_idx);
      string cell;
      col.DebugCellAppend(ccr.cell(col_idx), &cell);
      return cell;
    }, ", ");
    row_str += ")";
    expected_rows.emplace_back(std::move(row_str));

    ASSERT_OK(session->Apply(insert.release()));

    // Leave one row in the tablet's MRS so that the scan includes one rowset
    // without bounds. This affects the behavior of FT scans.
    if (i < kNumRows - 1 && rng.OneIn(2)) {
      NO_FATALS(FlushTablet(GetFirstTabletId(table.get())));
    }
  }

  // Pick a random selection of columns for predicates.
  //
  // We use NOT NULL predicates so as to tickle the server-side code for dealing
  // with predicates without actually affecting the scan results.
  vector<string> predicate_col_names =
      SelectRandomSubset<vector<string>, string, Random>(all_col_names, 0, &rng);
  for (const auto& col_name : predicate_col_names) {
    unique_ptr<KuduPredicate> pred(table->NewIsNotNullPredicate(col_name));
    ASSERT_OK(scanner->AddConjunctPredicate(pred.release()));
  }

  // Use a fault tolerant scan half the time.
  if (rng.OneIn(2)) {
    ASSERT_OK(scanner->SetFaultTolerant());
  }

  // Perform the scan and verify the results.
  //
  // We ignore result ordering because although FT scans will produce rows
  // sorted by primary keys, regular scans will not.
  vector<string> rows;
  ASSERT_OK(ScanToStrings(scanner.get(), &rows));
  ASSERT_EQ(unordered_set<string>(expected_rows.begin(), expected_rows.end()),
            unordered_set<string>(rows.begin(), rows.end())) << rows;
}

} // namespace client
} // namespace kudu
15 changes: 8 additions & 7 deletions src/kudu/common/generic_iterators.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <iterator>
#include <memory>
#include <mutex>
#include <numeric>
#include <ostream>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -1167,12 +1168,13 @@ MaterializingIterator::MaterializingIterator(unique_ptr<ColumnwiseIterator> iter
Status MaterializingIterator::Init(ScanSpec *spec) {
RETURN_NOT_OK(iter_->Init(spec));

int32_t num_columns = schema().num_columns();
const int32_t num_columns = schema().num_columns();
col_idx_predicates_.clear();
non_predicate_column_indexes_.clear();

if (spec != nullptr && !disallow_pushdown_for_tests_) {
if (PREDICT_FALSE(!disallow_pushdown_for_tests_) && spec != nullptr) {
col_idx_predicates_.reserve(spec->predicates().size());
DCHECK_GE(num_columns, spec->predicates().size());
non_predicate_column_indexes_.reserve(num_columns - spec->predicates().size());

for (const auto& col_pred : spec->predicates()) {
Expand All @@ -1185,7 +1187,7 @@ Status MaterializingIterator::Init(ScanSpec *spec) {
col_idx_predicates_.emplace_back(col_idx, col_pred.second);
}

for (int32_t col_idx = 0; col_idx < schema().num_columns(); col_idx++) {
for (int32_t col_idx = 0; col_idx < num_columns; col_idx++) {
if (!ContainsKey(spec->predicates(), schema().column(col_idx).name())) {
non_predicate_column_indexes_.emplace_back(col_idx);
}
Expand All @@ -1195,10 +1197,9 @@ Status MaterializingIterator::Init(ScanSpec *spec) {
// scan spec so higher layers don't repeat our work.
spec->RemovePredicates();
} else {
non_predicate_column_indexes_.reserve(num_columns);
for (int32_t col_idx = 0; col_idx < num_columns; col_idx++) {
non_predicate_column_indexes_.emplace_back(col_idx);
}
non_predicate_column_indexes_.resize(num_columns);
std::iota(non_predicate_column_indexes_.begin(),
non_predicate_column_indexes_.end(), 0);
}

// Sort the predicates by selectivity so that the most selective are evaluated
Expand Down
2 changes: 2 additions & 0 deletions src/kudu/common/partial_row.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
namespace kudu {
class ColumnSchema;
namespace client {
class ClientTest_TestProjectionPredicatesFuzz_Test;
class KuduWriteOperation;
template<typename KeyTypeWrapper> struct SliceKeysTestSetup;// IWYU pragma: keep
template<typename KeyTypeWrapper> struct IntKeysTestSetup; // IWYU pragma: keep
Expand Down Expand Up @@ -510,6 +511,7 @@ class KUDU_EXPORT KuduPartialRow {
template<typename KeyTypeWrapper> friend struct client::IntKeysTestSetup;
template<typename KeyTypeWrapper> friend struct tablet::SliceTypeRowOps;
template<typename KeyTypeWrapper> friend struct tablet::NumTypeRowOps;
FRIEND_TEST(client::ClientTest, TestProjectionPredicatesFuzz);
FRIEND_TEST(KeyUtilTest, TestIncrementInt128PrimaryKey);
FRIEND_TEST(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning);
FRIEND_TEST(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning);
Expand Down
12 changes: 12 additions & 0 deletions src/kudu/integration-tests/data_gen_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,24 @@ void GenerateDataForRow(const client::KuduSchema& schema, uint64_t record_id,
}
}

// Overload for callers that have no record id of their own: a 64-bit value is
// drawn from 'random' and used as the record id for the overload above.
template <class RNG>
void GenerateDataForRow(const client::KuduSchema& schema,
                        RNG* random, KuduPartialRow* row) {
  const uint64_t record_id = random->Next64();
  GenerateDataForRow(schema, record_id, random, row);
}

// Explicit instantiations (not specializations) of both GenerateDataForRow()
// overloads, for Random and ThreadSafeRandom. The template definitions live in
// this file, so these are needed by callers outside this compilation unit.
template
void GenerateDataForRow(const client::KuduSchema& schema, uint64_t record_id,
Random* random, KuduPartialRow* row);
template
void GenerateDataForRow(const client::KuduSchema& schema, uint64_t record_id,
ThreadSafeRandom* random, KuduPartialRow* row);
template
void GenerateDataForRow(const client::KuduSchema& schema,
Random* random, KuduPartialRow* row);
template
void GenerateDataForRow(const client::KuduSchema& schema,
ThreadSafeRandom* random, KuduPartialRow* row);

} // namespace kudu
5 changes: 5 additions & 0 deletions src/kudu/integration-tests/data_gen_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@ template<class RNG>
void GenerateDataForRow(const client::KuduSchema& schema, uint64_t record_id,
RNG* random, KuduPartialRow* row);

// Like the above, but without a caller-supplied 'record_id': the entire key is
// generated randomly. The implementation obtains the record id from
// random->Next64() and otherwise behaves like the overload above.
template<class RNG>
void GenerateDataForRow(const client::KuduSchema& schema,
RNG* random, KuduPartialRow* row);

} // namespace kudu
64 changes: 40 additions & 24 deletions src/kudu/tserver/tablet_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/replica_management.pb.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
Expand Down Expand Up @@ -2177,19 +2178,6 @@ static Status SetupScanSpec(const NewScanRequestPB& scan_pb,
}
}

// When doing an ordered scan, we need to include the key columns to be able to encode
// the last row key for the scan response.
if (scan_pb.order_mode() == kudu::ORDERED &&
projection.num_key_columns() != tablet_schema.num_key_columns()) {
for (int i = 0; i < tablet_schema.num_key_columns(); i++) {
const ColumnSchema &col = tablet_schema.column(i);
if (projection.find_column(col.name()) == -1 &&
!ContainsKey(missing_col_names, col.name())) {
missing_cols->push_back(col);
InsertOrDie(&missing_col_names, col.name());
}
}
}
// Then any encoded key range predicates.
RETURN_NOT_OK(DecodeEncodedKeyRange(scan_pb, tablet_schema, scanner, ret.get()));

Expand Down Expand Up @@ -2308,9 +2296,9 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,

gscoped_ptr<ScanSpec> spec(new ScanSpec);

// Missing columns will contain the columns that are not mentioned in the client
// projection but are actually needed for the scan, such as columns referred to by
// predicates or key columns (if this is an ORDERED scan).
// Missing columns will contain the columns that are not mentioned in the
// client projection but are actually needed for the scan, such as columns
// referred to by predicates.
vector<ColumnSchema> missing_cols;
s = SetupScanSpec(scan_pb, tablet_schema, projection, &missing_cols, &spec, scanner);
if (PREDICT_FALSE(!s.ok())) {
Expand All @@ -2332,15 +2320,43 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
gscoped_ptr<Schema> orig_projection(new Schema(projection));
scanner->set_client_projection_schema(std::move(orig_projection));

// Build a new projection with the projection columns and the missing columns. Make
// sure to set whether the column is a key column appropriately.
// Build a new projection with the projection columns and the missing columns,
// annotating each column as a key column appropriately.
//
// Note: the projection is a consistent schema (i.e. no duplicate columns).
// However, it has some different semantics as compared to the tablet schema:
// - It might not contain all of the columns in the tablet.
// - It might contain extra columns not found in the tablet schema. Virtual
// columns are permitted, while others will cause the scan to fail later,
// when the tablet validates the projection.
// - It doesn't know which of its columns are key columns. That's fine for
// an UNORDERED scan, but we'll need to fix this for an ORDERED scan, which
// requires all key columns in tablet schema order.
SchemaBuilder projection_builder;
vector<ColumnSchema> projection_columns = projection.columns();
for (const ColumnSchema& col : missing_cols) {
projection_columns.push_back(col);
}
for (const ColumnSchema& col : projection_columns) {
CHECK_OK(projection_builder.AddColumn(col, tablet_schema.is_key_column(col.name())));
if (scan_pb.order_mode() == ORDERED) {
for (int i = 0; i < tablet_schema.num_key_columns(); i++) {
const auto& col = tablet_schema.column(i);
// CHECK_OK is safe because the tablet schema has no duplicate columns.
CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ true));
}
for (int i = 0; i < projection.num_columns(); i++) {
const auto& col = projection.column(i);
// Any key columns in the projection will be ignored.
ignore_result(projection_builder.AddColumn(col, /* is_key= */ false));
}
for (const ColumnSchema& col : missing_cols) {
// Any key columns in 'missing_cols' will be ignored.
ignore_result(projection_builder.AddColumn(col, /* is_key= */ false));
}
} else {
projection_builder.Reset(projection);
for (const ColumnSchema& col : missing_cols) {
// CHECK_OK is safe because the builder's columns (from the projection)
// and the missing columns are disjoint sets.
//
// UNORDERED scans don't need to know which columns are part of the key.
CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ false));
}
}
projection = projection_builder.BuildWithoutIds();

Expand Down
3 changes: 2 additions & 1 deletion src/kudu/util/random_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ T SelectRandomElement(const Container& c, Rand* r) {
return rand_list[0];
}

// Returns a randomly-selected subset from the container.
// Returns a randomly-selected subset from the container. The subset will
// include at least 'min_to_return' results, but may contain more.
//
// The results are not stored in a randomized order: the order of results will
// match their order in the input collection.
Expand Down

0 comments on commit 08db97c

Please sign in to comment.