Skip to content

Commit

Permalink
pb_util: avoid repeated stat() calls reading files
Browse files Browse the repository at this point in the history
This reduces the number of fstat syscalls while loading a host with 11M blocks
from 29.3M to 147K.

Note that this also changes Env to return EndOfFile when reading from
disk rather than IOError as it used to.

Change-Id: I27371800604bcb20bafae7946d3b3e84af094598
Reviewed-on: http://gerrit.cloudera.org:8080/8010
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins
  • Loading branch information
toddlipcon committed Oct 10, 2017
1 parent c929a06 commit 2a802f9
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 22 deletions.
8 changes: 4 additions & 4 deletions src/kudu/util/env-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,11 @@ TEST_F(TestEnv, TestReadFully) {
// Turn short reads off again
FLAGS_env_inject_short_read_bytes = 0;

// Verify that Read fails with an IOError at EOF.
// Verify that Read fails with an EndOfFile error EOF.
Slice s2(scratch.get(), 200);
Status status = raf->Read(kFileSize - 100, s2);
ASSERT_FALSE(status.ok());
ASSERT_TRUE(status.IsIOError());
ASSERT_TRUE(status.IsEndOfFile());
ASSERT_STR_CONTAINS(status.ToString(), "EOF");
}

Expand Down Expand Up @@ -527,10 +527,10 @@ TEST_F(TestEnv, TestReadVFully) {
// Turn short reads off again
FLAGS_env_inject_short_read_bytes = 0;

// Verify that Read fails with an IOError at EOF.
// Verify that Read fails with an EndOfFile error at EOF.
Status status = file->ReadV(5, results);
ASSERT_FALSE(status.ok());
ASSERT_TRUE(status.IsIOError());
ASSERT_TRUE(status.IsEndOfFile());
ASSERT_STR_CONTAINS(status.ToString(), "EOF");
}

