Skip to content

Commit

Permalink
Replace Status with IOStatus for block fetcher IO function (facebook#…
Browse files Browse the repository at this point in the history
…8130)

Summary:
To propagate the IOStatus from file reads to RocksDB read logic, some of the existing status needs to be replaced by IOStatus.

Pull Request resolved: facebook#8130

Test Plan: make check

Reviewed By: anand1976

Differential Revision: D27440188

Pulled By: zhichao-cao

fbshipit-source-id: bbe7622c2106fe4e46871d60f7c26944e5030d78
  • Loading branch information
zhichao-cao authored and facebook-github-bot committed Apr 1, 2021
1 parent c43a37a commit 1700236
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 79 deletions.
54 changes: 27 additions & 27 deletions file/random_access_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,26 @@
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
Status RandomAccessFileReader::Create(
IOStatus RandomAccessFileReader::Create(
const std::shared_ptr<FileSystem>& fs, const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<RandomAccessFileReader>* reader, IODebugContext* dbg) {
std::unique_ptr<FSRandomAccessFile> file;
Status s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg);
if (s.ok()) {
IOStatus io_s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg);
if (io_s.ok()) {
reader->reset(new RandomAccessFileReader(std::move(file), fname));
}
return s;
return io_s;
}

Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
size_t n, Slice* result, char* scratch,
AlignedBuf* aligned_buf,
bool for_compaction) const {
IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
size_t n, Slice* result, char* scratch,
AlignedBuf* aligned_buf,
bool for_compaction) const {
(void)aligned_buf;

TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
Status s;
IOStatus io_s;
uint64_t elapsed = 0;
{
StopWatch sw(clock_, stats_, hist_type_,
Expand Down Expand Up @@ -86,22 +86,22 @@ Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
// one iteration of this loop, so we don't need to check and adjust
// the opts.timeout before calling file_->Read
assert(!opts.timeout.count() || allowed == read_size);
s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
&tmp, buf.Destination(), nullptr);
io_s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
&tmp, buf.Destination(), nullptr);
}
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
s);
io_s);
}

buf.Size(buf.CurrentSize() + tmp.size());
if (!s.ok() || tmp.size() < allowed) {
if (!io_s.ok() || tmp.size() < allowed) {
break;
}
}
size_t res_len = 0;
if (s.ok() && offset_advance < buf.CurrentSize()) {
if (io_s.ok() && offset_advance < buf.CurrentSize()) {
res_len = std::min(buf.CurrentSize() - offset_advance, n);
if (aligned_buf == nullptr) {
buf.Read(scratch, offset_advance, res_len);
Expand Down Expand Up @@ -146,14 +146,14 @@ Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
// one iteration of this loop, so we don't need to check and adjust
// the opts.timeout before calling file_->Read
assert(!opts.timeout.count() || allowed == n);
s = file_->Read(offset + pos, allowed, opts, &tmp_result,
scratch + pos, nullptr);
io_s = file_->Read(offset + pos, allowed, opts, &tmp_result,
scratch + pos, nullptr);
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
finish_ts, s);
finish_ts, io_s);
}
#endif

Expand All @@ -166,11 +166,11 @@ Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
assert(tmp_result.data() == res_scratch + pos);
}
pos += tmp_result.size();
if (!s.ok() || tmp_result.size() < allowed) {
if (!io_s.ok() || tmp_result.size() < allowed) {
break;
}
}
*result = Slice(res_scratch, s.ok() ? pos : 0);
*result = Slice(res_scratch, io_s.ok() ? pos : 0);
}
IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
SetPerfLevel(prev_perf_level);
Expand All @@ -179,7 +179,7 @@ Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
file_read_hist_->Add(elapsed);
}

return s;
return io_s;
}

size_t End(const FSReadRequest& r) {
Expand Down Expand Up @@ -208,13 +208,13 @@ bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
return true;
}

Status RandomAccessFileReader::MultiRead(const IOOptions& opts,
FSReadRequest* read_reqs,
size_t num_reqs,
AlignedBuf* aligned_buf) const {
IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts,
FSReadRequest* read_reqs,
size_t num_reqs,
AlignedBuf* aligned_buf) const {
(void)aligned_buf; // suppress warning of unused variable in LITE mode
assert(num_reqs > 0);
Status s;
IOStatus io_s;
uint64_t elapsed = 0;
{
StopWatch sw(clock_, stats_, hist_type_,
Expand Down Expand Up @@ -280,7 +280,7 @@ Status RandomAccessFileReader::MultiRead(const IOOptions& opts,

{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
}

#ifndef ROCKSDB_LITE
Expand Down Expand Up @@ -321,7 +321,7 @@ Status RandomAccessFileReader::MultiRead(const IOOptions& opts,
file_read_hist_->Add(elapsed);
}

return s;
return io_s;
}

IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
Expand Down
20 changes: 10 additions & 10 deletions file/random_access_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ class RandomAccessFileReader {
#endif
}

static Status Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<RandomAccessFileReader>* reader,
IODebugContext* dbg);
static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<RandomAccessFileReader>* reader,
IODebugContext* dbg);
RandomAccessFileReader(const RandomAccessFileReader&) = delete;
RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;

Expand All @@ -120,19 +120,19 @@ class RandomAccessFileReader {
// 2. Otherwise, scratch is not used and can be null, the aligned_buf owns
// the internally allocated buffer on return, and the result refers to a
// region in aligned_buf.
Status Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
char* scratch, AlignedBuf* aligned_buf,
bool for_compaction = false) const;
IOStatus Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
char* scratch, AlignedBuf* aligned_buf,
bool for_compaction = false) const;

