Skip to content

Commit

Permalink
[c++-client] clear non-covered entries from meta cache on table open
Browse files Browse the repository at this point in the history
Clearing non-covered range entries from the meta cache on table open
makes it easier to share client instances among different contexts
without having to worry about polluting the non-covering range cache.

Change-Id: I195eac8caa5efca9ad95284ab707c38340ba47f0
Reviewed-on: http://gerrit.cloudera.org:8080/6713
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <[email protected]>
  • Loading branch information
danburkert committed Apr 28, 2017
1 parent b36b84d commit fdc022f
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 0 deletions.
65 changes: 65 additions & 0 deletions src/kudu/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,71 @@ TEST_F(ClientTest, TestNonCoveringRangePartitions) {
}
}

// Test that OpenTable calls clear cached non-covered range partitions.
TEST_F(ClientTest, TestOpenTableClearsNonCoveringRangePartitions) {
// Create a table with a non-covered range.
const string kTableName = "TestNonCoveringRangePartitions";
shared_ptr<KuduTable> table;

vector<pair<unique_ptr<KuduPartialRow>, unique_ptr<KuduPartialRow>>> bounds;
unique_ptr<KuduPartialRow> lower_bound(schema_.NewRow());
unique_ptr<KuduPartialRow> upper_bound(schema_.NewRow());
ASSERT_OK(lower_bound->SetInt32("key", 0));
ASSERT_OK(upper_bound->SetInt32("key", 1));
bounds.emplace_back(std::move(lower_bound), std::move(upper_bound));

CreateTable(kTableName, 1, {}, std::move(bounds), &table);

// Attempt to insert into the non-covered range, priming the meta cache.
shared_ptr<KuduSession> session = client_->NewSession();
ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
ASSERT_TRUE(session->Apply(BuildTestRow(table.get(), 1).release()).IsIOError());
{
vector<KuduError*> errors;
ElementDeleter drop(&errors);
bool overflowed;
session->GetPendingErrors(&errors, &overflowed);
EXPECT_FALSE(overflowed);
ASSERT_EQ(1, errors.size());
EXPECT_TRUE(errors[0]->status().IsNotFound());
}

// Using a separate client, fill in the non-covered range. A separate client
// is necessary because altering the range partitioning will automatically
// clear the meta cache, and we want to avoid that.

lower_bound.reset(schema_.NewRow());
upper_bound.reset(schema_.NewRow());
ASSERT_OK(lower_bound->SetInt32("key", 1));

shared_ptr<KuduClient> alter_client;
ASSERT_OK(KuduClientBuilder()
.add_master_server_addr(cluster_->mini_master()->bound_rpc_addr().ToString())
.Build(&alter_client));

unique_ptr<KuduTableAlterer> alterer(alter_client->NewTableAlterer(kTableName));
ASSERT_OK(alterer->AddRangePartition(lower_bound.release(), upper_bound.release())
->Alter());

// Attempt to insert again into the non-covered range. It should still fail,
// because the meta cache still contains the non-covered entry.
ASSERT_TRUE(session->Apply(BuildTestRow(table.get(), 1).release()).IsIOError());
{
vector<KuduError*> errors;
ElementDeleter drop(&errors);
bool overflowed;
session->GetPendingErrors(&errors, &overflowed);
EXPECT_FALSE(overflowed);
ASSERT_EQ(1, errors.size());
EXPECT_TRUE(errors[0]->status().IsNotFound());
}

// Re-open the table, and attempt to insert again. This time the meta cache
// should clear non-covered entries, and the insert should succeed.
ASSERT_OK(client_->OpenTable(kTableName, &table));
ASSERT_OK(session->Apply(BuildTestRow(table.get(), 1).release()));
}

TEST_F(ClientTest, TestExclusiveInclusiveRangeBounds) {
// Create test table and insert test rows.
const string table_name = "TestExclusiveInclusiveRangeBounds";
Expand Down
6 changes: 6 additions & 0 deletions src/kudu/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,12 @@ Status KuduClient::OpenTable(const string& table_name,
table->reset(new KuduTable(shared_from_this(),
table_name, table_id, num_replicas,
schema, partition_schema));

// When opening a table, clear the existing cached non-covered range entries.
// This avoids surprises where a new table instance won't be able to see the
// current range partitions of a table for up to the ttl.
data_->meta_cache_->ClearNonCoveredRangeEntries(table_id);

return Status::OK();
}

Expand Down
3 changes: 3 additions & 0 deletions src/kudu/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
/// This method does an RPC to ensure that the table exists and
/// looks up its schema.
///
/// @note New range partitions created by other clients will immediately be
/// available after opening the table.
///
/// @param [in] table_name
/// Name of the table.
/// @param [out] table
Expand Down
19 changes: 19 additions & 0 deletions src/kudu/client/meta_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,25 @@ bool MetaCache::LookupTabletByKeyFastPath(const KuduTable* table,
return false;
}

void MetaCache::ClearNonCoveredRangeEntries(const std::string& table_id) {
VLOG(3) << "Clearing non-covered range entries of table " << table_id;
std::lock_guard<rw_spinlock> l(lock_);

TabletMap* tablets = FindOrNull(tablets_by_table_and_key_, table_id);
if (PREDICT_FALSE(!tablets)) {
// No cache available for this table.
return;
}

for (auto it = tablets->begin(); it != tablets->end();) {
if (it->second.is_non_covered_range()) {
it = tablets->erase(it);
} else {
it++;
}
}
}

void MetaCache::ClearCache() {
VLOG(3) << "Clearing cache";
std::lock_guard<rw_spinlock> l(lock_);
Expand Down
3 changes: 3 additions & 0 deletions src/kudu/client/meta_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
scoped_refptr<RemoteTablet>* remote_tablet,
const StatusCallback& callback);

// Clears the non-covered range entries from a table's meta cache.
void ClearNonCoveredRangeEntries(const std::string& table_id);

// Clears the meta cache.
void ClearCache();

Expand Down

0 comments on commit fdc022f

Please sign in to comment.