Skip to content

Commit

Permalink
[metrics] Hide metric live_row_count when tablet not support
Browse files Browse the repository at this point in the history
When upgrade an tserver from version older than 1.11, and add some
partitions for an existing table, then the table will hold some tablets
that support live row counting together with some ones that not support.
Tablet which not support live row counting has value of -1, in this
case, metrics merge on tserver and metrics aggregate on master have
problems.
This patch add feature to validate metric when happend to see an invalid
metric when MergeFrom, and disable 'live_row_count' metric of a table
when any tablet of a table not support live rows counting on master.

Change-Id: I2a54704d8cbd64a521e65aa3e95bf1a68f7757b7
Reviewed-on: http://gerrit.cloudera.org:8080/14507
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins
(cherry picked from commit 67fbb7c)
Reviewed-on: http://gerrit.cloudera.org:8080/14534
Reviewed-by: Alexey Serbin <[email protected]>
  • Loading branch information
acelyc111 authored and adembo committed Oct 23, 2019
1 parent 3d09bc3 commit b52bf50
Show file tree
Hide file tree
Showing 26 changed files with 270 additions and 111 deletions.
2 changes: 2 additions & 0 deletions src/kudu/integration-tests/ts_tablet_manager-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ TEST_F(TsTabletManagerITest, TestTableStats) {
NO_FATALS(GetLeaderMasterAndRun(live_row_count, [&] (
TableInfo* table_info, int64_t live_row_count) {
ASSERT_EVENTUALLY([&] () {
ASSERT_TRUE(table_info->GetMetrics()->TableSupportsLiveRowCount());
ASSERT_EQ(live_row_count, table_info->GetMetrics()->live_row_count->value());
});
}));
Expand Down Expand Up @@ -838,6 +839,7 @@ TEST_F(TsTabletManagerITest, TestTableStats) {
ASSERT_STR_NOT_CONTAINS(metric_attrs_str, kTableName);
ASSERT_STR_CONTAINS(metric_attrs_str, kNewTableName);
ASSERT_EQ(table->id(), table_info->metric_entity_->id());
ASSERT_TRUE(table_info->GetMetrics()->TableSupportsLiveRowCount());
ASSERT_EQ(live_row_count, table_info->GetMetrics()->live_row_count->value());
}));
}
Expand Down
56 changes: 39 additions & 17 deletions src/kudu/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2934,14 +2934,16 @@ Status CatalogManager::GetTableStatistics(const GetTableStatisticsRequestPB* req
RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, authz_func, user,
&table, &l));

int64_t on_disk_size = table->GetMetrics()->on_disk_size->value();
int64_t live_row_count = table->GetMetrics()->live_row_count->value();
if (FLAGS_mock_table_metrics_for_testing) {
on_disk_size = FLAGS_on_disk_size_for_testing;
live_row_count = FLAGS_live_row_count_for_testing;
}
resp->set_on_disk_size(on_disk_size);
resp->set_live_row_count(live_row_count);
if (PREDICT_FALSE(FLAGS_mock_table_metrics_for_testing)) {
resp->set_on_disk_size(FLAGS_on_disk_size_for_testing);
resp->set_live_row_count(FLAGS_live_row_count_for_testing);
} else {
resp->set_on_disk_size(table->GetMetrics()->on_disk_size->value());
if (table->GetMetrics()->TableSupportsLiveRowCount()) {
resp->set_live_row_count(table->GetMetrics()->live_row_count->value());
}
}

return Status::OK();
}