// REQUIRES:
// num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing.
// In non-direct IO mode, aligned_buf should be null;
// In direct IO mode, aligned_buf stores the aligned buffer allocated inside
// MultiRead, the result Slices in reqs refer to aligned_buf.
Status MultiRead(const IOOptions& opts, FSReadRequest* reqs, size_t num_reqs,
AlignedBuf* aligned_buf) const;
IOStatus MultiRead(const IOOptions& opts, FSReadRequest* reqs,
size_t num_reqs, AlignedBuf* aligned_buf) const;

Status Prefetch(uint64_t offset, size_t n) const {
IOStatus Prefetch(uint64_t offset, size_t n) const {
return file_->Prefetch(offset, n, IOOptions(), nullptr);
}

Expand Down
81 changes: 41 additions & 40 deletions table/block_fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ namespace ROCKSDB_NAMESPACE {
inline void BlockFetcher::CheckBlockChecksum() {
// Check the crc of the type and the block contents
if (read_options_.verify_checksums) {
status_ = ROCKSDB_NAMESPACE::VerifyBlockChecksum(
io_status_ = status_to_io_status(ROCKSDB_NAMESPACE::VerifyBlockChecksum(
footer_.checksum(), slice_.data(), block_size_, file_->file_name(),
handle_.offset());
handle_.offset()));
}
}

Expand Down Expand Up @@ -59,18 +59,18 @@ inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
if (prefetch_buffer_ != nullptr) {
IOOptions opts;
Status s = file_->PrepareIOOptions(read_options_, opts);
if (s.ok() && prefetch_buffer_->TryReadFromCache(
opts, handle_.offset(), block_size_with_trailer_, &slice_,
&s, for_compaction_)) {
IOStatus io_s = file_->PrepareIOOptions(read_options_, opts);
if (io_s.ok() && prefetch_buffer_->TryReadFromCache(
opts, handle_.offset(), block_size_with_trailer_,
&slice_, &io_s, for_compaction_)) {
CheckBlockChecksum();
if (!status_.ok()) {
if (!io_status_.ok()) {
return true;
}
got_from_prefetch_buffer_ = true;
used_buf_ = const_cast<char*>(slice_.data());
} else if (!s.ok()) {
status_ = s;
} else if (!io_s.ok()) {
io_status_ = io_s;
return true;
}
}
Expand All @@ -82,18 +82,18 @@ inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
cache_options_.persistent_cache->IsCompressed()) {
// lookup uncompressed cache mode p-cache
std::unique_ptr<char[]> raw_data;
status_ = PersistentCacheHelper::LookupRawPage(
cache_options_, handle_, &raw_data, block_size_with_trailer_);
if (status_.ok()) {
io_status_ = status_to_io_status(PersistentCacheHelper::LookupRawPage(
cache_options_, handle_, &raw_data, block_size_with_trailer_));
if (io_status_.ok()) {
heap_buf_ = CacheAllocationPtr(raw_data.release());
used_buf_ = heap_buf_.get();
slice_ = Slice(heap_buf_.get(), block_size_);
return true;
} else if (!status_.IsNotFound() && ioptions_.info_log) {
assert(!status_.ok());
} else if (!io_status_.IsNotFound() && ioptions_.info_log) {
assert(!io_status_.ok());
ROCKS_LOG_INFO(ioptions_.info_log,
"Error reading from persistent cache. %s",
status_.ToString().c_str());
io_status_.ToString().c_str());
}
}
return false;
Expand Down Expand Up @@ -136,7 +136,7 @@ inline void BlockFetcher::PrepareBufferForBlockFromFile() {
}

inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
if (status_.ok() && read_options_.fill_cache &&
if (io_status_.ok() && read_options_.fill_cache &&
cache_options_.persistent_cache &&
cache_options_.persistent_cache->IsCompressed()) {
// insert to raw cache
Expand All @@ -146,8 +146,8 @@ inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
}

inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
if (status_.ok() && !got_from_prefetch_buffer_ && read_options_.fill_cache &&
cache_options_.persistent_cache &&
if (io_status_.ok() && !got_from_prefetch_buffer_ &&
read_options_.fill_cache && cache_options_.persistent_cache &&
!cache_options_.persistent_cache->IsCompressed()) {
// insert to uncompressed cache
PersistentCacheHelper::InsertUncompressedPage(cache_options_, handle_,
Expand Down Expand Up @@ -215,35 +215,36 @@ inline void BlockFetcher::GetBlockContents() {
#endif
}

Status BlockFetcher::ReadBlockContents() {
IOStatus BlockFetcher::ReadBlockContents() {
if (TryGetUncompressBlockFromPersistentCache()) {
compression_type_ = kNoCompression;
#ifndef NDEBUG
contents_->is_raw_block = true;
#endif // NDEBUG
return Status::OK();
return IOStatus::OK();
}
if (TryGetFromPrefetchBuffer()) {
if (!status_.ok()) {
return status_;
if (!io_status_.ok()) {
return io_status_;
}
} else if (!TryGetCompressedBlockFromPersistentCache()) {
IOOptions opts;
status_ = file_->PrepareIOOptions(read_options_, opts);
io_status_ = file_->PrepareIOOptions(read_options_, opts);
// Actual file read
if (status_.ok()) {
if (io_status_.ok()) {
if (file_->use_direct_io()) {
PERF_TIMER_GUARD(block_read_time);
status_ =
io_status_ =
file_->Read(opts, handle_.offset(), block_size_with_trailer_,
&slice_, nullptr, &direct_io_buf_, for_compaction_);
PERF_COUNTER_ADD(block_read_count, 1);
used_buf_ = const_cast<char*>(slice_.data());
} else {
PrepareBufferForBlockFromFile();
PERF_TIMER_GUARD(block_read_time);
status_ = file_->Read(opts, handle_.offset(), block_size_with_trailer_,
&slice_, used_buf_, nullptr, for_compaction_);
io_status_ =
file_->Read(opts, handle_.offset(), block_size_with_trailer_,
&slice_, used_buf_, nullptr, for_compaction_);
PERF_COUNTER_ADD(block_read_count, 1);
#ifndef NDEBUG
if (slice_.data() == &stack_buf_[0]) {
Expand Down Expand Up @@ -277,23 +278,23 @@ Status BlockFetcher::ReadBlockContents() {
}

PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
if (!status_.ok()) {
return status_;
if (!io_status_.ok()) {
return io_status_;
}

if (slice_.size() != block_size_with_trailer_) {
return Status::Corruption("truncated block read from " +
file_->file_name() + " offset " +
ToString(handle_.offset()) + ", expected " +
ToString(block_size_with_trailer_) +
" bytes, got " + ToString(slice_.size()));
return IOStatus::Corruption("truncated block read from " +
file_->file_name() + " offset " +
ToString(handle_.offset()) + ", expected " +
ToString(block_size_with_trailer_) +
" bytes, got " + ToString(slice_.size()));
}

CheckBlockChecksum();
if (status_.ok()) {
if (io_status_.ok()) {
InsertCompressedBlockToPersistentCacheIfNeeded();
} else {
return status_;
return io_status_;
}
}

Expand All @@ -304,9 +305,9 @@ Status BlockFetcher::ReadBlockContents() {
// compressed page, uncompress, update cache
UncompressionContext context(compression_type_);
UncompressionInfo info(context, uncompression_dict_, compression_type_);
status_ = UncompressBlockContents(info, slice_.data(), block_size_,
contents_, footer_.version(), ioptions_,
memory_allocator_);
io_status_ = status_to_io_status(UncompressBlockContents(
info, slice_.data(), block_size_, contents_, footer_.version(),
ioptions_, memory_allocator_));
#ifndef NDEBUG
num_heap_buf_memcpy_++;
#endif
Expand All @@ -317,7 +318,7 @@ Status BlockFetcher::ReadBlockContents() {

InsertUncompressedBlockToPersistentCacheIfNeeded();

return status_;
return io_status_;
}

} // namespace ROCKSDB_NAMESPACE
4 changes: 2 additions & 2 deletions table/block_fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BlockFetcher {
memory_allocator_compressed_(memory_allocator_compressed),
for_compaction_(for_compaction) {}

Status ReadBlockContents();
IOStatus ReadBlockContents();
CompressionType get_compression_type() const { return compression_type_; }

#ifndef NDEBUG
Expand Down Expand Up @@ -100,7 +100,7 @@ class BlockFetcher {
const PersistentCacheOptions& cache_options_;
MemoryAllocator* memory_allocator_;
MemoryAllocator* memory_allocator_compressed_;
Status status_;
IOStatus io_status_;
Slice slice_;
char* used_buf_ = nullptr;
AlignedBuf direct_io_buf_;
Expand Down

0 comments on commit 1700236

Please sign in to comment.