Skip to content

Commit

Permalink
Separate tablet MM ops into separate .cc file
Browse files Browse the repository at this point in the history
tablet.cc is a monster, no need for this in there.
Plus, these classes already have their own header file

Change-Id: Ie40475fdb9f5da6a7300661d5149509da990939d
Reviewed-on: http://gerrit.cloudera.org:8080/4362
Reviewed-by: Jean-Daniel Cryans <[email protected]>
Reviewed-by: Todd Lipcon <[email protected]>
Tested-by: Todd Lipcon <[email protected]>
  • Loading branch information
mpercy authored and toddlipcon committed Oct 5, 2016
1 parent 2a1a6c0 commit 2528edf
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 213 deletions.
1 change: 1 addition & 0 deletions src/kudu/tablet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ set(TABLET_SRCS
tablet.cc
tablet_bootstrap.cc
tablet_metrics.cc
tablet_mm_ops.cc
tablet_peer_mm_ops.cc
tablet_peer.cc
transactions/transaction.cc
Expand Down
213 changes: 0 additions & 213 deletions src/kudu/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -937,219 +937,6 @@ bool Tablet::ShouldThrottleAllow(int64_t bytes) {
return throttler_->Take(MonoTime::Now(), 1, bytes);
}

////////////////////////////////////////////////////////////
// CompactRowSetsOp
////////////////////////////////////////////////////////////

CompactRowSetsOp::CompactRowSetsOp(Tablet* tablet)
: MaintenanceOp(Substitute("CompactRowSetsOp($0)", tablet->tablet_id()),
MaintenanceOp::HIGH_IO_USAGE),
last_num_mrs_flushed_(0),
last_num_rs_compacted_(0),
tablet_(tablet) {
}

void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
std::lock_guard<simple_spinlock> l(lock_);

// Any operation that changes the on-disk row layout invalidates the
// cached stats.
TabletMetrics* metrics = tablet_->metrics();
if (metrics) {
uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
if (prev_stats_.valid() &&
new_num_mrs_flushed == last_num_mrs_flushed_ &&
new_num_rs_compacted == last_num_rs_compacted_) {
*stats = prev_stats_;
return;
} else {
last_num_mrs_flushed_ = new_num_mrs_flushed;
last_num_rs_compacted_ = new_num_rs_compacted;
}
}

tablet_->UpdateCompactionStats(&prev_stats_);
*stats = prev_stats_;
}

bool CompactRowSetsOp::Prepare() {
std::lock_guard<simple_spinlock> l(lock_);
// Invalidate the cached stats so that another section of the tablet can
// be compacted concurrently.
//
// TODO: we should acquire the rowset compaction locks here. Otherwise, until
// Compact() acquires them, the maintenance manager may compute the same
// stats for this op and run it again, even though Perform() will end up
// performing a much less fruitful compaction. See KUDU-790 for more details.
prev_stats_.Clear();
return true;
}

void CompactRowSetsOp::Perform() {
WARN_NOT_OK(tablet_->Compact(Tablet::COMPACT_NO_FLAGS),
Substitute("Compaction failed on $0", tablet_->tablet_id()));
}

scoped_refptr<Histogram> CompactRowSetsOp::DurationHistogram() const {
return tablet_->metrics()->compact_rs_duration;
}

scoped_refptr<AtomicGauge<uint32_t> > CompactRowSetsOp::RunningGauge() const {
return tablet_->metrics()->compact_rs_running;
}

////////////////////////////////////////////////////////////
// MinorDeltaCompactionOp
////////////////////////////////////////////////////////////

MinorDeltaCompactionOp::MinorDeltaCompactionOp(Tablet* tablet)
: MaintenanceOp(Substitute("MinorDeltaCompactionOp($0)", tablet->tablet_id()),
MaintenanceOp::HIGH_IO_USAGE),
last_num_mrs_flushed_(0),
last_num_dms_flushed_(0),
last_num_rs_compacted_(0),
last_num_rs_minor_delta_compacted_(0),
tablet_(tablet) {
}

void MinorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
std::lock_guard<simple_spinlock> l(lock_);

// Any operation that changes the number of REDO files invalidates the
// cached stats.
TabletMetrics* metrics = tablet_->metrics();
if (metrics) {
uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
uint64_t new_num_rs_minor_delta_compacted =
metrics->delta_minor_compact_rs_duration->TotalCount();
if (prev_stats_.valid() &&
new_num_mrs_flushed == last_num_mrs_flushed_ &&
new_num_dms_flushed == last_num_dms_flushed_ &&
new_num_rs_compacted == last_num_rs_compacted_ &&
new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_) {
*stats = prev_stats_;
return;
} else {
last_num_mrs_flushed_ = new_num_mrs_flushed;
last_num_dms_flushed_ = new_num_dms_flushed;
last_num_rs_compacted_ = new_num_rs_compacted;
last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
}
}

