Skip to content

Commit

Permalink
KUDU-1559. Fix block manager metric for aborted blocks
Browse files Browse the repository at this point in the history
We weren't properly decrementing the "open writable blocks" metric when
aborting a LogWritableBlock.

Change-Id: I4447d2c4e3073732dc423cc7b6dd22e2ba9ad926
Reviewed-on: http://gerrit.cloudera.org:8080/3964
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
  • Loading branch information
toddlipcon committed Aug 14, 2016
1 parent bcc4bbf commit 3cec55a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 45 deletions.
45 changes: 27 additions & 18 deletions src/kudu/fs/block_manager-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,32 @@ TYPED_TEST(BlockManagerTest, WritableBlockStateTest) {
ASSERT_EQ(WritableBlock::CLOSED, written_block->state());
}

static void CheckMetrics(const scoped_refptr<MetricEntity>& metrics,
int blocks_open_reading, int blocks_open_writing,
int total_readable_blocks, int total_writable_blocks,
int total_bytes_read, int total_bytes_written) {
ASSERT_EQ(blocks_open_reading, down_cast<AtomicGauge<uint64_t>*>(
metrics->FindOrNull(METRIC_block_manager_blocks_open_reading).get())->value());
ASSERT_EQ(blocks_open_writing, down_cast<AtomicGauge<uint64_t>*>(
metrics->FindOrNull(METRIC_block_manager_blocks_open_writing).get())->value());
ASSERT_EQ(total_readable_blocks, down_cast<Counter*>(
metrics->FindOrNull(METRIC_block_manager_total_readable_blocks).get())->value());
ASSERT_EQ(total_writable_blocks, down_cast<Counter*>(
metrics->FindOrNull(METRIC_block_manager_total_writable_blocks).get())->value());
ASSERT_EQ(total_bytes_read, down_cast<Counter*>(
metrics->FindOrNull(METRIC_block_manager_total_bytes_read).get())->value());
ASSERT_EQ(total_bytes_written, down_cast<Counter*>(
metrics->FindOrNull(METRIC_block_manager_total_bytes_written).get())->value());
}

TYPED_TEST(BlockManagerTest, AbortTest) {
MetricRegistry registry;
scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
ASSERT_OK(this->ReopenBlockManager(entity,
shared_ptr<MemTracker>(),
{ GetTestDataDirectory() },
false));

gscoped_ptr<WritableBlock> written_block;
ASSERT_OK(this->bm_->CreateBlock(&written_block));
string test_data = "test data";
Expand All @@ -542,6 +567,8 @@ TYPED_TEST(BlockManagerTest, AbortTest) {
ASSERT_EQ(WritableBlock::CLOSED, written_block->state());
ASSERT_TRUE(this->bm_->OpenBlock(written_block->id(), nullptr)
.IsNotFound());

ASSERT_NO_FATAL_FAILURE(CheckMetrics(entity, 0, 0, 0, 2, 0, test_data.size() * 2));
}

TYPED_TEST(BlockManagerTest, PersistenceTest) {
Expand Down Expand Up @@ -632,24 +659,6 @@ TYPED_TEST(BlockManagerTest, ConcurrentCloseReadableBlockTest) {
}
}

static void CheckMetrics(const scoped_refptr<MetricEntity>& metrics,
int blocks_open_reading, int blocks_open_writing,
int total_readable_blocks, int total_writable_blocks,
int total_bytes_read, int total_bytes_written) {
ASSERT_EQ(blocks_open_reading, down_cast<AtomicGauge<uint64_t>*>(
metrics->FindOrNull(METRIC_block_manager_blocks_open_reading).get())->value());
ASSERT_EQ(blocks_open_writing, down_cast<AtomicGauge<uint64_t>*>(
metrics->FindOrNull(METRIC_block_manager_blocks_open_writing).get())->value());
ASSERT_EQ(total_readable_blocks, down_cast<Counter*>(
metrics->FindOrNull(METRIC_block_manager_total_readable_blocks).get())->value());
ASSERT_EQ(total_writable_blocks, down_cast<Counter*>(
metrics->FindOrNull(METRIC_block_manager_total_writable_blocks).get())->value());
ASSERT_EQ(total_bytes_read, down_cast<Counter*>(
metrics->FindOrNull(METRIC_block_manager_total_bytes_read).get())->value());
ASSERT_EQ(total_bytes_written, down_cast<Counter*>(
metrics->FindOrNull(METRIC_block_manager_total_bytes_written).get())->value());
}

TYPED_TEST(BlockManagerTest, MetricsTest) {
const string kTestData = "test data";
MetricRegistry registry;
Expand Down
39 changes: 12 additions & 27 deletions src/kudu/fs/log_block_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_util_prod.h"
#include "kudu/util/threadpool.h"
Expand Down Expand Up @@ -773,24 +774,6 @@ class LogWritableBlock : public WritableBlock {
Status AppendMetadata();

private:

// RAII-style class for finishing writable blocks in DoClose().
class ScopedFinisher {
public:
// Both 'block' and 's' must outlive the finisher.
ScopedFinisher(LogWritableBlock* block, Status* s) :
block_(block),
status_(s) {
}
~ScopedFinisher() {
block_->state_ = CLOSED;
*status_ = block_->container_->FinishBlock(*status_, block_);
}
private:
LogWritableBlock* block_;
Status* status_;
};

// The owning container. Must outlive the block.
LogBlockContainer* container_;

Expand Down Expand Up @@ -901,14 +884,22 @@ Status LogWritableBlock::DoClose(SyncMode mode) {
// Tracks the first failure (if any).
//
// It's important that any subsequent failures mutate 's' before
// returning. Otherwise 'finisher' won't properly provide the first
// returning. Otherwise 'cleanup' won't properly provide the first
// failure to LogBlockContainer::FinishBlock().
//
// Note also that when 'finisher' goes out of scope it may mutate 's'.
// Note also that when 'cleanup' goes out of scope it may mutate 's'.
Status s;
{
ScopedFinisher finisher(this, &s);
auto cleanup = MakeScopedCleanup([&]() {
if (container_->metrics()) {
container_->metrics()->generic_metrics.blocks_open_writing->Decrement();
container_->metrics()->generic_metrics.total_bytes_written->IncrementBy(
BytesAppended());
}

state_ = CLOSED;
s = container_->FinishBlock(s, this);
});
// FlushDataAsync() was not called; append the metadata now.
if (state_ == CLEAN || state_ == DIRTY) {
s = AppendMetadata();
Expand All @@ -926,12 +917,6 @@ Status LogWritableBlock::DoClose(SyncMode mode) {
// TODO: Sync just this block's dirty metadata.
s = container_->SyncMetadata();
RETURN_NOT_OK(s);

if (container_->metrics()) {
container_->metrics()->generic_metrics.blocks_open_writing->Decrement();
container_->metrics()->generic_metrics.total_bytes_written->IncrementBy(
BytesAppended());
}
}
}

Expand Down

0 comments on commit 3cec55a

Please sign in to comment.