Skip to content

Commit

Permalink
KUDU-3344: catalog manager clean up metadata for deleted tables/tablets
Browse files Browse the repository at this point in the history
Kudu masters now retain metadata for deleted tables and tablets forever, and
the leader master loads all of them into memory when starts. If we have a lot
of tables and tablets in a cluster, memory usage of the leader master will be
large and it will take a long time to start the leader master. Consider that
in many cases users drop tables and partitions, useless metadata should be
cleaned up in backgroud tasks.

But it's hard to decide when we should clean them up, because the deletion of
tablet replicas is asynchronous. If metadata is deleted before the tablet data
is deleted, the unknown tablet reported by a tablet server will not be processed
by catalog manager and we must delete it manually. So this patch add a new flag
'deleted_table_and_tablet_reserved_secs', its default value is the same as
'unresponsive_ts_rpc_timeout_ms', we can roughly assume that after this amount
of time the tablet data will be actually deleted and it's safe to delete its
metadata entry from sys.catalog table and in-memory map.

Change-Id: Idefa2ee2f5108ba913fe0057a4061c3c28351547
Reviewed-on: http://gerrit.cloudera.org:8080/18152
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
  • Loading branch information
zhangyifan27 authored and alexeyserbin committed Feb 21, 2022
1 parent 7db93eb commit 2de290d
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 24 deletions.
1 change: 1 addition & 0 deletions src/kudu/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ Status KuduClient::ListTabletServers(vector<KuduTabletServer*>* tablet_servers)
}