double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
RowSet::MINOR_DELTA_COMPACTION, nullptr);
prev_stats_.set_perf_improvement(perf_improv);
prev_stats_.set_runnable(perf_improv > 0);
*stats = prev_stats_;
}

bool MinorDeltaCompactionOp::Prepare() {
std::lock_guard<simple_spinlock> l(lock_);
// Invalidate the cached stats so that another rowset in the tablet can
// be delta compacted concurrently.
//
// TODO: See CompactRowSetsOp::Prepare().
prev_stats_.Clear();
return true;
}

void MinorDeltaCompactionOp::Perform() {
WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION),
Substitute("Minor delta compaction failed on $0", tablet_->tablet_id()));
}

scoped_refptr<Histogram> MinorDeltaCompactionOp::DurationHistogram() const {
return tablet_->metrics()->delta_minor_compact_rs_duration;
}

scoped_refptr<AtomicGauge<uint32_t> > MinorDeltaCompactionOp::RunningGauge() const {
return tablet_->metrics()->delta_minor_compact_rs_running;
}

////////////////////////////////////////////////////////////
// MajorDeltaCompactionOp
////////////////////////////////////////////////////////////

MajorDeltaCompactionOp::MajorDeltaCompactionOp(Tablet* tablet)
: MaintenanceOp(Substitute("MajorDeltaCompactionOp($0)", tablet->tablet_id()),
MaintenanceOp::HIGH_IO_USAGE),
last_num_mrs_flushed_(0),
last_num_dms_flushed_(0),
last_num_rs_compacted_(0),
last_num_rs_minor_delta_compacted_(0),
last_num_rs_major_delta_compacted_(0),
tablet_(tablet) {
}

void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
std::lock_guard<simple_spinlock> l(lock_);

// Any operation that changes the size of the on-disk data invalidates the
// cached stats.
TabletMetrics* metrics = tablet_->metrics();
if (metrics) {
int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
int64_t new_num_rs_minor_delta_compacted =
metrics->delta_minor_compact_rs_duration->TotalCount();
int64_t new_num_rs_major_delta_compacted =
metrics->delta_major_compact_rs_duration->TotalCount();
if (prev_stats_.valid() &&
new_num_mrs_flushed == last_num_mrs_flushed_ &&
new_num_dms_flushed == last_num_dms_flushed_ &&
new_num_rs_compacted == last_num_rs_compacted_ &&
new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_ &&
new_num_rs_major_delta_compacted == last_num_rs_major_delta_compacted_) {
*stats = prev_stats_;
return;
} else {
last_num_mrs_flushed_ = new_num_mrs_flushed;
last_num_dms_flushed_ = new_num_dms_flushed;
last_num_rs_compacted_ = new_num_rs_compacted;
last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
last_num_rs_major_delta_compacted_ = new_num_rs_major_delta_compacted;
}
}

double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
RowSet::MAJOR_DELTA_COMPACTION, nullptr);
prev_stats_.set_perf_improvement(perf_improv);
prev_stats_.set_runnable(perf_improv > 0);
*stats = prev_stats_;
}

bool MajorDeltaCompactionOp::Prepare() {
std::lock_guard<simple_spinlock> l(lock_);
// Invalidate the cached stats so that another rowset in the tablet can
// be delta compacted concurrently.
//
// TODO: See CompactRowSetsOp::Prepare().
prev_stats_.Clear();
return true;
}

void MajorDeltaCompactionOp::Perform() {
WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION),
Substitute("Major delta compaction failed on $0", tablet_->tablet_id()));
}

scoped_refptr<Histogram> MajorDeltaCompactionOp::DurationHistogram() const {
return tablet_->metrics()->delta_major_compact_rs_duration;
}

scoped_refptr<AtomicGauge<uint32_t> > MajorDeltaCompactionOp::RunningGauge() const {
return tablet_->metrics()->delta_major_compact_rs_running;
}

////////////////////////////////////////////////////////////
// Tablet
////////////////////////////////////////////////////////////

Status Tablet::PickRowSetsToCompact(RowSetsInCompaction *picked,
CompactFlags flags) const {
CHECK_EQ(state_, kOpen);
Expand Down
Loading

0 comments on commit 2528edf

Please sign in to comment.