Skip to content

Commit

Permalink
env: add WriteV() API
Browse files Browse the repository at this point in the history
Adds WriteV() methods to RWFile and WritableFile that allows
writing multiple data Slices in one call. The implementation
leverages the pwritev system call when possible and simulates it
with pwrite calls when unavailable.

Additionally adds WriteV()/AppendV() methods to the block manager abstraction.
These methods will be used in KUDU-463 to support writing
checksums and block data in a single call.

Change-Id: I30acfa2e4918ef945c55646647913b36a07daaa4
Reviewed-on: http://gerrit.cloudera.org:8080/6800
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Adar Dembo <[email protected]>
  • Loading branch information
granthenke authored and adembo committed May 4, 2017
1 parent 6217879 commit 174a058
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 133 deletions.
23 changes: 14 additions & 9 deletions src/kudu/cfile/cfile_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "kudu/cfile/cfile_writer.h"

#include <glog/logging.h>
#include <numeric>
#include <string>
#include <utility>

Expand Down Expand Up @@ -64,6 +65,7 @@ TAG_FLAG(cfile_do_on_finish, experimental);
using google::protobuf::RepeatedPtrField;
using kudu::fs::ScopedWritableBlockCloser;
using kudu::fs::WritableBlock;
using std::accumulate;
using std::string;
using std::unique_ptr;

Expand Down Expand Up @@ -461,9 +463,7 @@ Status CFileWriter::AddBlock(const vector<Slice> &data_slices,
out_slices = data_slices;
}

for (const Slice &data : out_slices) {
RETURN_NOT_OK(WriteRawData(data));
}
RETURN_NOT_OK(WriteRawData(out_slices));

uint64_t total_size = off_ - start_offset;

Expand All @@ -473,14 +473,19 @@ Status CFileWriter::AddBlock(const vector<Slice> &data_slices,
return Status::OK();
}

Status CFileWriter::WriteRawData(const Slice& data) {
Status s = block_->Append(data);

Status CFileWriter::WriteRawData(const vector<Slice>& data) {
size_t data_size = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
[&](int sum, const Slice& curr) {
return sum + curr.size();
});
Status s = block_->AppendV(data);
if (!s.ok()) {
LOG(WARNING) << "Unable to append slice of size "
<< data.size() << " at offset " << off_
<< ": " << s.ToString();
LOG(WARNING) << "Unable to append data of size "
<< data_size << " at offset " << off_
<< ": " << s.ToString();
}
off_ += data.size();
off_ += data_size;
return s;
}

Expand Down
2 changes: 1 addition & 1 deletion src/kudu/cfile/cfile_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class CFileWriter {
BlockPointer *block_ptr,
const char *name_for_log);

Status WriteRawData(const Slice& data);
Status WriteRawData(const vector<Slice>& data);

Status FinishCurDataBlock();