Expand Down
2 changes: 1 addition & 1 deletion src/kudu/util/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ Status DoReadV(int fd, const string& filename, uint64_t offset,
}
if (PREDICT_FALSE(r == 0)) {
// EOF.
return Status::IOError(
return Status::EndOfFile(
Substitute("EOF trying to read $0 bytes at offset $1", bytes_req, offset));
}
if (PREDICT_TRUE(r == rem)) {
Expand Down
1 change: 1 addition & 0 deletions src/kudu/util/pb_util-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ TEST_P(TestPBContainerVersions, TestInterleavedReadWrite) {
ASSERT_OK(pb_reader.Open());

for (int i = 0; i < 10; i++) {
SCOPED_TRACE(i);
// Write a message and read it back.
pb.set_value(i);
ASSERT_OK(pb_writer->Append(pb));
Expand Down
68 changes: 51 additions & 17 deletions src/kudu/util/pb_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <utility>
#include <vector>

#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/descriptor.pb.h>
Expand Down Expand Up @@ -236,17 +237,35 @@ Status ParseAndCompareChecksum(const uint8_t* checksum_buf,
return Status::OK();
}

// If necessary, get the size of the file opened by 'reader' in 'cached_file_size'.
// If 'cached_file_size' already has a value, this is a no-op.
template<typename ReadableFileType>
Status CacheFileSize(ReadableFileType* reader,
boost::optional<uint64_t>* cached_file_size) {
if (*cached_file_size) {
return Status::OK();
}

uint64_t file_size;
RETURN_NOT_OK(reader->Size(&file_size));
*cached_file_size = file_size;
return Status::OK();
}

// Read and parse a message of the specified format at the given offset in the
// format documented in pb_util.h. 'offset' is an in-out parameter and will be
// updated with the new offset on success. On failure, 'offset' is not modified.
template<typename ReadableFileType>
Status ReadPBStartingAt(ReadableFileType* reader, int version, uint64_t* offset, Message* msg) {
Status ReadPBStartingAt(ReadableFileType* reader, int version,
boost::optional<uint64_t>* cached_file_size,
uint64_t* offset, Message* msg) {
uint64_t tmp_offset = *offset;
VLOG(1) << "Reading PB with version " << version << " starting at offset " << *offset;

uint64_t file_size;
RETURN_NOT_OK(reader->Size(&file_size));
if (tmp_offset == file_size) {
RETURN_NOT_OK(CacheFileSize(reader, cached_file_size));
uint64_t file_size = cached_file_size->get();

if (tmp_offset == *cached_file_size) {
return Status::EndOfFile("Reached end of file");
}

Expand Down Expand Up @@ -318,19 +337,29 @@ Status ReadPBStartingAt(ReadableFileType* reader, int version, uint64_t* offset,
// Wrapper around ReadPBStartingAt() to enforce that we don't return
// Status::Incomplete() for V1 format files.
template<typename ReadableFileType>
Status ReadFullPB(ReadableFileType* reader, int version, uint64_t* offset, Message* msg) {
Status s = ReadPBStartingAt(reader, version, offset, msg);
Status ReadFullPB(ReadableFileType* reader, int version,
boost::optional<uint64_t>* cached_file_size,
uint64_t* offset, Message* msg) {
bool had_cached_size = *cached_file_size != boost::none;
Status s = ReadPBStartingAt(reader, version, cached_file_size, offset, msg);
if (PREDICT_FALSE(s.IsIncomplete() && version == 1)) {
return Status::Corruption("Unrecoverable incomplete record", s.ToString());
}
// If we hit EOF, but we were using a cached view of the file size, then it might be
// that the file has been extended. Invalidate the cache and try again.
if (had_cached_size && (s.IsIncomplete() || s.IsEndOfFile())) {
*cached_file_size = boost::none;
return ReadFullPB(reader, version, cached_file_size, offset, msg);
}
return s;
}

// Read and parse the protobuf container file-level header documented in pb_util.h.
template<typename ReadableFileType>
Status ParsePBFileHeader(ReadableFileType* reader, uint64_t* offset, int* version) {
uint64_t file_size;
RETURN_NOT_OK(reader->Size(&file_size));
Status ParsePBFileHeader(ReadableFileType* reader, boost::optional<uint64_t>* cached_file_size,
uint64_t* offset, int* version) {
RETURN_NOT_OK(CacheFileSize(reader, cached_file_size));
uint64_t file_size = cached_file_size->get();

// We initially read enough data for a V2+ file header. This optimizes for
// V2+ and is valid on a V1 file because we don't consider these files valid
Expand Down Expand Up @@ -382,9 +411,11 @@ Status ParsePBFileHeader(ReadableFileType* reader, uint64_t* offset, int* versio

// Read and parse the supplemental header from the container file.
template<typename ReadableFileType>
Status ReadSupplementalHeader(ReadableFileType* reader, int version, uint64_t* offset,
Status ReadSupplementalHeader(ReadableFileType* reader, int version,
boost::optional<uint64_t>* cached_file_size,
uint64_t* offset,
ContainerSupHeaderPB* sup_header) {
RETURN_NOT_OK_PREPEND(ReadFullPB(reader, version, offset, sup_header),
RETURN_NOT_OK_PREPEND(ReadFullPB(reader, version, cached_file_size, offset, sup_header),
Substitute("Could not read supplemental header from proto container file $0 "
"with version $1 at offset $2",
reader->filename(), version, *offset));
Expand Down Expand Up @@ -640,10 +671,12 @@ Status WritablePBContainerFile::CreateNew(const Message& msg) {

Status WritablePBContainerFile::OpenExisting() {
DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &offset_, &version_));
boost::optional<uint64_t> size;
RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &size, &offset_, &version_));
ContainerSupHeaderPB sup_header;
RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &offset_, &sup_header));
RETURN_NOT_OK(writer_->Size(&offset_)); // Reset the write offset to the end of the file.
RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &size,
&offset_, &sup_header));
offset_ = size.get(); // Reset the write offset to the end of the file.
state_ = FileState::OPEN;
return Status::OK();
}
Expand Down Expand Up @@ -801,9 +834,10 @@ ReadablePBContainerFile::~ReadablePBContainerFile() {

Status ReadablePBContainerFile::Open() {
DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
RETURN_NOT_OK(ParsePBFileHeader(reader_.get(), &offset_, &version_));
RETURN_NOT_OK(ParsePBFileHeader(reader_.get(), &cached_file_size_, &offset_, &version_));
ContainerSupHeaderPB sup_header;
RETURN_NOT_OK(ReadSupplementalHeader(reader_.get(), version_, &offset_, &sup_header));
RETURN_NOT_OK(ReadSupplementalHeader(reader_.get(), version_, &cached_file_size_,
&offset_, &sup_header));
protos_.reset(sup_header.release_protos());
pb_type_ = sup_header.pb_type();
state_ = FileState::OPEN;
Expand All @@ -812,7 +846,7 @@ Status ReadablePBContainerFile::Open() {

Status ReadablePBContainerFile::ReadNextPB(Message* msg) {
DCHECK_EQ(FileState::OPEN, state_);
return ReadFullPB(reader_.get(), version_, &offset_, msg);
return ReadFullPB(reader_.get(), version_, &cached_file_size_, &offset_, msg);
}

Status ReadablePBContainerFile::GetPrototype(const Message** prototype) {
Expand Down
5 changes: 5 additions & 0 deletions src/kudu/util/pb_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <memory>
#include <string>

#include <boost/optional/optional.hpp>
#include <google/protobuf/message.h>
#include <gtest/gtest_prod.h>

Expand Down Expand Up @@ -436,6 +437,10 @@ class ReadablePBContainerFile {
int version_;
uint64_t offset_;

// The size of the file we are reading, or 'none' if it hasn't yet been
// read.
boost::optional<uint64_t> cached_file_size_;

// The fully-qualified PB type name of the messages in the container.
std::string pb_type_;

Expand Down

0 comments on commit 2a802f9

Please sign in to comment.