Patch: move CompactRowSetsOp, MinorDeltaCompactionOp and MajorDeltaCompactionOp
out of tablet.cc into the new tablet_mm_ops.cc.

Review notes:
  * Line breaks and indentation were rebuilt from a whitespace-collapsed copy
    of this patch; if a removed or context line fails to match, apply with
    `git apply --ignore-whitespace`.
  * The text inside angle brackets had been stripped. `#include <mutex>`,
    `std::lock_guard<simple_spinlock>`, `scoped_refptr<Histogram>` and
    `scoped_refptr<AtomicGauge<uint32_t> >` are restored on the assumption
    that they match the declarations in tablet_mm_ops.h -- TODO confirm.
  * In the added file, MajorDeltaCompactionOp::UpdateStats now reads the
    TotalCount() values into uint64_t, like the two sibling ops, instead of
    int64_t. The removed lines keep int64_t because they must match tablet.cc.
  * Method comments were added to the new file, so its hunk is now +1,275 and
    the blob id on its index line no longer describes the content.

diff --git a/src/kudu/tablet/CMakeLists.txt b/src/kudu/tablet/CMakeLists.txt
index 8fda82c341..9c8bc13d25 100644
--- a/src/kudu/tablet/CMakeLists.txt
+++ b/src/kudu/tablet/CMakeLists.txt
@@ -19,6 +19,7 @@ set(TABLET_SRCS
   tablet.cc
   tablet_bootstrap.cc
   tablet_metrics.cc
+  tablet_mm_ops.cc
   tablet_peer_mm_ops.cc
   tablet_peer.cc
   transactions/transaction.cc
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 8b7e0e468c..a0c3f3af24 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -937,219 +937,6 @@ bool Tablet::ShouldThrottleAllow(int64_t bytes) {
   return throttler_->Take(MonoTime::Now(), 1, bytes);
 }
 
-////////////////////////////////////////////////////////////
-// CompactRowSetsOp
-////////////////////////////////////////////////////////////
-
-CompactRowSetsOp::CompactRowSetsOp(Tablet* tablet)
-  : MaintenanceOp(Substitute("CompactRowSetsOp($0)", tablet->tablet_id()),
-                  MaintenanceOp::HIGH_IO_USAGE),
-    last_num_mrs_flushed_(0),
-    last_num_rs_compacted_(0),
-    tablet_(tablet) {
-}
-
-void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
-  std::lock_guard<simple_spinlock> l(lock_);
-
-  // Any operation that changes the on-disk row layout invalidates the
-  // cached stats.
-  TabletMetrics* metrics = tablet_->metrics();
-  if (metrics) {
-    uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
-    uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
-    if (prev_stats_.valid() &&
-        new_num_mrs_flushed == last_num_mrs_flushed_ &&
-        new_num_rs_compacted == last_num_rs_compacted_) {
-      *stats = prev_stats_;
-      return;
-    } else {
-      last_num_mrs_flushed_ = new_num_mrs_flushed;
-      last_num_rs_compacted_ = new_num_rs_compacted;
-    }
-  }
-
-  tablet_->UpdateCompactionStats(&prev_stats_);
-  *stats = prev_stats_;
-}
-
-bool CompactRowSetsOp::Prepare() {
-  std::lock_guard<simple_spinlock> l(lock_);
-  // Invalidate the cached stats so that another section of the tablet can
-  // be compacted concurrently.
-  //
-  // TODO: we should acquire the rowset compaction locks here. Otherwise, until
-  // Compact() acquires them, the maintenance manager may compute the same
-  // stats for this op and run it again, even though Perform() will end up
-  // performing a much less fruitful compaction. See KUDU-790 for more details.
-  prev_stats_.Clear();
-  return true;
-}
-
-void CompactRowSetsOp::Perform() {
-  WARN_NOT_OK(tablet_->Compact(Tablet::COMPACT_NO_FLAGS),
-              Substitute("Compaction failed on $0", tablet_->tablet_id()));
-}
-
-scoped_refptr<Histogram> CompactRowSetsOp::DurationHistogram() const {
-  return tablet_->metrics()->compact_rs_duration;
-}
-
-scoped_refptr<AtomicGauge<uint32_t> > CompactRowSetsOp::RunningGauge() const {
-  return tablet_->metrics()->compact_rs_running;
-}
-
-////////////////////////////////////////////////////////////
-// MinorDeltaCompactionOp
-////////////////////////////////////////////////////////////
-
-MinorDeltaCompactionOp::MinorDeltaCompactionOp(Tablet* tablet)
-  : MaintenanceOp(Substitute("MinorDeltaCompactionOp($0)", tablet->tablet_id()),
-                  MaintenanceOp::HIGH_IO_USAGE),
-    last_num_mrs_flushed_(0),
-    last_num_dms_flushed_(0),
-    last_num_rs_compacted_(0),
-    last_num_rs_minor_delta_compacted_(0),
-    tablet_(tablet) {
-}
-
-void MinorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
-  std::lock_guard<simple_spinlock> l(lock_);
-
-  // Any operation that changes the number of REDO files invalidates the
-  // cached stats.
-  TabletMetrics* metrics = tablet_->metrics();
-  if (metrics) {
-    uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
-    uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
-    uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
-    uint64_t new_num_rs_minor_delta_compacted =
-        metrics->delta_minor_compact_rs_duration->TotalCount();
-    if (prev_stats_.valid() &&
-        new_num_mrs_flushed == last_num_mrs_flushed_ &&
-        new_num_dms_flushed == last_num_dms_flushed_ &&
-        new_num_rs_compacted == last_num_rs_compacted_ &&
-        new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_) {
-      *stats = prev_stats_;
-      return;
-    } else {
-      last_num_mrs_flushed_ = new_num_mrs_flushed;
-      last_num_dms_flushed_ = new_num_dms_flushed;
-      last_num_rs_compacted_ = new_num_rs_compacted;
-      last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
-    }
-  }
-
-  double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
-      RowSet::MINOR_DELTA_COMPACTION, nullptr);
-  prev_stats_.set_perf_improvement(perf_improv);
-  prev_stats_.set_runnable(perf_improv > 0);
-  *stats = prev_stats_;
-}
-
-bool MinorDeltaCompactionOp::Prepare() {
-  std::lock_guard<simple_spinlock> l(lock_);
-  // Invalidate the cached stats so that another rowset in the tablet can
-  // be delta compacted concurrently.
-  //
-  // TODO: See CompactRowSetsOp::Prepare().
-  prev_stats_.Clear();
-  return true;
-}
-
-void MinorDeltaCompactionOp::Perform() {
-  WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION),
-              Substitute("Minor delta compaction failed on $0", tablet_->tablet_id()));
-}
-
-scoped_refptr<Histogram> MinorDeltaCompactionOp::DurationHistogram() const {
-  return tablet_->metrics()->delta_minor_compact_rs_duration;
-}
-
-scoped_refptr<AtomicGauge<uint32_t> > MinorDeltaCompactionOp::RunningGauge() const {
-  return tablet_->metrics()->delta_minor_compact_rs_running;
-}
-
-////////////////////////////////////////////////////////////
-// MajorDeltaCompactionOp
-////////////////////////////////////////////////////////////
-
-MajorDeltaCompactionOp::MajorDeltaCompactionOp(Tablet* tablet)
-  : MaintenanceOp(Substitute("MajorDeltaCompactionOp($0)", tablet->tablet_id()),
-                  MaintenanceOp::HIGH_IO_USAGE),
-    last_num_mrs_flushed_(0),
-    last_num_dms_flushed_(0),
-    last_num_rs_compacted_(0),
-    last_num_rs_minor_delta_compacted_(0),
-    last_num_rs_major_delta_compacted_(0),
-    tablet_(tablet) {
-}
-
-void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
-  std::lock_guard<simple_spinlock> l(lock_);
-
-  // Any operation that changes the size of the on-disk data invalidates the
-  // cached stats.
-  TabletMetrics* metrics = tablet_->metrics();
-  if (metrics) {
-    int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
-    int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
-    int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
-    int64_t new_num_rs_minor_delta_compacted =
-        metrics->delta_minor_compact_rs_duration->TotalCount();
-    int64_t new_num_rs_major_delta_compacted =
-        metrics->delta_major_compact_rs_duration->TotalCount();
-    if (prev_stats_.valid() &&
-        new_num_mrs_flushed == last_num_mrs_flushed_ &&
-        new_num_dms_flushed == last_num_dms_flushed_ &&
-        new_num_rs_compacted == last_num_rs_compacted_ &&
-        new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_ &&
-        new_num_rs_major_delta_compacted == last_num_rs_major_delta_compacted_) {
-      *stats = prev_stats_;
-      return;
-    } else {
-      last_num_mrs_flushed_ = new_num_mrs_flushed;
-      last_num_dms_flushed_ = new_num_dms_flushed;
-      last_num_rs_compacted_ = new_num_rs_compacted;
-      last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
-      last_num_rs_major_delta_compacted_ = new_num_rs_major_delta_compacted;
-    }
-  }
-
-  double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
-      RowSet::MAJOR_DELTA_COMPACTION, nullptr);
-  prev_stats_.set_perf_improvement(perf_improv);
-  prev_stats_.set_runnable(perf_improv > 0);
-  *stats = prev_stats_;
-}
-
-bool MajorDeltaCompactionOp::Prepare() {
-  std::lock_guard<simple_spinlock> l(lock_);
-  // Invalidate the cached stats so that another rowset in the tablet can
-  // be delta compacted concurrently.
-  //
-  // TODO: See CompactRowSetsOp::Prepare().
-  prev_stats_.Clear();
-  return true;
-}
-
-void MajorDeltaCompactionOp::Perform() {
-  WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION),
-              Substitute("Major delta compaction failed on $0", tablet_->tablet_id()));
-}
-
-scoped_refptr<Histogram> MajorDeltaCompactionOp::DurationHistogram() const {
-  return tablet_->metrics()->delta_major_compact_rs_duration;
-}
-
-scoped_refptr<AtomicGauge<uint32_t> > MajorDeltaCompactionOp::RunningGauge() const {
-  return tablet_->metrics()->delta_major_compact_rs_running;
-}
-
-////////////////////////////////////////////////////////////
-// Tablet
-////////////////////////////////////////////////////////////
-
 Status Tablet::PickRowSetsToCompact(RowSetsInCompaction *picked,
                                     CompactFlags flags) const {
   CHECK_EQ(state_, kOpen);
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
new file mode 100644
index 0000000000..345e8987e7
--- /dev/null
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tablet/tablet_mm_ops.h"
+
+#include <mutex>
+
+#include "kudu/util/locks.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_metrics.h"
+
+using strings::Substitute;
+
+namespace kudu {
+namespace tablet {
+
+////////////////////////////////////////////////////////////
+// CompactRowSetsOp
+////////////////////////////////////////////////////////////
+
+// Names the op "CompactRowSetsOp(<tablet id>)", marks it HIGH_IO_USAGE and
+// zeroes the counters that UpdateStats() uses to validate its cached stats.
+CompactRowSetsOp::CompactRowSetsOp(Tablet* tablet)
+  : MaintenanceOp(Substitute("CompactRowSetsOp($0)", tablet->tablet_id()),
+                  MaintenanceOp::HIGH_IO_USAGE),
+    last_num_mrs_flushed_(0),
+    last_num_rs_compacted_(0),
+    tablet_(tablet) {
+}
+
+// Copies the tablet's compaction stats into 'stats'. The previous result is
+// reused while it is still valid and the flush/compaction counts read from
+// the tablet metrics are unchanged; without metrics it is always recomputed.
+void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
+  std::lock_guard<simple_spinlock> l(lock_);
+
+  // Any operation that changes the on-disk row layout invalidates the
+  // cached stats.
+  TabletMetrics* metrics = tablet_->metrics();
+  if (metrics) {
+    uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
+    uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+    if (prev_stats_.valid() &&
+        new_num_mrs_flushed == last_num_mrs_flushed_ &&
+        new_num_rs_compacted == last_num_rs_compacted_) {
+      *stats = prev_stats_;
+      return;
+    } else {
+      last_num_mrs_flushed_ = new_num_mrs_flushed;
+      last_num_rs_compacted_ = new_num_rs_compacted;
+    }
+  }
+
+  tablet_->UpdateCompactionStats(&prev_stats_);
+  *stats = prev_stats_;
+}
+
+// Clears the cached stats and always returns true.
+bool CompactRowSetsOp::Prepare() {
+  std::lock_guard<simple_spinlock> l(lock_);
+  // Invalidate the cached stats so that another section of the tablet can
+  // be compacted concurrently.
+  //
+  // TODO: we should acquire the rowset compaction locks here. Otherwise, until
+  // Compact() acquires them, the maintenance manager may compute the same
+  // stats for this op and run it again, even though Perform() will end up
+  // performing a much less fruitful compaction. See KUDU-790 for more details.
+  prev_stats_.Clear();
+  return true;
+}
+
+// Runs a rowset compaction with no flags. A failure is reported through
+// WARN_NOT_OK and is not propagated to the caller.
+void CompactRowSetsOp::Perform() {
+  WARN_NOT_OK(tablet_->Compact(Tablet::COMPACT_NO_FLAGS),
+              Substitute("Compaction failed on $0", tablet_->tablet_id()));
+}
+
+// Returns the tablet's compact_rs_duration metric. Unlike UpdateStats(), this
+// dereferences tablet_->metrics() without a null check.
+scoped_refptr<Histogram> CompactRowSetsOp::DurationHistogram() const {
+  return tablet_->metrics()->compact_rs_duration;
+}
+
+// Returns the tablet's compact_rs_running metric; tablet_->metrics() is not
+// null-checked here either.
+scoped_refptr<AtomicGauge<uint32_t> > CompactRowSetsOp::RunningGauge() const {
+  return tablet_->metrics()->compact_rs_running;
+}
+
+////////////////////////////////////////////////////////////
+// MinorDeltaCompactionOp
+////////////////////////////////////////////////////////////
+
+// Names the op "MinorDeltaCompactionOp(<tablet id>)", marks it HIGH_IO_USAGE
+// and zeroes the counters that UpdateStats() uses to validate its cached stats.
+MinorDeltaCompactionOp::MinorDeltaCompactionOp(Tablet* tablet)
+  : MaintenanceOp(Substitute("MinorDeltaCompactionOp($0)", tablet->tablet_id()),
+                  MaintenanceOp::HIGH_IO_USAGE),
+    last_num_mrs_flushed_(0),
+    last_num_dms_flushed_(0),
+    last_num_rs_compacted_(0),
+    last_num_rs_minor_delta_compacted_(0),
+    tablet_(tablet) {
+}
+
+// Reports the perf improvement of the best minor delta compaction, as given
+// by Tablet::GetPerfImprovementForBestDeltaCompact(), and marks the op
+// runnable when that improvement is positive. The previous result is reused
+// while it is still valid and none of the tracked metric counts has changed.
+void MinorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
+  std::lock_guard<simple_spinlock> l(lock_);
+
+  // Any operation that changes the number of REDO files invalidates the
+  // cached stats.
+  TabletMetrics* metrics = tablet_->metrics();
+  if (metrics) {
+    uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
+    uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
+    uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+    uint64_t new_num_rs_minor_delta_compacted =
+        metrics->delta_minor_compact_rs_duration->TotalCount();
+    if (prev_stats_.valid() &&
+        new_num_mrs_flushed == last_num_mrs_flushed_ &&
+        new_num_dms_flushed == last_num_dms_flushed_ &&
+        new_num_rs_compacted == last_num_rs_compacted_ &&
+        new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_) {
+      *stats = prev_stats_;
+      return;
+    } else {
+      last_num_mrs_flushed_ = new_num_mrs_flushed;
+      last_num_dms_flushed_ = new_num_dms_flushed;
+      last_num_rs_compacted_ = new_num_rs_compacted;
+      last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
+    }
+  }
+
+  double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
+      RowSet::MINOR_DELTA_COMPACTION, nullptr);
+  prev_stats_.set_perf_improvement(perf_improv);
+  prev_stats_.set_runnable(perf_improv > 0);
+  *stats = prev_stats_;
+}
+
+// Clears the cached stats and always returns true.
+bool MinorDeltaCompactionOp::Prepare() {
+  std::lock_guard<simple_spinlock> l(lock_);
+  // Invalidate the cached stats so that another rowset in the tablet can
+  // be delta compacted concurrently.
+  //
+  // TODO: See CompactRowSetsOp::Prepare().
+  prev_stats_.Clear();
+  return true;
+}
+
+// Runs Tablet::CompactWorstDeltas() for a minor delta compaction. A failure
+// is reported through WARN_NOT_OK and is not propagated to the caller.
+void MinorDeltaCompactionOp::Perform() {
+  WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION),
+              Substitute("Minor delta compaction failed on $0", tablet_->tablet_id()));
+}
+
+// Returns the tablet's delta_minor_compact_rs_duration metric.
+scoped_refptr<Histogram> MinorDeltaCompactionOp::DurationHistogram() const {
+  return tablet_->metrics()->delta_minor_compact_rs_duration;
+}
+
+// Returns the tablet's delta_minor_compact_rs_running metric.
+scoped_refptr<AtomicGauge<uint32_t> > MinorDeltaCompactionOp::RunningGauge() const {
+  return tablet_->metrics()->delta_minor_compact_rs_running;
+}
+
+////////////////////////////////////////////////////////////
+// MajorDeltaCompactionOp
+////////////////////////////////////////////////////////////
+
+// Names the op "MajorDeltaCompactionOp(<tablet id>)", marks it HIGH_IO_USAGE
+// and zeroes the counters that UpdateStats() uses to validate its cached stats.
+MajorDeltaCompactionOp::MajorDeltaCompactionOp(Tablet* tablet)
+  : MaintenanceOp(Substitute("MajorDeltaCompactionOp($0)", tablet->tablet_id()),
+                  MaintenanceOp::HIGH_IO_USAGE),
+    last_num_mrs_flushed_(0),
+    last_num_dms_flushed_(0),
+    last_num_rs_compacted_(0),
+    last_num_rs_minor_delta_compacted_(0),
+    last_num_rs_major_delta_compacted_(0),
+    tablet_(tablet) {
+}
+
+// Reports the perf improvement of the best major delta compaction, as given
+// by Tablet::GetPerfImprovementForBestDeltaCompact(), and marks the op
+// runnable when that improvement is positive. The previous result is reused
+// while it is still valid and none of the tracked metric counts has changed.
+void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
+  std::lock_guard<simple_spinlock> l(lock_);
+
+  // Any operation that changes the size of the on-disk data invalidates the
+  // cached stats.
+  TabletMetrics* metrics = tablet_->metrics();
+  if (metrics) {
+    uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
+    uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
+    uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+    uint64_t new_num_rs_minor_delta_compacted =
+        metrics->delta_minor_compact_rs_duration->TotalCount();
+    uint64_t new_num_rs_major_delta_compacted =
+        metrics->delta_major_compact_rs_duration->TotalCount();
+    if (prev_stats_.valid() &&
+        new_num_mrs_flushed == last_num_mrs_flushed_ &&
+        new_num_dms_flushed == last_num_dms_flushed_ &&
+        new_num_rs_compacted == last_num_rs_compacted_ &&
+        new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_ &&
+        new_num_rs_major_delta_compacted == last_num_rs_major_delta_compacted_) {
+      *stats = prev_stats_;
+      return;
+    } else {
+      last_num_mrs_flushed_ = new_num_mrs_flushed;
+      last_num_dms_flushed_ = new_num_dms_flushed;
+      last_num_rs_compacted_ = new_num_rs_compacted;
+      last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
+      last_num_rs_major_delta_compacted_ = new_num_rs_major_delta_compacted;
+    }
+  }
+
+  double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
+      RowSet::MAJOR_DELTA_COMPACTION, nullptr);
+  prev_stats_.set_perf_improvement(perf_improv);
+  prev_stats_.set_runnable(perf_improv > 0);
+  *stats = prev_stats_;
+}
+
+// Clears the cached stats and always returns true.
+bool MajorDeltaCompactionOp::Prepare() {
+  std::lock_guard<simple_spinlock> l(lock_);
+  // Invalidate the cached stats so that another rowset in the tablet can
+  // be delta compacted concurrently.
+  //
+  // TODO: See CompactRowSetsOp::Prepare().
+  prev_stats_.Clear();
+  return true;
+}
+
+// Runs Tablet::CompactWorstDeltas() for a major delta compaction. A failure
+// is reported through WARN_NOT_OK and is not propagated to the caller.
+void MajorDeltaCompactionOp::Perform() {
+  WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION),
+              Substitute("Major delta compaction failed on $0", tablet_->tablet_id()));
+}
+
+// Returns the tablet's delta_major_compact_rs_duration metric.
+scoped_refptr<Histogram> MajorDeltaCompactionOp::DurationHistogram() const {
+  return tablet_->metrics()->delta_major_compact_rs_duration;
+}
+
+// Returns the tablet's delta_major_compact_rs_running metric.
+scoped_refptr<AtomicGauge<uint32_t> > MajorDeltaCompactionOp::RunningGauge() const {
+  return tablet_->metrics()->delta_major_compact_rs_running;
+}
+
+} // namespace tablet
+} // namespace kudu
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index f4ee1c30a2..1ea8f95c85 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -18,6 +18,7 @@
 #ifndef KUDU_TABLET_TABLET_MM_OPS_H_
 #define KUDU_TABLET_TABLET_MM_OPS_H_
 
+#include "kudu/util/locks.h"
 #include "kudu/util/maintenance_manager.h"
 
 namespace kudu {
@@ -28,6 +29,8 @@ class AtomicGauge;
 
 namespace tablet {
 
+class Tablet;
+
 // MaintenanceOp for rowset compaction.
 //
 // This periodically invokes the tablet's CompactionPolicy to select a compaction. The