Status KuduClient::ListTables(vector<string>* tables, const string& filter) {
tables->clear();
vector<Data::TableInfo> tables_info;
RETURN_NOT_OK(data_->ListTablesWithInfo(this, &tables_info, filter));
for (auto& info : tables_info) {
Expand Down
26 changes: 19 additions & 7 deletions src/kudu/integration-tests/master-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <ostream>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -116,7 +117,7 @@ static const MonoDelta kTransientStateBackoff = MonoDelta::FromMilliseconds(50);

// Parameterized based on HmsMode.
class MasterStressTest : public ExternalMiniClusterITestBase,
public ::testing::WithParamInterface<HmsMode> {
public ::testing::WithParamInterface<std::tuple<HmsMode, bool>> {
public:
MasterStressTest()
: done_(1),
Expand Down Expand Up @@ -146,10 +147,19 @@ class MasterStressTest : public ExternalMiniClusterITestBase,
opts.start_process_timeout = MonoDelta::FromSeconds(60);
opts.rpc_negotiation_timeout = MonoDelta::FromSeconds(30);

opts.hms_mode = GetParam();
opts.hms_mode = std::get<0>(GetParam());
// Tune down the notification log poll period in order to speed up catalog convergence.
opts.extra_master_flags.emplace_back("--hive_metastore_notification_log_poll_period_seconds=1");

if (std::get<1>(GetParam())) {
// Set shorter intervals to trigger frequent cleanup tasks.
opts.extra_master_flags.emplace_back(
"--enable_metadata_cleanup_for_deleted_tables_and_tablets=true");
opts.extra_master_flags.emplace_back("--catalog_manager_bg_task_wait_ms=10");
opts.extra_master_flags.emplace_back(
"--metadata_for_deleted_table_and_tablet_reserved_secs=0");
}

// Set max missed heartbeats periods to 1.0 (down from 3.0).
opts.extra_master_flags.emplace_back("--leader_failure_max_missed_heartbeat_periods=1.0");

Expand Down Expand Up @@ -205,7 +215,7 @@ class MasterStressTest : public ExternalMiniClusterITestBase,
new MasterServiceProxy(cluster_->messenger(), addr, addr.host()));
ASSERT_OK(CreateTabletServerMap(m_proxy, cluster_->messenger(), &ts_map_));

if (GetParam() == HmsMode::ENABLE_METASTORE_INTEGRATION) {
if (std::get<0>(GetParam()) == HmsMode::ENABLE_METASTORE_INTEGRATION) {
thrift::ClientOptions hms_opts;
hms_opts.service_principal = "hive";
hms_client_.reset(new HmsClient(cluster_->hms()->address(), hms_opts));
Expand Down Expand Up @@ -525,10 +535,12 @@ class MasterStressTest : public ExternalMiniClusterITestBase,
std::unordered_map<string, itest::TServerDetails*> ts_map_;
};

// Run the test with the HMS integration enabled and disabled.
INSTANTIATE_TEST_SUITE_P(HmsConfigurations, MasterStressTest, ::testing::ValuesIn(
vector<HmsMode> { HmsMode::NONE, HmsMode::ENABLE_METASTORE_INTEGRATION }
));
INSTANTIATE_TEST_SUITE_P(
CatalogManagerConfigurations,
MasterStressTest,
::testing::Combine(/*hms_mode*/ ::testing::ValuesIn(vector<HmsMode>{
HmsMode::NONE, HmsMode::ENABLE_METASTORE_INTEGRATION}),
/*enable_metadata_cleanup_for_deleted_table(t)s*/ ::testing::Bool()));

TEST_P(MasterStressTest, Test) {
OverrideFlagForSlowTests("num_create_table_threads", "10");
Expand Down
2 changes: 1 addition & 1 deletion src/kudu/integration-tests/ts_tablet_manager-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ TEST_F(TsTabletManagerITest, TestTableStats) {
ASSERT_OK(l.first_failed_status());
// Get the TableInfo.
vector<scoped_refptr<TableInfo>> table_infos;
ASSERT_OK(catalog->GetAllTables(&table_infos));
catalog->GetAllTables(&table_infos);
ASSERT_EQ(1, table_infos.size());
// Run the check function.
NO_FATALS(check_function(table_infos[0].get(), live_row_count));
Expand Down
2 changes: 1 addition & 1 deletion src/kudu/master/auto_rebalancer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ Status AutoRebalancerTask::BuildClusterRawInfo(
{
CatalogManager::ScopedLeaderSharedLock leader_lock(catalog_manager_);
RETURN_NOT_OK(leader_lock.first_failed_status());
RETURN_NOT_OK(catalog_manager_->GetAllTables(&table_infos));
catalog_manager_->GetAllTables(&table_infos);
}

table_summaries.reserve(table_infos.size());
Expand Down
163 changes: 152 additions & 11 deletions src/kudu/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,25 @@ DEFINE_double(table_write_limit_ratio, 0.95,
"Set the ratio of how much write limit can be reached");
TAG_FLAG(table_write_limit_ratio, experimental);

DEFINE_bool(enable_metadata_cleanup_for_deleted_tables_and_tablets, false,
"Whether to clean up metadata for deleted tables and tablets from master's "
"in-memory map and the 'sys.catalog' table.");
TAG_FLAG(enable_metadata_cleanup_for_deleted_tables_and_tablets, experimental);
TAG_FLAG(enable_metadata_cleanup_for_deleted_tables_and_tablets, runtime);

DEFINE_int32(metadata_for_deleted_table_and_tablet_reserved_secs, 60 * 60,
"After this amount of time, the metadata of a deleted table/tablet will be "
"cleaned up if --enable_metadata_cleanup_for_deleted_tables_and_tablets=true.");
TAG_FLAG(metadata_for_deleted_table_and_tablet_reserved_secs, experimental);
TAG_FLAG(metadata_for_deleted_table_and_tablet_reserved_secs, runtime);

DEFINE_bool(enable_chunked_tablet_writes, true,
"Whether to split tablet actions into chunks when persisting them in sys.catalog "
"table. If disabled, any update of the sys.catalog table will be rejected if exceeds "
"--rpc_max_message_size.");
TAG_FLAG(enable_chunked_tablet_writes, experimental);
TAG_FLAG(enable_chunked_tablet_writes, runtime);

DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_int64(tsk_rotation_seconds);
DECLARE_string(ranger_config_path);
Expand Down Expand Up @@ -778,6 +797,26 @@ void CatalogManagerBgTasks::Run() {
}
}

if (FLAGS_enable_metadata_cleanup_for_deleted_tables_and_tablets) {
vector<scoped_refptr<TableInfo>> deleted_tables;
vector<scoped_refptr<TabletInfo>> deleted_tablets;
catalog_manager_->ExtractDeletedTablesAndTablets(&deleted_tables, &deleted_tablets);
Status s = Status::OK();
// Clean up metadata for deleted tablets first and then clean up metadata for deleted
// tables. This is the reverse of the order in which we load them. So for any remaining
// tablet, the metadata of the table to which it belongs must exist.
const time_t now = time(nullptr);
if (!deleted_tablets.empty()) {
s = catalog_manager_->ProcessDeletedTablets(deleted_tablets, now);
}
if (s.ok() && !deleted_tables.empty()) {
s = catalog_manager_->ProcessDeletedTables(deleted_tables, now);
}
if (!s.ok()) {
LOG(ERROR) << "Error processing tables/tablets deletions: " << s.ToString();
}
}

// If this is the leader master, check if it's time to generate
// and store a new TSK (Token Signing Key).
Status s = catalog_manager_->TryGenerateNewTskUnlocked();
Expand Down Expand Up @@ -2393,8 +2432,10 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req,
}

TRACE("Modifying in-memory table state");
string deletion_msg = "Table deleted at " + LocalTimeAsString();
const time_t timestamp = time(nullptr);
string deletion_msg = "Table deleted at " + TimestampAsString(timestamp);
l.mutable_data()->set_state(SysTablesEntryPB::REMOVED, deletion_msg);
l.mutable_data()->pb.set_delete_timestamp(timestamp);

// 2. Look up the tablets, lock them, and mark them as deleted.
{
Expand All @@ -2408,6 +2449,7 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req,
for (const auto& t : tablets) {
t->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::DELETED, deletion_msg);
t->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(timestamp);
}

// 3. Update sys-catalog with the removed table and tablet state.
Expand Down Expand Up @@ -3207,7 +3249,8 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
LocalTimeAsString()));
}

const string deletion_msg = "Partition dropped at " + LocalTimeAsString();
const time_t timestamp = time(nullptr);
const string deletion_msg = "Partition dropped at " + TimestampAsString(timestamp);
TabletMetadataGroupLock tablets_to_add_lock(LockMode::WRITE);
TabletMetadataGroupLock tablets_to_drop_lock(LockMode::RELEASED);

Expand All @@ -3229,6 +3272,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
for (auto& tablet : tablets_to_drop) {
tablet->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::DELETED, deletion_msg);
tablet->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(timestamp);
}
actions.tablets_to_update = tablets_to_drop;

Expand Down Expand Up @@ -3472,7 +3516,9 @@ Status CatalogManager::ListTables(const ListTablesRequestPB* req,
}
}
InsertOrUpdate(&table_info_by_name, table_name, table_info);
EmplaceIfNotPresent(&table_owner_map, table_name, owner == *user);
if (user) {
EmplaceIfNotPresent(&table_owner_map, table_name, owner == *user);
}
}