Expand Down
7 changes: 4 additions & 3 deletions src/kudu/consensus/log_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -823,9 +823,10 @@ Status WritableLogSegment::WriteEntryBatch(const Slice& data,
InlineEncodeFixed32(&header_buf[12], crc::Crc32c(&header_buf[0], kEntryHeaderSizeV2 - 4));

// Write the header to the file, followed by the batch data itself.
RETURN_NOT_OK(writable_file_->AppendVector({
Slice(header_buf, arraysize(header_buf)),
data_to_write}));
RETURN_NOT_OK(writable_file_->AppendV({
Slice(header_buf,
arraysize(header_buf)),
data_to_write}));
written_offset_ += arraysize(header_buf) + data_to_write.size();
return Status::OK();
}
Expand Down
6 changes: 6 additions & 0 deletions src/kudu/fs/block_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ class WritableBlock : public Block {
// outstanding data to reach the disk.
virtual Status Append(const Slice& data) = 0;

// Appends multiple chunks of data referenced by 'data' to the block.
//
// Does not guarantee durability of 'data'; Close() must be called for all
// outstanding data to reach the disk.
virtual Status AppendV(const std::vector<Slice>& data) = 0;

// Begins an asynchronous flush of dirty block data to disk.
//
// This is purely a performance optimization for Close(); if there is
Expand Down
21 changes: 17 additions & 4 deletions src/kudu/fs/file_block_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "kudu/fs/file_block_manager.h"

#include <memory>
#include <numeric>
#include <string>
#include <vector>

Expand All @@ -37,6 +38,7 @@
#include "kudu/util/random_util.h"
#include "kudu/util/status.h"

using std::accumulate;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
Expand Down Expand Up @@ -224,6 +226,8 @@ class FileWritableBlock : public WritableBlock {

virtual Status Append(const Slice& data) OVERRIDE;

virtual Status AppendV(const vector<Slice>& data) OVERRIDE;

virtual Status FlushDataAsync() OVERRIDE;

virtual size_t BytesAppended() const OVERRIDE;
Expand Down Expand Up @@ -297,14 +301,23 @@ const BlockId& FileWritableBlock::id() const {
}

Status FileWritableBlock::Append(const Slice& data) {
DCHECK(state_ == CLEAN || state_ == DIRTY)
<< "Invalid state: " << state_;
return AppendV({ data });
}

RETURN_NOT_OK(writer_->Append(data));
Status FileWritableBlock::AppendV(const vector<Slice>& data) {
DCHECK(state_ == CLEAN || state_ == DIRTY)
<< "Invalid state: " << state_;
RETURN_NOT_OK(writer_->AppendV(data));
RETURN_NOT_OK(location_.data_dir()->RefreshIsFull(
DataDir::RefreshMode::ALWAYS));
state_ = DIRTY;
bytes_appended_ += data.size();

// Calculate the amount of data written
size_t bytes_written = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
[&](int sum, const Slice& curr) {
return sum + curr.size();
});
bytes_appended_ += bytes_written;
return Status::OK();
}

Expand Down
39 changes: 30 additions & 9 deletions src/kudu/fs/log_block_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,12 @@ class LogBlockContainer {
Status EnsurePreallocated(int64_t block_start_offset,
size_t next_append_length);

// Writes 'data' to this container's data file at offset 'offset'.
//
// The on-disk effects of this call are made durable only after SyncData().
// See RWFile::Write()
Status WriteData(int64_t offset, const Slice& data);

// See RWFile::WriteV()
Status WriteVData(int64_t offset, const vector<Slice>& data);

// See RWFile::Read().
Status ReadData(int64_t offset, Slice* result) const;

Expand Down Expand Up @@ -821,14 +822,22 @@ Status LogBlockContainer::PunchHole(int64_t offset, int64_t length) {
}

Status LogBlockContainer::WriteData(int64_t offset, const Slice& data) {
return WriteVData(offset, { data });
}

Status LogBlockContainer::WriteVData(int64_t offset, const vector<Slice>& data) {
DCHECK_GE(offset, next_block_offset_);

RETURN_NOT_OK(data_file_->Write(offset, data));
RETURN_NOT_OK(data_file_->WriteV(offset, data));

// This append may have changed the container size if:
// 1. It was large enough that it blew out the preallocated space.
// 2. Preallocation was disabled.
if (offset + data.size() > preallocated_offset_) {
size_t data_size = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
[&](int sum, const Slice& curr) {
return sum + curr.size();
});
if (offset + data_size > preallocated_offset_) {
RETURN_NOT_OK(data_dir_->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
}
return Status::OK();
Expand Down Expand Up @@ -1051,6 +1060,8 @@ class LogWritableBlock : public WritableBlock {

virtual Status Append(const Slice& data) OVERRIDE;

virtual Status AppendV(const vector<Slice>& data) OVERRIDE;

virtual Status FlushDataAsync() OVERRIDE;

virtual size_t BytesAppended() const OVERRIDE;
Expand Down Expand Up @@ -1130,26 +1141,36 @@ BlockManager* LogWritableBlock::block_manager() const {
}

Status LogWritableBlock::Append(const Slice& data) {
return AppendV({ data });
}

Status LogWritableBlock::AppendV(const vector<Slice>& data) {
DCHECK(state_ == CLEAN || state_ == DIRTY)
<< "Invalid state: " << state_;
<< "Invalid state: " << state_;

// Calculate the amount of data to write
size_t data_size = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
[&](int sum, const Slice& curr) {
return sum + curr.size();
});

// The metadata change is deferred to Close() or FlushDataAsync(),
// whichever comes first. We can't do it now because the block's
// length is still in flux.

int64_t cur_block_offset = block_offset_ + block_length_;
RETURN_NOT_OK(container_->EnsurePreallocated(cur_block_offset, data.size()));
RETURN_NOT_OK(container_->EnsurePreallocated(cur_block_offset, data_size));

MicrosecondsInt64 start_time = GetMonoTimeMicros();
RETURN_NOT_OK(container_->WriteData(cur_block_offset, data));
RETURN_NOT_OK(container_->WriteVData(cur_block_offset, data));
MicrosecondsInt64 end_time = GetMonoTimeMicros();

int64_t dur = end_time - start_time;
TRACE_COUNTER_INCREMENT("lbm_write_time_us", dur);
const char* counter = BUCKETED_COUNTER_NAME("lbm_writes", dur);
TRACE_COUNTER_INCREMENT(counter, 1);

block_length_ += data.size();
block_length_ += data_size;
state_ = DIRTY;
return Status::OK();
}
Expand Down
25 changes: 15 additions & 10 deletions src/kudu/util/env-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

DECLARE_bool(never_fsync);
DECLARE_int32(env_inject_short_read_bytes);
DECLARE_int32(env_inject_short_write_bytes);

namespace kudu {

Expand Down Expand Up @@ -142,8 +143,9 @@ class TestEnv : public KuduTest {
ASSERT_NO_FATAL_FAILURE(VerifyTestData(s, offset));
}

void TestAppendVector(size_t num_slices, size_t slice_size, size_t iterations,
bool fast, bool pre_allocate, const WritableFileOptions& opts) {
void TestAppendV(size_t num_slices, size_t slice_size, size_t iterations,
bool fast, bool pre_allocate,
const WritableFileOptions &opts) {
const string kTestPath = GetTestPath("test_env_appendvec_read_append");
shared_ptr<WritableFile> file;
ASSERT_OK(env_util::OpenFileForWrite(opts, env_, kTestPath, &file));
Expand All @@ -158,6 +160,9 @@ class TestEnv : public KuduTest {

MakeVectors(num_slices, slice_size, iterations, &data, &input);

// Force short writes to half the slice length.
FLAGS_env_inject_short_write_bytes = slice_size / 2;

shared_ptr<RandomAccessFile> raf;

if (!fast) {
Expand All @@ -172,7 +177,7 @@ class TestEnv : public KuduTest {
LOG_TIMING(INFO, test_descr) {
for (int i = 0; i < iterations; i++) {
if (fast || random() % 2) {
ASSERT_OK(file->AppendVector(input[i]));
ASSERT_OK(file->AppendV(input[i]));
} else {
for (const Slice& slice : input[i]) {
ASSERT_OK(file->Append(slice));
Expand Down Expand Up @@ -494,18 +499,18 @@ TEST_F(TestEnv, TestIOVMax) {
VerifyTestData(Slice(scratch, data_size), 0);
}

TEST_F(TestEnv, TestAppendVector) {
TEST_F(TestEnv, TestAppendV) {
WritableFileOptions opts;
LOG(INFO) << "Testing AppendVector() only, NO pre-allocation";
ASSERT_NO_FATAL_FAILURE(TestAppendVector(2000, 1024, 5, true, false, opts));
LOG(INFO) << "Testing AppendV() only, NO pre-allocation";
ASSERT_NO_FATAL_FAILURE(TestAppendV(2000, 1024, 5, true, false, opts));

if (!fallocate_supported_) {
LOG(INFO) << "fallocate not supported, skipping preallocated runs";
} else {
LOG(INFO) << "Testing AppendVector() only, WITH pre-allocation";
ASSERT_NO_FATAL_FAILURE(TestAppendVector(2000, 1024, 5, true, true, opts));
LOG(INFO) << "Testing AppendVector() together with Append() and Read(), WITH pre-allocation";
ASSERT_NO_FATAL_FAILURE(TestAppendVector(128, 4096, 5, false, true, opts));
LOG(INFO) << "Testing AppendV() only, WITH pre-allocation";
ASSERT_NO_FATAL_FAILURE(TestAppendV(2000, 1024, 5, true, true, opts));
LOG(INFO) << "Testing AppendV() together with Append() and Read(), WITH pre-allocation";
ASSERT_NO_FATAL_FAILURE(TestAppendV(128, 4096, 5, false, true, opts));
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/kudu/util/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ class WritableFile {
//
// For implementation specific quirks and details, see comments in
// implementation source code (e.g., env_posix.cc)
virtual Status AppendVector(const std::vector<Slice>& data_vector) = 0;
virtual Status AppendV(const std::vector<Slice>& data) = 0;

// Pre-allocates 'size' bytes for the file in the underlying filesystem.
// size bytes are added to the current pre-allocated size or to the current
Expand Down Expand Up @@ -539,6 +539,9 @@ class RWFile {
// Writes 'data' to the file position given by 'offset'.
virtual Status Write(uint64_t offset, const Slice& data) = 0;

// Writes the 'data' vector to the file position given by 'offset'.
virtual Status WriteV(uint64_t offset, const std::vector<Slice>& data) = 0;

// Preallocates 'length' bytes for the file in the underlying filesystem
// beginning at 'offset'. It is safe to preallocate the same range
// repeatedly; this is an idempotent operation.
Expand Down
Loading

0 comments on commit 174a058

Please sign in to comment.