Expand Down Expand Up @@ -4243,11 +4245,11 @@ Status CatalogManager::ProcessTabletReport(

// 10. Process the report's tablet statistics.
if (report.has_stats() && report.has_consensus_state()) {
DCHECK(ts_desc->permanent_uuid() == report.consensus_state().leader_uuid());
DCHECK_EQ(ts_desc->permanent_uuid(), report.consensus_state().leader_uuid());

// Now the tserver only reports the LEADER replicas its own.
// First, update table's stats.
tablet->table()->UpdateMetrics(tablet->GetStats(), report.stats());
tablet->table()->UpdateMetrics(tablet_id, tablet->GetStats(), report.stats());
// Then, update tablet's stats.
tablet->UpdateStats(report.stats());
}
Expand Down Expand Up @@ -5417,8 +5419,8 @@ void TableInfo::AddRemoveTablets(const vector<scoped_refptr<TabletInfo>>& tablet
const auto& lower_bound = tablet->metadata().state().pb.partition().partition_key_start();
CHECK(EraseKeyReturnValuePtr(&tablet_map_, lower_bound) != nullptr);
DecrementSchemaVersionCountUnlocked(tablet->reported_schema_version());
// Update the table metrics for the deleted tablets.
UpdateMetrics(tablet->GetStats(), ReportedTabletStatsPB());
// Remove the table metrics for the deleted tablets.
RemoveMetrics(tablet->id(), tablet->GetStats());
}
for (const auto& tablet : tablets_to_add) {
TabletInfo* old = nullptr;
Expand Down Expand Up @@ -5569,15 +5571,35 @@ void TableInfo::UnregisterMetrics() {
}
}

void TableInfo::UpdateMetrics(const tablet::ReportedTabletStatsPB& old_stats,
void TableInfo::UpdateMetrics(const string& tablet_id,
const tablet::ReportedTabletStatsPB& old_stats,
const tablet::ReportedTabletStatsPB& new_stats) {
if (metrics_) {
metrics_->on_disk_size->IncrementBy(new_stats.on_disk_size() - old_stats.on_disk_size());
if (new_stats.live_row_count() >= 0) {
metrics_->on_disk_size->IncrementBy(
static_cast<int64_t>(new_stats.on_disk_size())
- static_cast<int64_t>(old_stats.on_disk_size()));
if (new_stats.has_live_row_count()) {
DCHECK(!metrics_->ContainsTabletNoLiveRowCount(tablet_id));
metrics_->live_row_count->IncrementBy(
new_stats.live_row_count() - old_stats.live_row_count());
static_cast<int64_t>(new_stats.live_row_count())
- static_cast<int64_t>(old_stats.live_row_count()));
} else if (!old_stats.has_on_disk_size()) {
// The new reported stat doesn't support 'live_row_count' and
// this is the first report stat of the tablet.
metrics_->AddTabletNoLiveRowCount(tablet_id);
}
}
}

void TableInfo::RemoveMetrics(const string& tablet_id,
const tablet::ReportedTabletStatsPB& old_stats) {
if (metrics_) {
metrics_->on_disk_size->IncrementBy(-static_cast<int64_t>(old_stats.on_disk_size()));
if (old_stats.has_live_row_count()) {
DCHECK(!metrics_->ContainsTabletNoLiveRowCount(tablet_id));
metrics_->live_row_count->IncrementBy(-static_cast<int64_t>(old_stats.live_row_count()));
} else {
metrics_->live_row_count->set_value(-1);
metrics_->DeleteTabletNoLiveRowCount(tablet_id);
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/kudu/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,15 @@ class TableInfo : public RefCountedThreadSafe<TableInfo> {
// Unregister metrics for the table.
void UnregisterMetrics();

// Update the metrics.
void UpdateMetrics(const tablet::ReportedTabletStatsPB& old_stats,
// Update stats belonging to 'tablet_id' in the table's metrics.
void UpdateMetrics(const std::string& tablet_id,
const tablet::ReportedTabletStatsPB& old_stats,
const tablet::ReportedTabletStatsPB& new_stats);

// Remove stats belonging to 'tablet_id' from table metrics.
void RemoveMetrics(const std::string& tablet_id,
const tablet::ReportedTabletStatsPB& old_stats);

// Update the attributes of the metrics.
void UpdateMetricsAttrs(const std::string& new_table_name);

Expand Down
6 changes: 2 additions & 4 deletions src/kudu/master/master_path_handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
Expand Down Expand Up @@ -494,9 +493,8 @@ void MasterPathHandlers::HandleTablePage(const Webserver::WebRequest& req,
// But the value of disk size will never be negative.
(*output)["table_disk_size"] =
HumanReadableNumBytes::ToString(table_metrics->on_disk_size->value());
int64 live_row_count = table_metrics->live_row_count->value();
if (live_row_count >= 0) {
(*output)["table_live_row_count"] = live_row_count;
if (table_metrics->TableSupportsLiveRowCount()) {
(*output)["table_live_row_count"] = table_metrics->live_row_count->value();
} else {
(*output)["table_live_row_count"] = "N/A";
}
Expand Down
29 changes: 26 additions & 3 deletions src/kudu/master/table_metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@
// under the License.
#include "kudu/master/table_metrics.h"

#include <mutex>

#include "kudu/gutil/map-util.h"

namespace kudu {
namespace master {

// Table-specific stats.
METRIC_DEFINE_gauge_int64(table, on_disk_size, "Table Size On Disk",
METRIC_DEFINE_gauge_uint64(table, on_disk_size, "Table Size On Disk",
kudu::MetricUnit::kBytes,
"Pre-replication aggregated disk space used by all tablets in this table, "
"including metadata.");
METRIC_DEFINE_gauge_int64(table, live_row_count, "Table Live Row count",
METRIC_DEFINE_gauge_uint64(table, live_row_count, "Table Live Row count",
kudu::MetricUnit::kRows,
"Pre-replication aggregated number of live rows in this table. "
"When the table doesn't support live row counting, -1 will be returned.");
"Only accurate if all tablets in the table support live row counting.");

#define GINIT(x) x(METRIC_##x.Instantiate(entity, 0))

Expand All @@ -38,5 +41,25 @@ TableMetrics::TableMetrics(const scoped_refptr<MetricEntity>& entity)
}
#undef GINIT

void TableMetrics::AddTabletNoLiveRowCount(const std::string& tablet_id) {
std::lock_guard<simple_spinlock> l(lock_);
InsertIfNotPresent(&tablet_ids_no_live_row_count_, tablet_id);
}

void TableMetrics::DeleteTabletNoLiveRowCount(const std::string& tablet_id) {
std::lock_guard<simple_spinlock> l(lock_);
tablet_ids_no_live_row_count_.erase(tablet_id);
}

bool TableMetrics::ContainsTabletNoLiveRowCount(const std::string& tablet_id) const {
std::lock_guard<simple_spinlock> l(lock_);
return ContainsKey(tablet_ids_no_live_row_count_, tablet_id);
}

bool TableMetrics::TableSupportsLiveRowCount() const {
std::lock_guard<simple_spinlock> l(lock_);
return tablet_ids_no_live_row_count_.empty();
}

} // namespace master
} // namespace kudu
17 changes: 15 additions & 2 deletions src/kudu/master/table_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
#define KUDU_MASTER_TABLE_METRICS_H

#include <cstdint>
#include <string>
#include <unordered_set>

#include "kudu/gutil/ref_counted.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"

namespace kudu {
Expand All @@ -42,8 +45,18 @@ namespace master {
struct TableMetrics {
explicit TableMetrics(const scoped_refptr<MetricEntity>& entity);

scoped_refptr<AtomicGauge<int64_t>> on_disk_size;
scoped_refptr<AtomicGauge<int64_t>> live_row_count;
scoped_refptr<AtomicGauge<uint64_t>> on_disk_size;
scoped_refptr<AtomicGauge<uint64_t>> live_row_count;

void AddTabletNoLiveRowCount(const std::string& tablet_id);
void DeleteTabletNoLiveRowCount(const std::string& tablet_id);
bool ContainsTabletNoLiveRowCount(const std::string& tablet_id) const;
bool TableSupportsLiveRowCount() const;

private:
mutable simple_spinlock lock_;
// IDs of tablets which do not support reporting live row count.
std::unordered_set<std::string> tablet_ids_no_live_row_count_;
};

} // namespace master
Expand Down
4 changes: 2 additions & 2 deletions src/kudu/tablet/compaction-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ TEST_F(TestCompaction, TestCountLiveRowsOfMemRowSetFlush) {
NO_FATALS(UpdateRows(mrs.get(), 80, 0, 1));
NO_FATALS(DeleteRows(mrs.get(), 50));
NO_FATALS(InsertRows(mrs.get(), 10, 0));
int64_t count = 0;
uint64_t count = 0;
ASSERT_OK(mrs->CountLiveRows(&count));
ASSERT_EQ(100 - 50 + 10, count);

Expand Down Expand Up @@ -1250,7 +1250,7 @@ TEST_F(TestCompaction, TestCountLiveRowsOfDiskRowSetsCompact) {
std::random_shuffle(all_rss.begin(), all_rss.end());
NO_FATALS(CompactAndReopenNoRoll(all_rss, schema_, &result));

int64_t count = 0;
uint64_t count = 0;
ASSERT_OK(result->CountLiveRows(&count));
ASSERT_EQ((100 - 50 + 10) * 3, count);
}
Expand Down
4 changes: 2 additions & 2 deletions src/kudu/tablet/diskrowset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -756,9 +756,9 @@ Status DiskRowSet::CountRows(const IOContext* io_context, rowid_t *count) const
return Status::OK();
}

Status DiskRowSet::CountLiveRows(int64_t* count) const {
Status DiskRowSet::CountLiveRows(uint64_t* count) const {
DCHECK_GE(rowset_metadata_->live_row_count(), delta_tracker_->CountDeletedRows());
*count = rowset_metadata_->live_row_count() - delta_tracker_->CountDeletedRows();
DCHECK_GE(*count, 0);
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion src/kudu/tablet/diskrowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ class DiskRowSet : public RowSet {
Status CountRows(const fs::IOContext* io_context, rowid_t *count) const final override;

// Count the number of live rows in this DRS.
virtual Status CountLiveRows(int64_t* count) const override;
virtual Status CountLiveRows(uint64_t* count) const override;

// See RowSet::GetBounds(...)
virtual Status GetBounds(std::string* min_encoded_key,
Expand Down
2 changes: 1 addition & 1 deletion src/kudu/tablet/memrowset-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ TEST_F(TestMemRowSet, TestCountLiveRows) {
MemTracker::GetRootTracker(), &mrs));

const auto CheckLiveRowsCount = [&](int64_t expect) {
int64_t count = 0;
uint64_t count = 0;
ASSERT_OK(mrs->CountLiveRows(&count));
ASSERT_EQ(expect, count);
};
Expand Down
5 changes: 2 additions & 3 deletions src/kudu/tablet/memrowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,8 @@ class MemRowSet : public RowSet,
return Status::OK();
}

virtual Status CountLiveRows(int64_t* count) const override {
virtual Status CountLiveRows(uint64_t* count) const override {
*count = live_row_count_.Load();
DCHECK_GE(*count, 0);
return Status::OK();
}

Expand Down Expand Up @@ -459,7 +458,7 @@ class MemRowSet : public RowSet,
std::atomic<bool> has_been_compacted_;

// Number of live rows in this MRS.
AtomicInt<int64_t> live_row_count_;
AtomicInt<uint64_t> live_row_count_;

DISALLOW_COPY_AND_ASSIGN(MemRowSet);
};
Expand Down
4 changes: 2 additions & 2 deletions src/kudu/tablet/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,6 @@ enum TabletStatePB {

// Statistics for a tablet replica.
message ReportedTabletStatsPB {
required int64 on_disk_size = 1;
required int64 live_row_count = 2;
optional uint64 on_disk_size = 1;
optional uint64 live_row_count = 2;
}
2 changes: 1 addition & 1 deletion src/kudu/tablet/mock-rowsets.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class MockRowSet : public RowSet {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
virtual Status CountLiveRows(int64_t* /*count*/) const OVERRIDE {
virtual Status CountLiveRows(uint64_t* /*count*/) const OVERRIDE {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion src/kudu/tablet/mt-tablet-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
while (running_insert_count_.count() > 0) {
num_rowsets_ts->SetValue(tablet()->num_rowsets());
memrowset_size_ts->SetValue(tablet()->MemRowSetSize() / 1024.0);
int64_t num_live_rows;
uint64_t num_live_rows;
ignore_result(tablet()->CountLiveRows(&num_live_rows));
num_live_rows_ts->SetValue(num_live_rows);

Expand Down
4 changes: 2 additions & 2 deletions src/kudu/tablet/rowset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ Status DuplicatingRowSet::CountRows(const IOContext* io_context, rowid_t *count)
return Status::OK();
}

Status DuplicatingRowSet::CountLiveRows(int64_t* count) const {
Status DuplicatingRowSet::CountLiveRows(uint64_t* count) const {
for (const shared_ptr<RowSet>& rs : old_rowsets_) {
int64_t tmp = 0;
uint64_t tmp = 0;
RETURN_NOT_OK(rs->CountLiveRows(&tmp));
*count += tmp;
}
Expand Down
4 changes: 2 additions & 2 deletions src/kudu/tablet/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class RowSet {
virtual Status CountRows(const fs::IOContext* io_context, rowid_t *count) const = 0;

// Count the number of live rows in this rowset.
virtual Status CountLiveRows(int64_t* count) const = 0;
virtual Status CountLiveRows(uint64_t* count) const = 0;

// Return the bounds for this RowSet. 'min_encoded_key' and 'max_encoded_key'
// are set to the first and last encoded keys for this RowSet.
Expand Down Expand Up @@ -409,7 +409,7 @@ class DuplicatingRowSet : public RowSet {

Status CountRows(const fs::IOContext* io_context, rowid_t *count) const OVERRIDE;

virtual Status CountLiveRows(int64_t* count) const OVERRIDE;
virtual Status CountLiveRows(uint64_t* count) const OVERRIDE;

virtual Status GetBounds(std::string* min_encoded_key,
std::string* max_encoded_key) const OVERRIDE;
Expand Down
4 changes: 2 additions & 2 deletions src/kudu/tablet/tablet-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class TestTablet : public TabletTestBase<SETUP> {
}

void CheckLiveRowsCount(int64_t expect) {
int64_t count = 0;
uint64_t count = 0;
ASSERT_OK(this->tablet()->CountLiveRows(&count));
ASSERT_EQ(expect, count);
}
Expand Down Expand Up @@ -915,7 +915,7 @@ TYPED_TEST(TestTablet, TestCountLiveRowsAfterShutdown) {
NO_FATALS(this->tablet()->Shutdown());

// Call the CountLiveRows().
int64_t count = 0;
uint64_t count = 0;
ASSERT_TRUE(tablet->CountLiveRows(&count).IsRuntimeError());
}

Expand Down
6 changes: 3 additions & 3 deletions src/kudu/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1881,7 +1881,7 @@ Status Tablet::CountRows(uint64_t *count) const {
return Status::OK();
}

Status Tablet::CountLiveRows(int64_t* count) const {
Status Tablet::CountLiveRows(uint64_t* count) const {
if (!metadata_->supports_live_row_count()) {
return Status::NotSupported("This tablet doesn't support live row counting");
}
Expand All @@ -1892,8 +1892,8 @@ Status Tablet::CountLiveRows(int64_t* count) const {
return Status::RuntimeError("The tablet has been shut down");
}

int64_t ret = 0;
int64_t tmp = 0;
uint64_t ret = 0;
uint64_t tmp = 0;
RETURN_NOT_OK(comps->memrowset->CountLiveRows(&ret));
for (const shared_ptr<RowSet>& rowset : comps->rowsets->all_rowsets()) {
RETURN_NOT_OK(rowset->CountLiveRows(&tmp));
Expand Down
Loading

0 comments on commit b52bf50

Please sign in to comment.