MAYBE_INJECT_FIXED_LATENCY(FLAGS_catalog_manager_inject_latency_list_authz_ms);
Expand Down Expand Up @@ -3626,14 +3672,20 @@ Status CatalogManager::GetTableInfo(const string& table_id, scoped_refptr<TableI
return Status::OK();
}

Status CatalogManager::GetAllTables(vector<scoped_refptr<TableInfo>>* tables) {
void CatalogManager::GetAllTables(vector<scoped_refptr<TableInfo>>* tables) {
leader_lock_.AssertAcquiredForReading();

tables->clear();
shared_lock<LockType> l(lock_);
AppendValuesFromMap(table_ids_map_, tables);
}

return Status::OK();
void CatalogManager::GetAllTabletsForTests(vector<scoped_refptr<TabletInfo>>* tablets) {
leader_lock_.AssertAcquiredForReading();

tablets->clear();
shared_lock<LockType> l(lock_);
AppendValuesFromMap(tablet_map_, tablets);
}

Status CatalogManager::TableNameExists(const string& table_name, bool* exists) {
Expand Down Expand Up @@ -4646,9 +4698,9 @@ Status CatalogManager::ProcessTabletReport(
// It'd be unsafe to ask the tserver to delete this tablet without first
// replicating something to our followers (i.e. to guarantee that we're
// the leader). For example, if we were a rogue master, we might be
// deleting a tablet created by a new master accidentally. But masters
// retain metadata for deleted tablets forever, so a tablet can only be
// truly unknown in the event of a serious misconfiguration, such as a
// deleting a tablet created by a new master accidentally. Though masters
// don't always retain metadata for deleted tablets forever, a tablet
// may be unknown in the event of a serious misconfiguration, such as a
// tserver heartbeating to the wrong cluster. Therefore, it should be
// reasonable to ignore it and wait for an operator fix the situation.
LOG(WARNING) << "Ignoring report from unknown tablet " << tablet_id;
Expand Down Expand Up @@ -5140,6 +5192,27 @@ void CatalogManager::ExtractTabletsToProcess(
}
}

void CatalogManager::ExtractDeletedTablesAndTablets(
vector<scoped_refptr<TableInfo>>* deleted_tables,
vector<scoped_refptr<TabletInfo>>* deleted_tablets) {
shared_lock<LockType> l(lock_);
for (const auto& table_entry : table_ids_map_) {
scoped_refptr<TableInfo> table = table_entry.second;
TableMetadataLock table_lock(table.get(), LockMode::READ);
if (table_lock.data().is_deleted()) {
deleted_tables->emplace_back(table);
}
}
for (const auto& tablet_entry : tablet_map_) {
scoped_refptr<TabletInfo> tablet = tablet_entry.second;
TableMetadataLock table_lock(tablet->table().get(), LockMode::READ);
TabletMetadataLock tablet_lock(tablet.get(), LockMode::READ);
if (tablet_lock.data().is_deleted() || table_lock.data().is_deleted()) {
deleted_tablets->emplace_back(tablet);
}
}
}

// Check if it's time to roll TokenSigner's key. There's a bit of subtlety here:
// we shouldn't start exporting a key until it is properly persisted.
// So, the protocol is:
Expand Down Expand Up @@ -5256,10 +5329,11 @@ void CatalogManager::HandleAssignCreatingTablet(const scoped_refptr<TabletInfo>&
tablet->ToString(), replacement->id());

// Mark old tablet as replaced.
const time_t timestamp = time(nullptr);
tablet->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::REPLACED,
Substitute("Replaced by $0 at $1",
replacement->id(), LocalTimeAsString()));
SysTabletsEntryPB::REPLACED,
Substitute("Replaced by $0 at $1", replacement->id(), TimestampAsString(timestamp)));
tablet->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(timestamp);

