log: remove SegmentAllocator functors
When we roll the log, we close the active segment before switching to the
next preallocated segment. What happens when we close the active segment?
1. We sync it.
2. We write out the (now complete) footer.
3. We close the segment file. Under the hood, this will truncate any unused
   preallocated space out of the file and sync it one last time.
4. We open the closed segment file for reading. Note that this is the second
   time we've opened it; we also opened it once when we last rolled the log.
5. We create a new in-memory segment reader. To avoid needless IO, we
   initialize it using the in-memory header and footer, but we do end up
   reloading the segment's file size from disk.
6. We replace the last segment reader in the log with the reader constructed
   in step 5. Side effect: the original segment reader is now closed. (A
   sketch of the whole sequence follows.)
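
To make the sequence concrete, here is a minimal C++ sketch of steps 1-6.
All types and methods (Segment, SegmentReader, and so on) are illustrative
stand-ins, not the actual Kudu classes:

    #include <cstdint>
    #include <memory>
    #include <string>
    #include <vector>

    struct Footer {
      int64_t num_entries = 0;
    };

    struct Segment {
      void Sync() {}                                     // step 1: fsync data
      void WriteFooter(const Footer& f) { footer = f; }  // step 2: complete footer
      void Close() {}                 // step 3: truncate unused space, final sync
      Footer footer;
      std::string path = "wal-000000001";
    };

    struct SegmentReader {
      // Steps 4-5: reopen the file and build a reader from the in-memory
      // header/footer; only the file size would be re-read from disk.
      static std::unique_ptr<SegmentReader> Open(const std::string& /*path*/,
                                                 const Footer& footer) {
        auto r = std::make_unique<SegmentReader>();
        r->footer = footer;
        return r;
      }
      Footer footer;
    };

    int main() {
      std::vector<std::unique_ptr<SegmentReader>> readers;   // stand-in for LogReader
      readers.push_back(std::make_unique<SegmentReader>());  // opened at the last roll

      Segment active;
      active.Sync();                // step 1
      Footer footer;
      footer.num_entries = 42;
      active.WriteFooter(footer);   // step 2
      active.Close();               // step 3
      // Steps 4-5: second open of the same file.
      auto reopened = SegmentReader::Open(active.path, active.footer);
      // Step 6: the original reader is dropped (closed) by the swap.
      readers.back() = std::move(reopened);
      return 0;
    }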

My original intent with this patch was to replace steps 4-6 with a simpler
"inject the footer into the existing in-memory segment reader". This proved
to be impossible without adding a new synchronization primitive to protect
the footer in the segment reader, which is something I wanted to avoid. And
the win would have been minimal: with proper file cache usage, step 4 is
likely to be a cache hit.
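
The locking problem can be shown with a toy sketch (hypothetical types, not
Kudu code): once a footer can be injected into a live reader, every footer
access must be guarded, and that guard is exactly the new primitive this
patch declines to add.

    #include <mutex>
    #include <optional>

    struct Footer {
      long num_entries = 0;
    };

    class LiveSegmentReader {
     public:
      // Writer side: installs the footer when the segment is closed.
      void InjectFooter(const Footer& f) {
        std::lock_guard<std::mutex> l(footer_lock_);
        footer_ = f;
      }
      // Reader side: may run concurrently with InjectFooter().
      bool HasFooter() const {
        std::lock_guard<std::mutex> l(footer_lock_);
        return footer_.has_value();
      }
     private:
      mutable std::mutex footer_lock_;  // the primitive we'd rather not add
      std::optional<Footer> footer_;    // absent until the segment is closed
    };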

So now this patch just cleans up SegmentAllocator by removing its functors,
which I found difficult to follow. Instead, Log now manipulates LogReader
directly, using information passed back from SegmentAllocator functions.
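
The resulting call pattern (visible in Log::WriteBatch and
Log::AllocateSegmentAndRollOverForTests in the diff below) boils down to the
following simplified sketch; the function names mirror the patch, but the
bodies are placeholders:

    #include <memory>
    #include <utility>
    #include <vector>

    struct ReadableLogSegment {};
    using SegmentPtr = std::unique_ptr<ReadableLogSegment>;  // for scoped_refptr

    struct SegmentAllocator {
      // Returns the reopened, now-closed segment (null on the very first
      // roll) and the freshly opened segment via out-parameters.
      void RollOver(SegmentPtr* closed_segment, SegmentPtr* new_readable_segment) {
        *closed_segment = std::move(active_);
        *new_readable_segment = std::make_unique<ReadableLogSegment>();
        active_ = std::make_unique<ReadableLogSegment>();
      }
      SegmentPtr active_;  // null until the first roll
    };

    struct LogReader {
      void ReplaceLastSegment(SegmentPtr s) { segments_.back() = std::move(s); }
      void AppendEmptySegment(SegmentPtr s) { segments_.push_back(std::move(s)); }
      std::vector<SegmentPtr> segments_;
    };

    int main() {
      SegmentAllocator allocator;
      LogReader reader;

      SegmentPtr closed_segment, new_readable_segment;
      allocator.RollOver(&closed_segment, &new_readable_segment);
      if (closed_segment) {
        // Must happen before the new segment is appended.
        reader.ReplaceLastSegment(std::move(closed_segment));
      }
      reader.AppendEmptySegment(std::move(new_readable_segment));
      return 0;
    }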

Change-Id: Id2f5dd47e3be930288dcfeb1f77409fda45daac4
Reviewed-on: http://gerrit.cloudera.org:8080/14819
Reviewed-by: Andrew Wong <[email protected]>
Tested-by: Kudu Jenkins
adembo committed Dec 4, 2019
1 parent 9d14d74 commit c29d144
Showing 12 changed files with 206 additions and 185 deletions.
10 changes: 5 additions & 5 deletions src/kudu/consensus/consensus_queue-test.cc
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include "kudu/consensus/consensus_queue.h"

#include <cstddef>
#include <cstdint>
#include <memory>
@@ -23,7 +25,6 @@
#include <vector>

#include <gflags/gflags.h>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

@@ -36,7 +37,6 @@
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus-test-util.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus_queue.h"
#include "kudu/consensus/log-test-base.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
@@ -563,7 +563,7 @@ TEST_F(ConsensusQueueTest, TestQueueLoadsOperationsForPeer) {
ASSERT_OK(log::AppendNoOpToLogSync(clock_, log_.get(), &opid));
// Roll the log every 10 ops
if (i % 10 == 0) {
ASSERT_OK(log_->AllocateSegmentAndRollOver());
ASSERT_OK(log_->AllocateSegmentAndRollOverForTests());
}
}
ASSERT_OK(log_->WaitUntilAllFlushed());
@@ -626,7 +626,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
ASSERT_OK(log::AppendNoOpToLogSync(clock_, log_.get(), &opid));
// Roll the log every 3 ops
if (i % 3 == 0) {
ASSERT_OK(log_->AllocateSegmentAndRollOver());
ASSERT_OK(log_->AllocateSegmentAndRollOverForTests());
}
}

Expand All @@ -636,7 +636,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
ASSERT_OK(log::AppendNoOpToLogSync(clock_, log_.get(), &opid));
// Roll the log every 3 ops
if (i % 3 == 0) {
ASSERT_OK(log_->AllocateSegmentAndRollOver());
ASSERT_OK(log_->AllocateSegmentAndRollOverForTests());
}
}

4 changes: 2 additions & 2 deletions src/kudu/consensus/log-test-base.h
@@ -347,10 +347,10 @@ class LogTestBase : public KuduTest {
}

Status RollLog() {
return log_->AllocateSegmentAndRollOver();
return log_->AllocateSegmentAndRollOverForTests();
}

std::string DumpSegmentsToString(const SegmentSequence& segments) {
static std::string DumpSegmentsToString(const SegmentSequence& segments) {
std::string dump;
for (const scoped_refptr<ReadableLogSegment>& segment : segments) {
dump.append("------------\n");
5 changes: 2 additions & 3 deletions src/kudu/consensus/log-test.cc
@@ -28,7 +28,6 @@
#include <vector>

#include <gflags/gflags.h>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>
@@ -193,7 +192,7 @@ TEST_P(LogTestOptionalCompression, TestMultipleEntriesInABatch) {
AppendNoOpsToLogSync(clock_, log_.get(), &opid, 2);

// RollOver() the batch so that we have a properly formed footer.
ASSERT_OK(log_->AllocateSegmentAndRollOver());
ASSERT_OK(log_->AllocateSegmentAndRollOverForTests());

SegmentSequence segments;
log_->reader()->GetSegmentsSnapshot(&segments);
@@ -488,7 +487,7 @@ TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) {
ASSERT_EQ(written_entries_size, log_->segment_allocator_.active_segment_->written_offset());

// When we roll it should go back to the header size.
ASSERT_OK(log_->AllocateSegmentAndRollOver());
ASSERT_OK(log_->AllocateSegmentAndRollOverForTests());
ASSERT_EQ(header_size, log_->segment_allocator_.active_segment_->written_offset());
written_entries_size = header_size;

128 changes: 72 additions & 56 deletions src/kudu/consensus/log.cc
@@ -19,7 +19,6 @@

#include <cerrno>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <ostream>
@@ -177,8 +176,6 @@ namespace log {
using consensus::CommitMsg;
using consensus::ReplicateRefPtr;
using env_util::OpenFileForRandom;
using std::bind;
using std::function;
using std::shared_ptr;
using std::string;
using std::vector;
@@ -440,21 +437,17 @@ bool Log::append_thread_active_for_tests() const {
SegmentAllocator::SegmentAllocator(const LogOptions* opts,
const LogContext* ctx,
Schema schema,
uint32_t schema_version,
function<Status(scoped_refptr<ReadableLogSegment>)>
reader_replace_last_segment,
function<Status(scoped_refptr<ReadableLogSegment>)>
reader_add_segment)
: reader_replace_last_segment_(std::move(reader_replace_last_segment)),
reader_add_segment_(std::move(reader_add_segment)),
opts_(opts),
uint32_t schema_version)
: opts_(opts),
ctx_(ctx),
max_segment_size_(opts_->segment_size_mb * 1024 * 1024),
schema_(std::move(schema)),
schema_version_(schema_version),
sync_disabled_(false) {}

Status SegmentAllocator::Init(uint64_t sequence_number) {
Status SegmentAllocator::Init(
uint64_t sequence_number,
scoped_refptr<ReadableLogSegment>* new_readable_segment) {
// Init the compression codec.
if (!FLAGS_log_compression_codec.empty()) {
auto codec_type = GetCompressionCodecType(FLAGS_log_compression_codec);
@@ -467,10 +460,17 @@ Status SegmentAllocator::Init(uint64_t sequence_number) {
RETURN_NOT_OK(ThreadPoolBuilder("log-alloc")
.set_max_threads(1)
.Build(&allocation_pool_));
return AllocateSegmentAndRollOver();

scoped_refptr<ReadableLogSegment> closed_segment;
RETURN_NOT_OK(AllocateSegmentAndRollOver(&closed_segment, new_readable_segment));
DCHECK(!closed_segment); // There was no previously active segment.
return Status::OK();
}

Status SegmentAllocator::AllocateOrRollOverIfNecessary(uint32_t write_size_bytes) {
Status SegmentAllocator::AllocateOrRollOverIfNecessary(
uint32_t write_size_bytes,
scoped_refptr<ReadableLogSegment>* closed_segment,
scoped_refptr<ReadableLogSegment>* new_readable_segment) {
bool should_rollover = false;
// if the size of this entry overflows the current segment, get a new one
{
@@ -493,7 +493,7 @@ Status SegmentAllocator::AllocateOrRollOverIfNecessary(uint32_t write_size_bytes
if (should_rollover) {
TRACE_COUNTER_SCOPE_LATENCY_US("log_roll");
LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix())) {
RETURN_NOT_OK(RollOver());
RETURN_NOT_OK(RollOver(closed_segment, new_readable_segment));
}
}
return Status::OK();
@@ -531,7 +531,8 @@ Status SegmentAllocator::Sync() {
return Status::OK();
}

Status SegmentAllocator::CloseCurrentSegment(CloseMode mode) {
Status SegmentAllocator::CloseCurrentSegment(
scoped_refptr<ReadableLogSegment>* closed_segment) {
if (hooks_) {
RETURN_NOT_OK_PREPEND(hooks_->PreClose(), "PreClose hook failed");
}
@@ -549,10 +550,8 @@
RETURN_NOT_OK_PREPEND(hooks_->PostClose(), "PostClose hook failed");
}

if (mode == CLOSE_AND_REPLACE_LAST_SEGMENT) {
scoped_refptr<ReadableLogSegment> last_segment;
RETURN_NOT_OK(GetClosedSegment(&last_segment));
return reader_replace_last_segment_(std::move(last_segment));
if (closed_segment) {
RETURN_NOT_OK(GetClosedSegment(closed_segment));
}

return Status::OK();
@@ -578,12 +577,14 @@ void SegmentAllocator::StopAllocationThread() {
allocation_pool_->Shutdown();
}

Status SegmentAllocator::AllocateSegmentAndRollOver() {
Status SegmentAllocator::AllocateSegmentAndRollOver(
scoped_refptr<ReadableLogSegment>* closed_segment,
scoped_refptr<ReadableLogSegment>* new_readable_segment) {
{
std::lock_guard<RWMutex> l(allocation_lock_);
RETURN_NOT_OK(AsyncAllocateSegmentUnlocked());
}
return RollOver();
return RollOver(closed_segment, new_readable_segment);
}

Status SegmentAllocator::GetClosedSegment(scoped_refptr<ReadableLogSegment>* readable_segment) {
@@ -663,7 +664,8 @@ Status SegmentAllocator::AllocateNewSegment() {
return Status::OK();
}

Status SegmentAllocator::SwitchToAllocatedSegment() {
Status SegmentAllocator::SwitchToAllocatedSegment(
scoped_refptr<ReadableLogSegment>* new_readable_segment) {
// Increment "next" log segment seqno.
active_segment_sequence_number_++;
const auto& tablet_id = ctx_->tablet_id;
@@ -700,18 +702,22 @@
}
RETURN_NOT_OK_PREPEND(new_segment->WriteHeaderAndOpen(header), "Failed to write header");

// Open the segment we just created in readable form and add it to the reader.
// Open the segment we just created in readable form; it is the caller's
// responsibility to add it to the reader.
//
// TODO(todd): consider using a global FileCache here? With short log segments and
// lots of tablets, this file descriptor usage may add up.
unique_ptr<RandomAccessFile> readable_file;

RandomAccessFileOptions opts;
RETURN_NOT_OK(env->NewRandomAccessFile(opts, new_segment_path, &readable_file));
scoped_refptr<ReadableLogSegment> readable_segment(
new ReadableLogSegment(new_segment_path,
shared_ptr<RandomAccessFile>(readable_file.release())));
RETURN_NOT_OK(readable_segment->Init(header, new_segment->first_entry_offset()));
RETURN_NOT_OK(reader_add_segment_(std::move(readable_segment)));
{
unique_ptr<RandomAccessFile> readable_file;

RandomAccessFileOptions opts;
RETURN_NOT_OK(env->NewRandomAccessFile(opts, new_segment_path, &readable_file));
scoped_refptr<ReadableLogSegment> readable_segment(
new ReadableLogSegment(new_segment_path,
shared_ptr<RandomAccessFile>(readable_file.release())));
RETURN_NOT_OK(readable_segment->Init(header, new_segment->first_entry_offset()));
*new_readable_segment = std::move(readable_segment);
}

// Now set 'active_segment_' to the new segment.
active_segment_ = std::move(new_segment);
@@ -721,19 +727,21 @@
return Status::OK();
}

Status SegmentAllocator::RollOver() {
Status SegmentAllocator::RollOver(
scoped_refptr<ReadableLogSegment>* closed_segment,
scoped_refptr<ReadableLogSegment>* new_readable_segment) {
SCOPED_LATENCY_METRIC(ctx_->metrics, roll_latency);

// Wait for any on-going allocations to finish.
RETURN_NOT_OK(allocation_status_.Get());
DCHECK_EQ(kAllocationFinished, allocation_state());

// If this isn't the first active segment, close the segment and make it
// available to the log reader.
// If this isn't the first active segment, close it and return a reopened
// segment reader so that the caller can update its log reader.
if (active_segment_) {
RETURN_NOT_OK(CloseCurrentSegment(CLOSE_AND_REPLACE_LAST_SEGMENT));
RETURN_NOT_OK(CloseCurrentSegment(closed_segment));
}
RETURN_NOT_OK(SwitchToAllocatedSegment());
RETURN_NOT_OK(SwitchToAllocatedSegment(new_readable_segment));

VLOG_WITH_PREFIX(1) << "Rolled over to a new log segment at "
<< active_segment_->path();
@@ -773,9 +781,7 @@ Log::Log(LogOptions options, LogContext ctx, Schema schema, uint32_t schema_vers
log_state_(kLogInitialized),
entry_batch_queue_(FLAGS_group_commit_queue_size_bytes),
append_thread_(new AppendThread(this)),
segment_allocator_(&options_, &ctx_, std::move(schema), schema_version,
bind(&Log::ReplaceSegmentInReader, this, std::placeholders::_1),
bind(&Log::AddEmptySegmentInReader, this, std::placeholders::_1)),
segment_allocator_(&options_, &ctx_, std::move(schema), schema_version),
on_disk_size_(0) {
}

@@ -813,7 +819,9 @@ Status Log::Init() {
}

// We always create a new segment when the log starts.
RETURN_NOT_OK(segment_allocator_.Init(active_seg_seq_num));
scoped_refptr<ReadableLogSegment> new_readable_segment;
RETURN_NOT_OK(segment_allocator_.Init(active_seg_seq_num, &new_readable_segment));
reader_->AppendEmptySegment(std::move(new_readable_segment));
RETURN_NOT_OK(append_thread_->Init());
log_state_ = kLogWriting;
return Status::OK();
@@ -885,7 +893,17 @@ Status Log::WriteBatch(LogEntryBatch* entry_batch) {
return Status::OK();
}

RETURN_NOT_OK(segment_allocator_.AllocateOrRollOverIfNecessary(entry_batch_bytes));
scoped_refptr<ReadableLogSegment> closed_segment;
scoped_refptr<ReadableLogSegment> new_readable_segment;
RETURN_NOT_OK(segment_allocator_.AllocateOrRollOverIfNecessary(
entry_batch_bytes, &closed_segment, &new_readable_segment));
if (closed_segment) {
// Must be done before a new segment is appended.
reader_->ReplaceLastSegment(std::move(closed_segment));
}
if (new_readable_segment) {
reader_->AppendEmptySegment(std::move(new_readable_segment));
}
auto* active_segment = segment_allocator_.active_segment_.get();
int64_t start_offset = active_segment->written_offset();

@@ -930,9 +948,17 @@ Status Log::UpdateIndexForBatch(const LogEntryBatch& batch,
return Status::OK();
}

Status Log::AllocateSegmentAndRollOver() {
Status Log::AllocateSegmentAndRollOverForTests() {
std::lock_guard<rw_spinlock> l(segment_idle_lock_);
return segment_allocator_.AllocateSegmentAndRollOver();
scoped_refptr<ReadableLogSegment> closed_segment;
scoped_refptr<ReadableLogSegment> new_readable_segment;
RETURN_NOT_OK(segment_allocator_.AllocateSegmentAndRollOver(
&closed_segment, &new_readable_segment));
if (closed_segment) {
reader_->ReplaceLastSegment(std::move(closed_segment));
}
reader_->AppendEmptySegment(std::move(new_readable_segment));
return Status::OK();
}

Status Log::Sync() {
@@ -1127,7 +1153,7 @@ Status Log::Close() {
}
}

RETURN_NOT_OK(segment_allocator_.CloseCurrentSegment(SegmentAllocator::CLOSE));
RETURN_NOT_OK(segment_allocator_.CloseCurrentSegment(/*closed_segment=*/ nullptr));
VLOG_WITH_PREFIX(1) << "Log closed";

// Release FDs held by these objects.
@@ -1184,16 +1210,6 @@ Status Log::RemoveRecoveryDirIfExists(FsManager* fs_manager, const string& table
return Status::OK();
}

Status Log::AddEmptySegmentInReader(scoped_refptr<ReadableLogSegment> segment) {
std::lock_guard<percpu_rwlock> l(state_lock_);
return reader_->AppendEmptySegment(std::move(segment));
}

Status Log::ReplaceSegmentInReader(scoped_refptr<ReadableLogSegment> segment) {
std::lock_guard<percpu_rwlock> l(state_lock_);
return reader_->ReplaceLastSegment(std::move(segment));
}

std::string Log::LogPrefix() const { return ctx_.LogPrefix(); }

Log::~Log() {