// Mark new tablet as being created.
replacement->mutable_metadata()->mutable_dirty()->set_state(
Expand Down Expand Up @@ -5503,6 +5577,72 @@ void CatalogManager::SendCreateTabletRequest(const scoped_refptr<TabletInfo>& ta
}
}

Status CatalogManager::ProcessDeletedTablets(const vector<scoped_refptr<TabletInfo>>& tablets,
time_t current_timestamp) {
TabletMetadataGroupLock tablets_lock(LockMode::RELEASED);
tablets_lock.AddMutableInfos(tablets);
tablets_lock.Lock(LockMode::WRITE);

vector<scoped_refptr<TabletInfo>> tablets_to_clean_up;
for (const auto& tablet : tablets) {
if (current_timestamp - tablet->metadata().state().pb.delete_timestamp() >
FLAGS_metadata_for_deleted_table_and_tablet_reserved_secs) {
tablets_to_clean_up.emplace_back(tablet);
}
}
// Persist the changes to the sys.catalog table.
SysCatalogTable::Actions actions;
actions.tablets_to_delete = tablets_to_clean_up;
const auto write_mode = FLAGS_enable_chunked_tablet_writes ? SysCatalogTable::WriteMode::CHUNKED
: SysCatalogTable::WriteMode::ATOMIC;
Status s = sys_catalog_->Write(std::move(actions), write_mode);
if (PREDICT_FALSE(!s.ok())) {
s = s.CloneAndPrepend("an error occurred while writing to the sys-catalog");
LOG(WARNING) << s.ToString();
return s;
}
// Remove expired tablets from the global map.
{
std::lock_guard<LockType> l(lock_);
for (const auto& t : tablets_to_clean_up) {
DCHECK(ContainsKey(tablet_map_, t->id()));
tablet_map_.erase(t->id());
VLOG(1) << "Cleaned up deleted tablet: " << t->id();
}
}
tablets_lock.Unlock();
return Status::OK();
}

Status CatalogManager::ProcessDeletedTables(const vector<scoped_refptr<TableInfo>>& tables,
time_t current_timestamp) {
TableMetadataGroupLock tables_lock(LockMode::RELEASED);
tables_lock.AddMutableInfos(tables);
tables_lock.Lock(LockMode::WRITE);

for (const auto& table : tables) {
if (current_timestamp - table->metadata().state().pb.delete_timestamp() >
FLAGS_metadata_for_deleted_table_and_tablet_reserved_secs) {
SysCatalogTable::Actions action;
action.table_to_delete = table;
Status s = sys_catalog_->Write(std::move(action));
if (PREDICT_FALSE(!s.ok())) {
s = s.CloneAndPrepend("an error occurred while writing to the sys-catalog");
LOG(WARNING) << s.ToString();
return s;
}

std::lock_guard<LockType> l(lock_);
DCHECK(ContainsKey(table_ids_map_, table->id()));
table_ids_map_.erase(table->id());
VLOG(1) << "Cleaned up deleted table: " << table->ToString();
}
}

tables_lock.Unlock();
return Status::OK();
}

Status CatalogManager::BuildLocationsForTablet(
const scoped_refptr<TabletInfo>& tablet,
ReplicaTypeFilter filter,
Expand Down Expand Up @@ -5663,6 +5803,7 @@ Status CatalogManager::ReplaceTablet(const string& tablet_id, ReplaceTabletRespo
const string replace_msg = Substitute("replaced by tablet $0", new_tablet->id());
old_tablet->mutable_metadata()->mutable_dirty()->set_state(SysTabletsEntryPB::DELETED,
replace_msg);
old_tablet->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(time(nullptr));

// Persist the changes to the syscatalog table.
{
Expand Down
Loading

0 comments on commit 2de290d

Please sign in to comment.